Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/05/07 05:35:31 UTC

[GitHub] [kafka] guozhangwang opened a new pull request #10646: KAFKA-8897 Follow-up: Consolidate the global state stores

guozhangwang opened a new pull request #10646:
URL: https://github.com/apache/kafka/pull/10646


   1. When registering state stores, add the store to `globalStateStores` before making any blocking calls that may throw, so that closing the state manager also closes those stores.
   2. Remove the other list that was kept as a field, and call `topology.globalStateStores()` whenever the list is needed.
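
The ordering described in item 1 can be sketched as follows. This is a minimal, self-contained illustration and not the code in `GlobalStateManagerImpl`: `Store`, `StateManager`, and the `restore` callback are hypothetical stand-ins assumed only for this example.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch; none of these types are Kafka Streams classes.
final class RegisterFirstSketch {

    // Stand-in for a state store that is already open when it gets registered.
    static final class Store {
        private final String name;
        private boolean open = true;

        Store(final String name) {
            this.name = name;
        }

        String name() {
            return name;
        }

        boolean isOpen() {
            return open;
        }

        void close() {
            open = false;
        }
    }

    static final class StateManager {
        private final Map<String, Store> stores = new LinkedHashMap<>();

        void registerStore(final Store store, final Runnable restore) {
            // Track the store before doing any work that may block or throw ...
            stores.put(store.name(), store);
            // ... so that a failure here still leaves the store reachable from close().
            restore.run();
        }

        void close() {
            for (final Store store : stores.values()) {
                store.close();
            }
            stores.clear();
        }
    }

    public static void main(final String[] args) {
        final StateManager manager = new StateManager();
        final Store store = new Store("global-store");
        try {
            manager.registerStore(store, () -> {
                throw new IllegalStateException("restore failed");
            });
        } catch (final IllegalStateException e) {
            // Stands in for the shutdown path that eventually closes the manager.
            manager.close();
        }
        System.out.println("store still open: " + store.isOpen());
    }
}
```

If `stores.put` came after `restore.run()`, the failed store would never be tracked and `close()` would skip it.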
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on a change in pull request #10646: KAFKA-8897 Follow-up: Consolidate the global state stores

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #10646:
URL: https://github.com/apache/kafka/pull/10646#discussion_r633866937



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##########
@@ -174,13 +176,15 @@ public void registerStore(final StateStore store, final StateRestoreCallback sta
             throw new IllegalArgumentException(String.format("Trying to register store %s that is not a known global store", store.name()));
         }
 
+        // register the store first, so that if later an exception is thrown then eventually while we call `close`

Review comment:
       +1 on not allowing the app to continue after an illegal exception. We need to reserve _some_ kind of exception for actual critical, fatal system errors that a user can't just ignore to spin up a new thread. And that has essentially been the meaning of these illegal exceptions in Streams thus far. As I mentioned in another thread, I've been very concerned about this in the new handler, since we haven't been strict about properly cleaning up after an illegal exception.







[GitHub] [kafka] ableegoldman commented on a change in pull request #10646: KAFKA-8897 Follow-up: Consolidate the global state stores

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #10646:
URL: https://github.com/apache/kafka/pull/10646#discussion_r633854799



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##########
@@ -128,8 +129,7 @@ public void setGlobalProcessorContext(final InternalProcessorContext globalProce
         }
 
         final Set<String> changelogTopics = new HashSet<>();
-        for (final StateStore stateStore : globalStateStores) {
-            globalStoreNames.add(stateStore.name());
+        for (final StateStore stateStore : topology.globalStateStores()) {
             final String sourceTopic = storeToChangelogTopic.get(stateStore.name());
             changelogTopics.add(sourceTopic);
             stateStore.init((StateStoreContext) globalProcessorContext, stateStore);

Review comment:
       Yeah, our attitude towards `IllegalStateException` has been pretty cavalier thus far, and it's one of the main things I'm concerned about with the REPLACE thread functionality. We should definitely be on the lookout for possible `IllegalStateException` occurrences in the codebase and try to triage them, so things aren't just completely screwed up if Streams is allowed to continue after hitting one.







[GitHub] [kafka] guozhangwang commented on a change in pull request #10646: KAFKA-8897 Follow-up: Consolidate the global state stores

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #10646:
URL: https://github.com/apache/kafka/pull/10646#discussion_r633858383



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##########
@@ -174,13 +176,15 @@ public void registerStore(final StateStore store, final StateRestoreCallback sta
             throw new IllegalArgumentException(String.format("Trying to register store %s that is not a known global store", store.name()));
         }
 
+        // register the store first, so that if later an exception is thrown then eventually while we call `close`

Review comment:
       cc @ableegoldman @wcarlson5 @rodesai too.







[GitHub] [kafka] guozhangwang merged pull request #10646: KAFKA-8897 Follow-up: Consolidate the global state stores

Posted by GitBox <gi...@apache.org>.
guozhangwang merged pull request #10646:
URL: https://github.com/apache/kafka/pull/10646


   





[GitHub] [kafka] guozhangwang commented on a change in pull request #10646: KAFKA-8897 Follow-up: Consolidate the global state stores

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #10646:
URL: https://github.com/apache/kafka/pull/10646#discussion_r633182424



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##########
@@ -128,8 +129,7 @@ public void setGlobalProcessorContext(final InternalProcessorContext globalProce
         }
 
         final Set<String> changelogTopics = new HashSet<>();
-        for (final StateStore stateStore : globalStateStores) {
-            globalStoreNames.add(stateStore.name());
+        for (final StateStore stateStore : topology.globalStateStores()) {

Review comment:
       Actually, I just noticed that in this case the mock context does not use the `stateManager` at all; it creates its own `StateManagerStub` and hence never calls `stateManager.registerStore`, so the stores set is always empty. I think this is okay, since in unit tests we only check the behavior of each single function (in this case, `initialize`) anyway.
   
   Also, as I browsed through the code (see the other comment below), in this unit test the store would not be closed yet when the exception is thrown; in practice we rely on `thread.shutdown` itself to shut down the global state manager and hence close all state stores.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##########
@@ -174,13 +176,15 @@ public void registerStore(final StateStore store, final StateRestoreCallback sta
             throw new IllegalArgumentException(String.format("Trying to register store %s that is not a known global store", store.name()));
         }
 
+        // register the store first, so that if later an exception is thrown then eventually while we call `close`

Review comment:
       I add the store after the store-name checks, since if a state store with that name has already been created, we would not be able to book-keep both of them anyway.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##########
@@ -128,8 +129,7 @@ public void setGlobalProcessorContext(final InternalProcessorContext globalProce
         }
 
         final Set<String> changelogTopics = new HashSet<>();
-        for (final StateStore stateStore : globalStateStores) {
-            globalStoreNames.add(stateStore.name());
+        for (final StateStore stateStore : topology.globalStateStores()) {
             final String sourceTopic = storeToChangelogTopic.get(stateStore.name());
             changelogTopics.add(sourceTopic);
             stateStore.init((StateStoreContext) globalProcessorContext, stateStore);

Review comment:
       Hmm, for production, do we ever restart a thread even after an illegal-state or illegal-argument exception?
   
   Anyway, I think your concern is still valid: at least for testing code, there is a risk. Since `store.init()` is implemented by the (possibly customized) state store implementation, we cannot really enforce closing there. And today, for both the global and the local state manager, we throw the exception from initializing the state stores all the way up to `thread.run` and to the user's exception handler. We eventually call `thread.shutdown`, which closes all tasks, but in the case you raised the state store would not be in the `stateManager.stores` set yet and hence would be leaked.
   
   What we can do, in both the local and the global state manager, is move the `globalStores.put` / `stores.put` call to the beginning, before making any checks, so that when we throw and eventually call `thread.shutdown`, the stores are already in the set and get closed.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##########
@@ -62,21 +62,22 @@
 public class GlobalStateManagerImpl implements GlobalStateManager {
     private final static long NO_DEADLINE = -1L;
 
-    private final Logger log;
     private final Time time;
-    private final Consumer<byte[], byte[]> globalConsumer;
+    private final Logger log;
     private final File baseDir;
-    private final Set<String> globalStoreNames = new HashSet<>();

Review comment:
       Just re-ordering member fields here, no adding/removing.







[GitHub] [kafka] guozhangwang commented on pull request #10646: KAFKA-8897 Follow-up: Consolidate the global state stores

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #10646:
URL: https://github.com/apache/kafka/pull/10646#issuecomment-854262242


   @cadonna 
   
   1. Filed a ticket for KStreams to enforce shutdown on illegal-X exceptions as future work.
   2. Let the global state manager close the stores before throwing an illegal-X exception as well.
   3. The unit tests, though, would not cover case 2), as I mentioned in the PR.





[GitHub] [kafka] cadonna commented on a change in pull request #10646: KAFKA-8897 Follow-up: Consolidate the global state stores

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #10646:
URL: https://github.com/apache/kafka/pull/10646#discussion_r634110146



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##########
@@ -174,13 +176,15 @@ public void registerStore(final StateStore store, final StateRestoreCallback sta
             throw new IllegalArgumentException(String.format("Trying to register store %s that is not a known global store", store.name()));
         }
 
+        // register the store first, so that if later an exception is thrown then eventually while we call `close`

Review comment:
       +1 on stopping the app after a deterministic illegal * exception. I am not sure whether all illegal * exceptions we throw are deterministic, though. I guess most of them are. For now, we could just shut down the app for all illegal * exceptions and then consider using a different exception if we discover that an illegal * exception is transient.







[GitHub] [kafka] cadonna commented on a change in pull request #10646: KAFKA-8897 Follow-up: Consolidate the global state stores

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #10646:
URL: https://github.com/apache/kafka/pull/10646#discussion_r632371505



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##########
@@ -128,8 +129,7 @@ public void setGlobalProcessorContext(final InternalProcessorContext globalProce
         }
 
         final Set<String> changelogTopics = new HashSet<>();
-        for (final StateStore stateStore : globalStateStores) {
-            globalStoreNames.add(stateStore.name());
+        for (final StateStore stateStore : topology.globalStateStores()) {
             final String sourceTopic = storeToChangelogTopic.get(stateStore.name());
             changelogTopics.add(sourceTopic);
             stateStore.init((StateStoreContext) globalProcessorContext, stateStore);

Review comment:
       On second thought, it might also be relevant for production code, since we can now restart the stream thread after a fatal error. This is not yet possible for a global stream thread, but it might be in the future.







[GitHub] [kafka] cadonna commented on a change in pull request #10646: KAFKA-8897 Follow-up: Consolidate the global state stores

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #10646:
URL: https://github.com/apache/kafka/pull/10646#discussion_r633518578



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##########
@@ -128,8 +129,7 @@ public void setGlobalProcessorContext(final InternalProcessorContext globalProce
         }
 
         final Set<String> changelogTopics = new HashSet<>();
-        for (final StateStore stateStore : globalStateStores) {
-            globalStoreNames.add(stateStore.name());
+        for (final StateStore stateStore : topology.globalStateStores()) {
             final String sourceTopic = storeToChangelogTopic.get(stateStore.name());
             changelogTopics.add(sourceTopic);
             stateStore.init((StateStoreContext) globalProcessorContext, stateStore);

Review comment:
       > Hmm, for production, do we ever restart a thread even for illegal-state or illegal-argument?
   
   It is possible if the user decides to restart a stream thread in their exception handler.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
##########
@@ -328,7 +328,8 @@ public void registerStore(final StateStore store, final StateRestoreCallback sta
                 converterForStore(store)) :
             new StateStoreMetadata(store);
 
-
+        // register the store first, so that if later an exception is thrown then eventually while we call `close`
+        // on the state manager this state store would be closed as well
         stores.put(storeName, storeMetadata);

Review comment:
       See my comment above.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##########
@@ -128,8 +129,7 @@ public void setGlobalProcessorContext(final InternalProcessorContext globalProce
         }
 
         final Set<String> changelogTopics = new HashSet<>();
-        for (final StateStore stateStore : globalStateStores) {
-            globalStoreNames.add(stateStore.name());
+        for (final StateStore stateStore : topology.globalStateStores()) {

Review comment:
       If I put 
   ```
           assertThat(store1.isOpen(), is(false));
           assertThat(store2.isOpen(), is(false));
           assertThat(store3.isOpen(), is(false));
           assertThat(store4.isOpen(), is(false));
   ```
   on line 202 in `shouldThrowStreamsExceptionForOldTopicPartitions()` the test fails. Hence, we leak a state store.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##########
@@ -174,13 +176,15 @@ public void registerStore(final StateStore store, final StateRestoreCallback sta
             throw new IllegalArgumentException(String.format("Trying to register store %s that is not a known global store", store.name()));
         }
 
+        // register the store first, so that if later an exception is thrown then eventually while we call `close`

Review comment:
       I agree that we would not be able to book-keep both, but the state store in `store` that we just opened is still open at line 172. So we need to close the state store in `store` before throwing the exception; otherwise we will leak it. The same applies to line 176.
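
A minimal sketch of the "close before throwing" pattern described in this comment, under stated assumptions: `Store` and `StateManager` are hypothetical stand-ins rather than the Streams classes, and the message of the duplicate-name exception is invented for the example.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch; none of these types are Kafka Streams classes.
final class CloseBeforeThrowSketch {

    // Stand-in for a store that has already been opened by the time it is registered.
    static final class Store {
        private final String name;
        private boolean open = true;

        Store(final String name) {
            this.name = name;
        }

        String name() {
            return name;
        }

        boolean isOpen() {
            return open;
        }

        void close() {
            open = false;
        }
    }

    static final class StateManager {
        private final Set<String> knownStoreNames;
        private final Map<String, Store> registered = new HashMap<>();

        StateManager(final Set<String> knownStoreNames) {
            this.knownStoreNames = knownStoreNames;
        }

        void registerStore(final Store store) {
            if (registered.containsKey(store.name())) {
                // The rejected store is not tracked anywhere, so close it here before throwing.
                store.close();
                throw new IllegalArgumentException(
                    String.format("Store %s has already been registered", store.name()));
            }
            if (!knownStoreNames.contains(store.name())) {
                store.close();
                throw new IllegalArgumentException(
                    String.format("Trying to register store %s that is not a known global store", store.name()));
            }
            registered.put(store.name(), store);
        }
    }

    public static void main(final String[] args) {
        final StateManager manager = new StateManager(Collections.singleton("known-store"));
        final Store unknown = new Store("unknown-store");
        try {
            manager.registerStore(unknown);
        } catch (final IllegalArgumentException e) {
            System.out.println(e.getMessage() + " (still open: " + unknown.isOpen() + ")");
        }
    }
}
```

The store that was registered first stays open and tracked; only the rejected instance is closed on the failure path.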







[GitHub] [kafka] cadonna commented on a change in pull request #10646: KAFKA-8897 Follow-up: Consolidate the global state stores

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #10646:
URL: https://github.com/apache/kafka/pull/10646#discussion_r635024188



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##########
@@ -128,8 +129,7 @@ public void setGlobalProcessorContext(final InternalProcessorContext globalProce
         }
 
         final Set<String> changelogTopics = new HashSet<>();
-        for (final StateStore stateStore : globalStateStores) {
-            globalStoreNames.add(stateStore.name());
+        for (final StateStore stateStore : topology.globalStateStores()) {

Review comment:
       Now I see what you mean. However, I am not sure it is a good idea to rely on the code in `GlobalStreamThread` that catches the fatal exception to clean up the state stores (and everything else). If we know that we are throwing a fatal exception, then we should clean up immediately before we throw. IMO that makes `GlobalStateManagerImpl` less error-prone, because it does not need to rely on a different class for its cleanup.
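
One way to read "clean up immediately before we throw" is sketched below. It is an illustration under assumptions, not the merged change: `Store`, `StateManager`, and the `failOnInit` flag are hypothetical and exist only to trigger the failure path.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch; none of these types are Kafka Streams classes.
final class CleanupOnInitFailureSketch {

    static final class Store {
        private final String name;
        private final boolean failOnInit;
        private boolean open;

        Store(final String name, final boolean failOnInit) {
            this.name = name;
            this.failOnInit = failOnInit;
        }

        void init() {
            // The store acquires its resources first and may still fail afterwards.
            open = true;
            if (failOnInit) {
                throw new IllegalStateException("init failed for " + name);
            }
        }

        boolean isOpen() {
            return open;
        }

        void close() {
            open = false;
        }
    }

    static final class StateManager {
        private final List<Store> stores;

        StateManager(final List<Store> stores) {
            this.stores = new ArrayList<>(stores);
        }

        void initialize() {
            try {
                for (final Store store : stores) {
                    store.init();
                }
            } catch (final RuntimeException fatal) {
                // Clean up here instead of relying on the calling thread's shutdown path.
                closeAll(fatal);
                throw fatal;
            }
        }

        private void closeAll(final RuntimeException fatal) {
            for (final Store store : stores) {
                try {
                    store.close();
                } catch (final RuntimeException closeError) {
                    // Keep the original failure as the primary exception.
                    fatal.addSuppressed(closeError);
                }
            }
        }
    }

    public static void main(final String[] args) {
        final Store healthy = new Store("store-1", false);
        final Store broken = new Store("store-2", true);
        final StateManager manager = new StateManager(Arrays.asList(healthy, broken));
        try {
            manager.initialize();
        } catch (final IllegalStateException e) {
            System.out.println(e.getMessage() + ", store-1 open: " + healthy.isOpen());
        }
    }
}
```

The trade-off raised in the thread still applies: this keeps the manager self-contained, while the alternative is to leave cleanup to the thread that catches the exception.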







[GitHub] [kafka] ableegoldman commented on a change in pull request #10646: KAFKA-8897 Follow-up: Consolidate the global state stores

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #10646:
URL: https://github.com/apache/kafka/pull/10646#discussion_r634736745



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##########
@@ -174,13 +176,15 @@ public void registerStore(final StateStore store, final StateRestoreCallback sta
             throw new IllegalArgumentException(String.format("Trying to register store %s that is not a known global store", store.name()));
         }
 
+        // register the store first, so that if later an exception is thrown then eventually while we call `close`

Review comment:
       I definitely think we need to triage and maybe clean up the existing Illegal-type exceptions today. Some may not be deterministic, but we still just drop everything and shut down without any further attempts at cleaning up. In those cases it's probably down to the specific situation whether it's appropriate to continue doing so and disallow recovery from this, or just fix the handling so it does clean up all resources.







[GitHub] [kafka] guozhangwang commented on a change in pull request #10646: KAFKA-8897 Follow-up: Consolidate the global state stores

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #10646:
URL: https://github.com/apache/kafka/pull/10646#discussion_r633858091



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##########
@@ -174,13 +176,15 @@ public void registerStore(final StateStore store, final StateRestoreCallback sta
             throw new IllegalArgumentException(String.format("Trying to register store %s that is not a known global store", store.name()));
         }
 
+        // register the store first, so that if later an exception is thrown then eventually while we call `close`

Review comment:
       Okay, I think I get what we were discussing now. Originally I was thinking that these conditions should never happen: when we add state stores to the topology we already check whether the store name exists, so we should never add two stores with the same name. If it ever happens, we would always treat it as fatal and crash-stop immediately.
   
   At a higher level, I think we should NOT allow users to handle illegal-s/a themselves and hence possibly treat them as non-fatal, but obviously we do not enforce that today.
   
   So I think we have two options here: 1) in the lower levels of the hierarchy, like the state manager here, try to close the stores when hitting an illegal-s/a; 2) at the higher levels of the hierarchy, as in the stream thread, enforce "stop app" on illegal-s/a. I'm leaning a bit towards 2) here but would love to hear other opinions.
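
Option 2) could look roughly like the sketch below. It is only an assumption about the shape of such a policy: the `Response` enum is a hypothetical stand-in for whatever the user's exception handler returns and is not the Streams handler API, and the depth cap on the cause chain is an arbitrary choice made for the example.

```java
// Hypothetical sketch; Response is not a Kafka Streams type.
final class FatalExceptionPolicySketch {

    enum Response {
        REPLACE_THREAD,
        SHUTDOWN_CLIENT
    }

    // Arbitrary cap so a cyclic cause chain cannot loop forever.
    private static final int MAX_CAUSE_DEPTH = 32;

    static boolean isIllegalStateOrArgument(final Throwable error) {
        Throwable current = error;
        for (int depth = 0; current != null && depth < MAX_CAUSE_DEPTH; depth++) {
            if (current instanceof IllegalStateException || current instanceof IllegalArgumentException) {
                return true;
            }
            current = current.getCause();
        }
        return false;
    }

    // Applied by the thread after the user's handler has chosen a response.
    static Response enforce(final Throwable error, final Response requestedByUser) {
        // Illegal-state/argument errors are treated as fatal regardless of what the user asked for.
        return isIllegalStateOrArgument(error) ? Response.SHUTDOWN_CLIENT : requestedByUser;
    }

    public static void main(final String[] args) {
        final Throwable wrapped = new RuntimeException("task failed", new IllegalStateException("bad state"));
        System.out.println(enforce(wrapped, Response.REPLACE_THREAD));
        System.out.println(enforce(new RuntimeException("transient"), Response.REPLACE_THREAD));
    }
}
```

Walking the cause chain matters if the illegal-s/a exception is wrapped in another exception before it reaches the handler.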







[GitHub] [kafka] guozhangwang commented on a change in pull request #10646: KAFKA-8897 Follow-up: Consolidate the global state stores

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #10646:
URL: https://github.com/apache/kafka/pull/10646#discussion_r633852084



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##########
@@ -128,8 +129,7 @@ public void setGlobalProcessorContext(final InternalProcessorContext globalProce
         }
 
         final Set<String> changelogTopics = new HashSet<>();
-        for (final StateStore stateStore : globalStateStores) {
-            globalStoreNames.add(stateStore.name());
+        for (final StateStore stateStore : topology.globalStateStores()) {

Review comment:
       Yes, but the reason is that in the unit test we do not really follow the call trace `stateMgr.initialize() -> store.init() -> context.registerStore() -> stateMgr.registerStore()`. That's because the `context` is a mock that does not use the `stateMgr` at all, and hence the `stores` set is always empty.
   
   If we do want to test this call trace, then we need to make the mock context use the actual stateMgr.
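
The difference between the two test setups can be sketched like this. All names are hypothetical and assumed for the example (`Context`, `StubContext`, `DelegatingContext`); the real test uses the project's own mock context and `StateManagerStub`.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch; none of these types are Kafka Streams classes.
final class ContextWiringSketch {

    interface Context {
        void register(Store store);
    }

    static final class Store {
        private final String name;
        private boolean open;

        Store(final String name) {
            this.name = name;
        }

        String name() {
            return name;
        }

        boolean isOpen() {
            return open;
        }

        // Mirrors the trace store.init() -> context.registerStore().
        void init(final Context context) {
            open = true;
            context.register(this);
        }

        void close() {
            open = false;
        }
    }

    static final class StateManager {
        private final Map<String, Store> stores = new LinkedHashMap<>();

        void registerStore(final Store store) {
            stores.put(store.name(), store);
        }

        int registeredCount() {
            return stores.size();
        }

        void close() {
            for (final Store store : stores.values()) {
                store.close();
            }
            stores.clear();
        }
    }

    // Swallows the registration, so the manager never learns about the store.
    static final class StubContext implements Context {
        @Override
        public void register(final Store store) {
        }
    }

    // Forwards the registration to the manager under test.
    static final class DelegatingContext implements Context {
        private final StateManager manager;

        DelegatingContext(final StateManager manager) {
            this.manager = manager;
        }

        @Override
        public void register(final Store store) {
            manager.registerStore(store);
        }
    }

    public static void main(final String[] args) {
        final StateManager manager = new StateManager();
        final Store store = new Store("global-store");
        store.init(new StubContext());
        manager.close();
        System.out.println("with stub context, store open after close(): " + store.isOpen());
    }
}
```

With the stub, an assertion that the store is closed after `manager.close()` cannot pass, because the manager's store set stays empty; with the delegating context the full trace is exercised.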







[GitHub] [kafka] guozhangwang commented on pull request #10646: KAFKA-8897 Follow-up: Consolidate the global state stores

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #10646:
URL: https://github.com/apache/kafka/pull/10646#issuecomment-834080812


   ping @cadonna @ableegoldman for reviews.





[GitHub] [kafka] cadonna commented on a change in pull request #10646: KAFKA-8897 Follow-up: Consolidate the global state stores

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #10646:
URL: https://github.com/apache/kafka/pull/10646#discussion_r629968874



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##########
@@ -128,8 +129,7 @@ public void setGlobalProcessorContext(final InternalProcessorContext globalProce
         }
 
         final Set<String> changelogTopics = new HashSet<>();
-        for (final StateStore stateStore : globalStateStores) {
-            globalStoreNames.add(stateStore.name());
+        for (final StateStore stateStore : topology.globalStateStores()) {

Review comment:
       Not really related to this line. Could you verify that the state store is closed in the unit test that tests line 148? The name of the test is `shouldThrowStreamsExceptionForOldTopicPartitions()`.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##########
@@ -128,8 +129,7 @@ public void setGlobalProcessorContext(final InternalProcessorContext globalProce
         }
 
         final Set<String> changelogTopics = new HashSet<>();
-        for (final StateStore stateStore : globalStateStores) {
-            globalStoreNames.add(stateStore.name());
+        for (final StateStore stateStore : topology.globalStateStores()) {
             final String sourceTopic = storeToChangelogTopic.get(stateStore.name());
             changelogTopics.add(sourceTopic);
             stateStore.init((StateStoreContext) globalProcessorContext, stateStore);

Review comment:
       There is an `IllegalStateException` and a couple of `IllegalArgumentException`s on the path from opening the state store within `stateStore.init()` to line 182 in `this.registerStore()`. We do not close the state stores before we throw. I do not think this is relevant for production code, but we could leak state stores in unit tests if the tests do not explicitly close them.
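   
   One possible shape for the cleanup, sketched with hypothetical types (`LeakCheckStore`, `FailingStore` and `GuardedInit` are illustrative names, not code from this PR): if any `init()` throws, close whatever is already open and rethrow the original exception.

```java
import java.util.List;

// Hypothetical minimal store abstraction, not the Kafka Streams StateStore.
interface LeakCheckStore {
    void init();
    void close();
    boolean isOpen();
}

// A fake store that opens in init() and can be told to fail right after opening.
class FailingStore implements LeakCheckStore {
    private final boolean failAfterOpen;
    private boolean open = false;

    FailingStore(boolean failAfterOpen) {
        this.failAfterOpen = failAfterOpen;
    }

    @Override
    public void init() {
        open = true;
        if (failAfterOpen) {
            throw new IllegalStateException("simulated failure after the store was opened");
        }
    }

    @Override
    public void close() {
        open = false;
    }

    @Override
    public boolean isOpen() {
        return open;
    }
}

class GuardedInit {
    // Initializes all stores; on failure, closes every store that is still open
    // and rethrows the original exception.
    static void initAll(List<? extends LeakCheckStore> stores) {
        try {
            for (LeakCheckStore store : stores) {
                store.init();
            }
        } catch (RuntimeException initError) {
            for (LeakCheckStore store : stores) {
                if (store.isOpen()) {
                    try {
                        store.close();
                    } catch (RuntimeException closeError) {
                        // Keep the original failure as the primary exception.
                        initError.addSuppressed(closeError);
                    }
                }
            }
            throw initError;
        }
    }
}
```

   In a unit test, the same effect can be had by closing the stores explicitly in the test's teardown instead.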







[GitHub] [kafka] cadonna commented on a change in pull request #10646: KAFKA-8897 Follow-up: Consolidate the global state stores

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #10646:
URL: https://github.com/apache/kafka/pull/10646#discussion_r633518578



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##########
@@ -128,8 +129,7 @@ public void setGlobalProcessorContext(final InternalProcessorContext globalProce
         }
 
         final Set<String> changelogTopics = new HashSet<>();
-        for (final StateStore stateStore : globalStateStores) {
-            globalStoreNames.add(stateStore.name());
+        for (final StateStore stateStore : topology.globalStateStores()) {
             final String sourceTopic = storeToChangelogTopic.get(stateStore.name());
             changelogTopics.add(sourceTopic);
             stateStore.init((StateStoreContext) globalProcessorContext, stateStore);

Review comment:
       > Hmm, for production, do we ever restart a thread even for illegal-state or illegal-argument?
   
   It is possible if the user decides to restart a stream thread in their exception handler.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
##########
@@ -328,7 +328,8 @@ public void registerStore(final StateStore store, final StateRestoreCallback sta
                 converterForStore(store)) :
             new StateStoreMetadata(store);
 
-
+        // register the store first, so that if later an exception is thrown then eventually while we call `close`
+        // on the state manager this state store would be closed as well
         stores.put(storeName, storeMetadata);

Review comment:
       See my comment above.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##########
@@ -128,8 +129,7 @@ public void setGlobalProcessorContext(final InternalProcessorContext globalProce
         }
 
         final Set<String> changelogTopics = new HashSet<>();
-        for (final StateStore stateStore : globalStateStores) {
-            globalStoreNames.add(stateStore.name());
+        for (final StateStore stateStore : topology.globalStateStores()) {

Review comment:
       If I put 
   ```
           assertThat(store1.isOpen(), is(false));
           assertThat(store2.isOpen(), is(false));
           assertThat(store3.isOpen(), is(false));
           assertThat(store4.isOpen(), is(false));
   ```
   on line 202 in `shouldThrowStreamsExceptionForOldTopicPartitions()` the test fails. Hence, we leak a state store.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##########
@@ -174,13 +176,15 @@ public void registerStore(final StateStore store, final StateRestoreCallback sta
             throw new IllegalArgumentException(String.format("Trying to register store %s that is not a known global store", store.name()));
         }
 
+        // register the store first, so that if later an exception is thrown then eventually while we call `close`

Review comment:
       I agree that we would not be able to book-keep both, but the state store in `store` that we just opened is still open at line 172. So we need to close the state store in `store` before throwing the exception; otherwise we will leak it. The same applies to line 176.
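   
   A rough sketch of what I mean, using hypothetical stand-in types (`TrackedStore` and `RegistrationSketch` are illustrative, and I am assuming the two checks are "already registered" and "not a known global store"): close the just-opened store before each `IllegalArgumentException`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical store that only tracks its open/closed state.
class TrackedStore {
    private final String name;
    private boolean open = false;

    TrackedStore(String name) {
        this.name = name;
    }

    String name() {
        return name;
    }

    void open() {
        open = true;
    }

    void close() {
        open = false;
    }

    boolean isOpen() {
        return open;
    }
}

// Simplified stand-in for the registration logic; not the actual GlobalStateManagerImpl.
class RegistrationSketch {
    private final Set<String> knownStoreNames;
    private final Map<String, TrackedStore> registered = new HashMap<>();

    RegistrationSketch(Set<String> knownStoreNames) {
        this.knownStoreNames = knownStoreNames;
    }

    void registerStore(TrackedStore store) {
        if (registered.containsKey(store.name())) {
            // The new instance cannot be book-kept under the same name, so close it here.
            store.close();
            throw new IllegalArgumentException(
                String.format("Store %s has already been registered", store.name()));
        }
        if (!knownStoreNames.contains(store.name())) {
            // Unknown stores are never tracked, so nobody else would close this one.
            store.close();
            throw new IllegalArgumentException(
                String.format("Trying to register store %s that is not a known global store", store.name()));
        }
        registered.put(store.name(), store);
    }
}
```

   The previously registered store with the same name stays open and tracked; only the rejected instance is closed.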



