You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/09/14 10:23:33 UTC

[GitHub] [kafka] cadonna opened a new pull request, #12638: Register and unregister changelog topics in state updater

cadonna opened a new pull request, #12638:
URL: https://github.com/apache/kafka/pull/12638

   Registering and unregistering the changelog topics in the changelog reader outside of the state updater leads to race conditions between the stream thread and the state updater thread. Thus, this PR moves registering and unregistering of changelog topics in the changelog reader into the state updater if the state updater is enabled.
   
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] guozhangwang commented on a diff in pull request #12638: Register and unregister changelog topics in state updater

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on code in PR #12638:
URL: https://github.com/apache/kafka/pull/12638#discussion_r972283623


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java:
##########
@@ -209,7 +212,9 @@ void registerStateStores(final List<StateStore> allStores, final InternalProcess
         processorContext.uninitialize();
         for (final StateStore store : allStores) {
             if (stores.containsKey(store.name())) {
-                maybeRegisterStoreWithChangelogReader(store.name());
+                if (!stateUpdaterEnabled) {
+                    maybeRegisterStoreWithChangelogReader(store.name());

Review Comment:
   Another case for this is when we handle EOS task corruption due to no checkpoint file detected: in that case we would remove the corrupted task's changelogs, and re-initialize them. In this case the stores would be initialized but changelog readers not registered as well.
   
   In the future we can decouple the registration of state stores and the `register(final TopicPartition partition, final ProcessorStateManager stateManager)` as long as in the latter case, we are sure that the `stateManager`'s stores map are already populated, which should always be true when the task is already in stateUpdater.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cadonna merged pull request #12638: KAFKA-10199: Register and unregister changelog topics in state updater

Posted by GitBox <gi...@apache.org>.
cadonna merged PR #12638:
URL: https://github.com/apache/kafka/pull/12638


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cadonna commented on a diff in pull request #12638: Register and unregister changelog topics in state updater

Posted by GitBox <gi...@apache.org>.
cadonna commented on code in PR #12638:
URL: https://github.com/apache/kafka/pull/12638#discussion_r970627977


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -406,17 +412,21 @@ private void maybeCheckpointUpdatingTasks(final long now) {
     private StateUpdaterThread stateUpdaterThread = null;
     private CountDownLatch shutdownGate;
 
-    public DefaultStateUpdater(final StreamsConfig config,
+    private String name;
+
+    public DefaultStateUpdater(final String name,

Review Comment:
   Added a name for the state updater to improve log messages.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cadonna commented on a diff in pull request #12638: Register and unregister changelog topics in state updater

Posted by GitBox <gi...@apache.org>.
cadonna commented on code in PR #12638:
URL: https://github.com/apache/kafka/pull/12638#discussion_r971662847


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:
##########
@@ -444,6 +451,8 @@ public void restore(final Map<TaskId, Task> tasks) {
                 final Set<TaskId> corruptedTasks = new HashSet<>();
                 e.partitions().forEach(partition -> corruptedTasks.add(changelogs.get(partition).stateManager.taskId()));
                 throw new TaskCorruptedException(corruptedTasks, e);
+            } catch (final InterruptException interruptException) {
+                throw interruptException;

Review Comment:
   Yes, it will. However, `InterruptException` is a `KafkaException`, thus it would be caught by the `catch`-clause below and rethrown as a `StreamsException`. I thought it would be easier to directly catch an `InterruptException` in `DefaultStateUpdater` instead of catching a `StreamsException`, unwrap it, verify if it is an `InterruptException`, and if not rethrow it. 



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:
##########
@@ -444,6 +451,8 @@ public void restore(final Map<TaskId, Task> tasks) {
                 final Set<TaskId> corruptedTasks = new HashSet<>();
                 e.partitions().forEach(partition -> corruptedTasks.add(changelogs.get(partition).stateManager.taskId()));
                 throw new TaskCorruptedException(corruptedTasks, e);
+            } catch (final InterruptException interruptException) {
+                throw interruptException;

Review Comment:
   Yes, it will. However, `InterruptException` is a `KafkaException`, thus it would be caught by the `catch`-clause below and be rethrown as a `StreamsException`. I thought it would be easier to directly catch an `InterruptException` in `DefaultStateUpdater` instead of catching a `StreamsException`, unwrap it, verify if it is an `InterruptException`, and if not rethrow it. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cadonna commented on a diff in pull request #12638: Register and unregister changelog topics in state updater

Posted by GitBox <gi...@apache.org>.
cadonna commented on code in PR #12638:
URL: https://github.com/apache/kafka/pull/12638#discussion_r970626576


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -262,19 +263,19 @@ private List<TaskAndAction> getTasksAndActions() {
         private void addTask(final Task task) {
             if (isStateless(task)) {
                 addToRestoredTasks((StreamTask) task);
-                log.debug("Stateless active task " + task.id() + " was added to the restored tasks of the state updater");
+                log.info("Stateless active task " + task.id() + " was added to the restored tasks of the state updater");

Review Comment:
   I changed a couple of log messages from debug to info to better track tasks.  



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cadonna commented on a diff in pull request #12638: Register and unregister changelog topics in state updater

Posted by GitBox <gi...@apache.org>.
cadonna commented on code in PR #12638:
URL: https://github.com/apache/kafka/pull/12638#discussion_r970630607


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java:
##########
@@ -536,7 +543,9 @@ public void flushCache() {
     public void close() throws ProcessorStateException {
         log.debug("Closing its state manager and all the registered state stores: {}", stores);
 
-        changelogReader.unregister(getAllChangelogTopicPartitions());
+        if (!stateUpdaterEnabled) {
+            changelogReader.unregister(getAllChangelogTopicPartitions());
+        }

Review Comment:
   Only unregister changelogs if the state updater is disabled.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cadonna commented on a diff in pull request #12638: Register and unregister changelog topics in state updater

Posted by GitBox <gi...@apache.org>.
cadonna commented on code in PR #12638:
URL: https://github.com/apache/kafka/pull/12638#discussion_r971662847


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:
##########
@@ -444,6 +451,8 @@ public void restore(final Map<TaskId, Task> tasks) {
                 final Set<TaskId> corruptedTasks = new HashSet<>();
                 e.partitions().forEach(partition -> corruptedTasks.add(changelogs.get(partition).stateManager.taskId()));
                 throw new TaskCorruptedException(corruptedTasks, e);
+            } catch (final InterruptException interruptException) {
+                throw interruptException;

Review Comment:
   Yes, it will. However, `InterruptException` is a `KafkaException`, thus it would be caught the `catch`-clause below and rethrown as a `StreamsException`. I thought it would be easier to directly catch an `InterruptException` in `DefaultStateUpdater` instead of catching a `StreamsException`, unwrap it, verify if it is an `InterruptException`, and if not rethrow it. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cadonna commented on a diff in pull request #12638: Register and unregister changelog topics in state updater

Posted by GitBox <gi...@apache.org>.
cadonna commented on code in PR #12638:
URL: https://github.com/apache/kafka/pull/12638#discussion_r970630206


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java:
##########
@@ -209,7 +212,9 @@ void registerStateStores(final List<StateStore> allStores, final InternalProcess
         processorContext.uninitialize();
         for (final StateStore store : allStores) {
             if (stores.containsKey(store.name())) {
-                maybeRegisterStoreWithChangelogReader(store.name());
+                if (!stateUpdaterEnabled) {
+                    maybeRegisterStoreWithChangelogReader(store.name());
+                }

Review Comment:
   Only register changelogs if the state updater is disabled.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java:
##########
@@ -352,7 +357,9 @@ public void registerStore(final StateStore store,
         // on the state manager this state store would be closed as well
         stores.put(storeName, storeMetadata);
 
-        maybeRegisterStoreWithChangelogReader(storeName);
+        if (!stateUpdaterEnabled) {
+            maybeRegisterStoreWithChangelogReader(storeName);
+        }

Review Comment:
   Only register changelogs if the state updater is disabled.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cadonna commented on a diff in pull request #12638: Register and unregister changelog topics in state updater

Posted by GitBox <gi...@apache.org>.
cadonna commented on code in PR #12638:
URL: https://github.com/apache/kafka/pull/12638#discussion_r970629341


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java:
##########
@@ -575,8 +584,10 @@ public void close() throws ProcessorStateException {
     void recycle() {
         log.debug("Recycling state for {} task {}.", taskType, taskId);
 
-        final List<TopicPartition> allChangelogs = getAllChangelogTopicPartitions();
-        changelogReader.unregister(allChangelogs);
+        if (!stateUpdaterEnabled) {
+            final List<TopicPartition> allChangelogs = getAllChangelogTopicPartitions();
+            changelogReader.unregister(allChangelogs);
+        }

Review Comment:
   Note that once we have only the state updater recycling a state manager becomes a noop.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cadonna commented on a diff in pull request #12638: Register and unregister changelog topics in state updater

Posted by GitBox <gi...@apache.org>.
cadonna commented on code in PR #12638:
URL: https://github.com/apache/kafka/pull/12638#discussion_r970625278


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -98,7 +99,7 @@ public void run() {
                 while (isRunning.get()) {
                     try {
                         runOnce();
-                    } catch (final InterruptedException interruptedException) {
+                    } catch (final InterruptedException | InterruptException interruptedException) {

Review Comment:
   The restore consumer might throw a `InterruptException` when the state updater is shutdown.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cadonna commented on pull request #12638: Register and unregister changelog topics in state updater

Posted by GitBox <gi...@apache.org>.
cadonna commented on PR #12638:
URL: https://github.com/apache/kafka/pull/12638#issuecomment-1246558345

   Call for review: @wcarlson5 @lihaosky 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cadonna commented on a diff in pull request #12638: Register and unregister changelog topics in state updater

Posted by GitBox <gi...@apache.org>.
cadonna commented on code in PR #12638:
URL: https://github.com/apache/kafka/pull/12638#discussion_r970637017


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java:
##########
@@ -169,7 +169,7 @@ public void addPendingActiveTaskToSuspend(final TaskId taskId) {
 
     private boolean containsTaskIdWithAction(final TaskId taskId, final Action action) {
         final PendingUpdateAction pendingUpdateAction = pendingUpdateActions.get(taskId);
-        return !(pendingUpdateAction == null || pendingUpdateAction.getAction() != action);
+        return pendingUpdateAction != null && pendingUpdateAction.getAction() == action;

Review Comment:
   Follow up from https://github.com/apache/kafka/pull/12600#discussion_r970078065



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cadonna commented on a diff in pull request #12638: Register and unregister changelog topics in state updater

Posted by GitBox <gi...@apache.org>.
cadonna commented on code in PR #12638:
URL: https://github.com/apache/kafka/pull/12638#discussion_r971663620


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:
##########
@@ -347,6 +348,12 @@ public void register(final TopicPartition partition, final ProcessorStateManager
         }
     }
 
+    public void register(final Set<TopicPartition> changelogPartitions, final ProcessorStateManager stateManager) {

Review Comment:
   Good catch!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cadonna commented on a diff in pull request #12638: Register and unregister changelog topics in state updater

Posted by GitBox <gi...@apache.org>.
cadonna commented on code in PR #12638:
URL: https://github.com/apache/kafka/pull/12638#discussion_r970631567


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogRegister.java:
##########
@@ -31,6 +33,8 @@ public interface ChangelogRegister {
      */
     void register(final TopicPartition partition, final ProcessorStateManager stateManager);
 
+    void register(final Set<TopicPartition> partition, final ProcessorStateManager stateManager);

Review Comment:
   This is not strictly needed, but I thought it makes registering and unregistering a bit more symmetric.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cadonna commented on a diff in pull request #12638: Register and unregister changelog topics in state updater

Posted by GitBox <gi...@apache.org>.
cadonna commented on code in PR #12638:
URL: https://github.com/apache/kafka/pull/12638#discussion_r971663927


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogRegister.java:
##########
@@ -31,6 +33,8 @@ public interface ChangelogRegister {
      */
     void register(final TopicPartition partition, final ProcessorStateManager stateManager);
 
+    void register(final Set<TopicPartition> partition, final ProcessorStateManager stateManager);

Review Comment:
   Good catch!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] guozhangwang commented on a diff in pull request #12638: Register and unregister changelog topics in state updater

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on code in PR #12638:
URL: https://github.com/apache/kafka/pull/12638#discussion_r971076310


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java:
##########
@@ -575,8 +584,10 @@ public void close() throws ProcessorStateException {
     void recycle() {
         log.debug("Recycling state for {} task {}.", taskType, taskId);
 
-        final List<TopicPartition> allChangelogs = getAllChangelogTopicPartitions();
-        changelogReader.unregister(allChangelogs);
+        if (!stateUpdaterEnabled) {
+            final List<TopicPartition> allChangelogs = getAllChangelogTopicPartitions();
+            changelogReader.unregister(allChangelogs);
+        }

Review Comment:
   +100



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] lihaosky commented on a diff in pull request #12638: Register and unregister changelog topics in state updater

Posted by GitBox <gi...@apache.org>.
lihaosky commented on code in PR #12638:
URL: https://github.com/apache/kafka/pull/12638#discussion_r971383169


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogRegister.java:
##########
@@ -31,6 +33,8 @@ public interface ChangelogRegister {
      */
     void register(final TopicPartition partition, final ProcessorStateManager stateManager);
 
+    void register(final Set<TopicPartition> partition, final ProcessorStateManager stateManager);

Review Comment:
   nit: partition -> partitions?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:
##########
@@ -444,6 +451,8 @@ public void restore(final Map<TaskId, Task> tasks) {
                 final Set<TaskId> corruptedTasks = new HashSet<>();
                 e.partitions().forEach(partition -> corruptedTasks.add(changelogs.get(partition).stateManager.taskId()));
                 throw new TaskCorruptedException(corruptedTasks, e);
+            } catch (final InterruptException interruptException) {
+                throw interruptException;

Review Comment:
   QQ: will `InterruptException` be thrown even without this?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:
##########
@@ -347,6 +348,12 @@ public void register(final TopicPartition partition, final ProcessorStateManager
         }
     }
 
+    public void register(final Set<TopicPartition> changelogPartitions, final ProcessorStateManager stateManager) {

Review Comment:
   nit: `@Override`



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java:
##########
@@ -209,7 +212,9 @@ void registerStateStores(final List<StateStore> allStores, final InternalProcess
         processorContext.uninitialize();
         for (final StateStore store : allStores) {
             if (stores.containsKey(store.name())) {
-                maybeRegisterStoreWithChangelogReader(store.name());
+                if (!stateUpdaterEnabled) {
+                    maybeRegisterStoreWithChangelogReader(store.name());

Review Comment:
   QQ: this method is called `registerStateStores` but why does it expect `store` to be already in `stores`? The only place I can find `stores.put` is called is in `registerStore` and in that method, `maybeRegisterStoreWithChangelogReader` is called immediately after `stores.put` is called. So I'm confused what's the real purpose of this method and if `maybeRegisterStoreWithChangelogReader` call here is redundant.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cadonna commented on a diff in pull request #12638: Register and unregister changelog topics in state updater

Posted by GitBox <gi...@apache.org>.
cadonna commented on code in PR #12638:
URL: https://github.com/apache/kafka/pull/12638#discussion_r971709124


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java:
##########
@@ -209,7 +212,9 @@ void registerStateStores(final List<StateStore> allStores, final InternalProcess
         processorContext.uninitialize();
         for (final StateStore store : allStores) {
             if (stores.containsKey(store.name())) {
-                maybeRegisterStoreWithChangelogReader(store.name());
+                if (!stateUpdaterEnabled) {
+                    maybeRegisterStoreWithChangelogReader(store.name());

Review Comment:
   That is indeed confusing! When a task is recycled, the changelogs of the task to recycle are unregistered from the changelog reader. Then a new task is created and the state manager of the task to recycle is moved to the new task. For example, when a standby task is recycled to become an active task, its changelogs are unregistered from the changelog reader (https://github.com/apache/kafka/blob/2b2039f0ba88e210dd09031291c050cfcda7ce4a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java#L579), a new active task is created, and the state manager of the standby task is transferred to the new active task (https://github.com/apache/kafka/blob/2b2039f0ba88e210dd09031291c050cfcda7ce4a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java#L218). After the recycling, the new active task is in state `CREATED` and owns a state manager that has all state stores registered, but has the changelogs unregistered in the 
 changelog reader. When the new active task is initialized, Streams tries to register the state stores again. That is when it reaches this code and it will execute the `if`-branch. That is the state store is registered but the changelogs aren't registered with the changelog reader.  



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cadonna commented on pull request #12638: KAFKA-10199: Register and unregister changelog topics in state updater

Posted by GitBox <gi...@apache.org>.
cadonna commented on PR #12638:
URL: https://github.com/apache/kafka/pull/12638#issuecomment-1248995970

   Failures are unrelated:
   ```
   Build / JDK 11 and Scala 2.13 / kafka.api.TransactionsTest.testFencingOnSendOffsets(String).quorum=zk
   Build / JDK 8 and Scala 2.12 / kafka.api.TransactionsTest.testSendOffsetsToTransactionTimeout(String).quorum=zk
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org