Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/05/17 03:31:30 UTC

[GitHub] [kafka] guozhangwang commented on a change in pull request #10646: KAFKA-8897 Follow-up: Consolidate the global state stores

guozhangwang commented on a change in pull request #10646:
URL: https://github.com/apache/kafka/pull/10646#discussion_r633182424



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##########
@@ -128,8 +129,7 @@ public void setGlobalProcessorContext(final InternalProcessorContext globalProce
         }
 
         final Set<String> changelogTopics = new HashSet<>();
-        for (final StateStore stateStore : globalStateStores) {
-            globalStoreNames.add(stateStore.name());
+        for (final StateStore stateStore : topology.globalStateStores()) {

Review comment:
       Actually, I just noticed that in this case the mock context does not use the `stateManager` at all. It creates its own `StateManagerStub`, so `stateManager.registerStore` is never called and the stores set is always empty. I think this is okay, since in unit tests we only check each individual function's behavior (in this case, `initialize`) anyway.
   
   Also, as I browsed through the code (see my other comment below), in this unit test the store would not yet be closed when the exception is thrown; in practice we rely on `thread.shutdown` itself to shut down the global state manager and hence close all state stores.
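A minimal sketch of why the set stays empty, under a simplified model: `StoreRegistrar`, `ToyStore`, `TrackingManager`, and `StubRegistrar` are hypothetical names, not Kafka Streams classes. When the context hands each store a stub registrar instead of the manager, `init` registers with the stub and the manager's own map never fills.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical registration hook standing in for the context's registerStore path
// (illustrative only; not a Kafka Streams interface).
interface StoreRegistrar {
    void registerStore(ToyStore store);
}

// Hypothetical store whose init() registers itself through whatever registrar
// it is handed, loosely mirroring how a store's init() calls back into registration.
class ToyStore {
    private final String name;

    ToyStore(String name) {
        this.name = name;
    }

    String name() {
        return name;
    }

    void init(StoreRegistrar registrar) {
        registrar.registerStore(this);
    }
}

// Hypothetical manager that records every store registered through it.
class TrackingManager implements StoreRegistrar {
    final Map<String, ToyStore> stores = new LinkedHashMap<>();

    @Override
    public void registerStore(ToyStore store) {
        stores.put(store.name(), store);
    }

    // Initializes each store against the registrar the (possibly mock) context supplies.
    void initialize(List<ToyStore> topologyStores, StoreRegistrar contextRegistrar) {
        for (ToyStore store : topologyStores) {
            store.init(contextRegistrar);
        }
    }
}

// Hypothetical stub playing the role of the mock context's own StateManagerStub:
// it absorbs registrations, so the manager under test never sees them.
class StubRegistrar implements StoreRegistrar {
    final List<String> seen = new ArrayList<>();

    @Override
    public void registerStore(ToyStore store) {
        seen.add(store.name());
    }
}
```

With a `StubRegistrar`, `initialize` leaves `stores` empty while the stub records every name; passing the manager itself as the registrar fills the map instead.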

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##########
@@ -174,13 +176,15 @@ public void registerStore(final StateStore store, final StateRestoreCallback sta
             throw new IllegalArgumentException(String.format("Trying to register store %s that is not a known global store", store.name()));
         }
 
+        // register the store first, so that if later an exception is thrown then eventually while we call `close`

Review comment:
       I add the store after the store-name checks, since if a state store with the same name has already been created, we would not be able to keep track of both of them anyway.
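A rough sketch of the "check names first, then book-keep" ordering, using hypothetical `RecordingStore` and `CheckedRegistry` types rather than the real `GlobalStateManagerImpl`. The unknown-store check mirrors the exception in the diff; the duplicate-name check and the `restore` callback are assumptions added for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical store type (not Kafka's StateStore); records whether close() ran.
class RecordingStore {
    final String name;
    boolean closed = false;

    RecordingStore(String name) {
        this.name = name;
    }

    void close() {
        closed = true;
    }
}

// Hypothetical registry sketching "validate the name, then book-keep, then do the rest".
class CheckedRegistry {
    private final Set<String> knownStoreNames;
    final Map<String, RecordingStore> registered = new LinkedHashMap<>();

    CheckedRegistry(Set<String> knownStoreNames) {
        this.knownStoreNames = knownStoreNames;
    }

    // `restore` is a hypothetical placeholder for whatever work follows registration.
    void registerStore(RecordingStore store, Runnable restore) {
        if (!knownStoreNames.contains(store.name)) {
            throw new IllegalArgumentException(
                "Trying to register store " + store.name + " that is not a known global store");
        }
        // Assumed duplicate check: a second store under the same name is rejected
        // instead of overwriting the first, which a name-keyed map could not track anyway.
        if (registered.containsKey(store.name)) {
            throw new IllegalArgumentException("Store " + store.name + " has already been registered");
        }
        // Book-keep before the remaining work, so a failure in `restore`
        // still leaves the store reachable by close().
        registered.put(store.name, store);
        restore.run();
    }

    void close() {
        for (RecordingStore store : registered.values()) {
            store.close();
        }
    }
}
```

Because the put happens after validation, a rejected duplicate never displaces the store already being tracked, while a failure during the later work still leaves the store closable.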

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##########
@@ -128,8 +129,7 @@ public void setGlobalProcessorContext(final InternalProcessorContext globalProce
         }
 
         final Set<String> changelogTopics = new HashSet<>();
-        for (final StateStore stateStore : globalStateStores) {
-            globalStoreNames.add(stateStore.name());
+        for (final StateStore stateStore : topology.globalStateStores()) {
             final String sourceTopic = storeToChangelogTopic.get(stateStore.name());
             changelogTopics.add(sourceTopic);
             stateStore.init((StateStoreContext) globalProcessorContext, stateStore);

Review comment:
       Hmm, in production, do we ever restart a thread, even for an illegal-state or illegal-argument exception?
   
   Anyway, I think your concern is still valid: at least for test code, there's a risk. Since the `store.init()` call is implemented by the (possibly custom) state store implementation, we cannot really enforce closing there. Today, for both the global and the local state manager, we throw the exception from initializing the state stores all the way up to `thread.run` and then to the user's exception handler. Although `thread.shutdown` eventually closes all tasks anyway, in the case you raised the state store would not yet be in the `stateManager.stores` set and hence would be leaked.
   
   What we can do is, in both the local and the global state manager, move the `globalStores.put` / `stores.put` call to the beginning, before making any checks, so that when we throw and eventually call `thread.shutdown`, the stores are already in the set and will be closed.
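The leak and the proposed reordering can be modeled roughly as follows. `FlakyStore`, `ToyStateManager`, and `LeakDemo` are hypothetical; the sketch simplifies the current code by treating "tracked" as "added after `init()` returns", and it models the `thread.shutdown` path as a plain `close()` call.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical store whose init() may throw, standing in for a custom
// state store implementation that fails during initialization.
class FlakyStore {
    final String name;
    final boolean failOnInit;
    boolean closed = false;

    FlakyStore(String name, boolean failOnInit) {
        this.name = name;
        this.failOnInit = failOnInit;
    }

    void init() {
        if (failOnInit) {
            throw new IllegalStateException("init failed for " + name);
        }
    }

    void close() {
        closed = true;
    }
}

// Hypothetical manager contrasting the two orderings.
class ToyStateManager {
    private final boolean trackBeforeInit;
    final List<FlakyStore> stores = new ArrayList<>();

    ToyStateManager(boolean trackBeforeInit) {
        this.trackBeforeInit = trackBeforeInit;
    }

    void initialize(List<FlakyStore> topologyStores) {
        for (FlakyStore store : topologyStores) {
            if (trackBeforeInit) {
                // Proposed ordering: track first, so the shutdown path can always close it.
                stores.add(store);
                store.init();
            } else {
                // Simplified current ordering: if init() throws, the store is never tracked.
                store.init();
                stores.add(store);
            }
        }
    }

    // Stands in for the close reached via thread.shutdown.
    void close() {
        for (FlakyStore store : stores) {
            store.close();
        }
    }
}

class LeakDemo {
    // Simulates: initialize() throws, the exception propagates, then shutdown
    // closes whatever the manager is tracking. Returns whether the store got closed.
    static boolean closedAfterFailure(boolean trackBeforeInit) {
        FlakyStore store = new FlakyStore("global-store", true);
        ToyStateManager manager = new ToyStateManager(trackBeforeInit);
        try {
            manager.initialize(List.of(store));
        } catch (IllegalStateException e) {
            // In Streams the exception would travel up to thread.run and the user's handler.
        } finally {
            manager.close();
        }
        return store.closed;
    }
}
```

`LeakDemo.closedAfterFailure(false)` returns `false` (the store leaks), while `LeakDemo.closedAfterFailure(true)` returns `true`.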

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##########
@@ -62,21 +62,22 @@
 public class GlobalStateManagerImpl implements GlobalStateManager {
     private final static long NO_DEADLINE = -1L;
 
-    private final Logger log;
     private final Time time;
-    private final Consumer<byte[], byte[]> globalConsumer;
+    private final Logger log;
     private final File baseDir;
-    private final Set<String> globalStoreNames = new HashSet<>();

Review comment:
       Just reordering the member fields here; nothing is added or removed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org