Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/05/17 18:20:37 UTC

[GitHub] [kafka] abbccdda opened a new pull request #8681: KAFKA-10010: Should close standby task during HandleLostAll

abbccdda opened a new pull request #8681:
URL: https://github.com/apache/kafka/pull/8681


   A standby task is also at risk of getting into an illegal state when it is not closed during `handleLostAll`:
   
   1. The standby task was initializing in the `CREATED` state, and a task corrupted exception was thrown from `registerStateStores`.
   
   2. The task corrupted exception was caught, and a commit of the non-affected tasks was attempted.
   
   3. The task commit failed due to a task migrated exception.
   
   4. `handleLostAll` didn't close the standby task, leaving it in the `CREATED` state.
   
   5. The next rebalance completed, and the same task was assigned back as a standby task.
   
   6. An illegal argument exception was thrown because the state store was already registered.
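
The sequence above can be reduced to a toy model. This is only a sketch under stated assumptions: `LostStandbySketch`, `ToyStateManager`, and `secondRegistrationFails` are hypothetical names, not Kafka classes, and the model keeps nothing but the "already registered" check. It shows that a second registration is rejected when the manager was not closed in between, and accepted when it was.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of the failure sequence; none of these types are Kafka classes.
class LostStandbySketch {

    // Hypothetical stand-in for the per-task state manager.
    static final class ToyStateManager {
        private final Map<String, Object> stores = new HashMap<>();

        void registerStore(final String storeName, final Object store) {
            if (stores.containsKey(storeName)) {
                throw new IllegalArgumentException("Store " + storeName + " has already been registered.");
            }
            stores.put(storeName, store);
        }

        // Assumption: closing the task forgets every registered store.
        void close() {
            stores.clear();
        }
    }

    // Registers a store, optionally closes the manager, then registers the same
    // store name again. Returns true if the second registration was rejected.
    static boolean secondRegistrationFails(final boolean closeInBetween) {
        final ToyStateManager manager = new ToyStateManager();
        // Step 1: a store is registered before initialization is interrupted.
        manager.registerStore("store-a", new Object());
        if (closeInBetween) {
            // What closing the standby task would do in this model (skipped in step 4).
            manager.close();
        }
        try {
            // Steps 5 and 6: the same task is assigned again and re-registers its store.
            manager.registerStore("store-a", new Object());
            return false;
        } catch (final IllegalArgumentException e) {
            return true;
        }
    }

    public static void main(final String[] args) {
        System.out.println("without close: rejected = " + secondRegistrationFails(false));
        System.out.println("with close:    rejected = " + secondRegistrationFails(true));
    }
}
```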
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on a change in pull request #8681: KAFKA-10010: Should make state store registration idempotent

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8681:
URL: https://github.com/apache/kafka/pull/8681#discussion_r426958947



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
##########
@@ -266,7 +266,8 @@ public void registerStore(final StateStore store, final StateRestoreCallback sta
         }
 
         if (stores.containsKey(storeName)) {
-            throw new IllegalArgumentException(format("%sStore %s has already been registered.", logPrefix, storeName));
+            log.warn("State Store {} has already been registered, which could be due to a half-way registration" +

Review comment:
       I think we might want to skip the re-registration higher up the call stack. In `StateManagerUtil#registerStateStores` we call `store.init` on each store which ultimately results in this `registerStore` being called







[GitHub] [kafka] guozhangwang commented on a change in pull request #8681: KAFKA-10010: Should make state store registration idempotent

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #8681:
URL: https://github.com/apache/kafka/pull/8681#discussion_r426975369



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
##########
@@ -266,7 +266,8 @@ public void registerStore(final StateStore store, final StateRestoreCallback sta
         }
 
         if (stores.containsKey(storeName)) {
-            throw new IllegalArgumentException(format("%sStore %s has already been registered.", logPrefix, storeName));
+            log.warn("State Store {} has already been registered, which could be due to a half-way registration" +

Review comment:
       +1, we can rely on `storeManager#getStore` inside `StateManagerUtil` to check if the store is already registered.
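
A minimal sketch of the idea discussed in these two comments, namely skipping stores that are already registered before calling `init` on them. Everything here is a simplified assumption: `SkipRegisteredSketch`, `ToyStateManager`, and `ToyStore` are hypothetical stand-ins and do not reproduce the real `StateManagerUtil` or `ProcessorStateManager` signatures. The exception in `registerStore` is kept, and the caller makes sure it is not reached.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model; the names below are illustrative, not Kafka APIs.
class SkipRegisteredSketch {

    static final class ToyStateManager {
        private final Map<String, ToyStore> stores = new HashMap<>();

        // Returns the registered store, or null if none is registered under that name.
        ToyStore getStore(final String name) {
            return stores.get(name);
        }

        void registerStore(final ToyStore store) {
            if (stores.containsKey(store.name)) {
                throw new IllegalArgumentException("Store " + store.name + " has already been registered.");
            }
            stores.put(store.name, store);
        }
    }

    static final class ToyStore {
        final String name;
        int initCalls = 0; // counts init invocations so the demo can show it ran once

        ToyStore(final String name) {
            this.name = name;
        }

        // As in the review comment: init ultimately results in registerStore being called.
        void init(final ToyStateManager manager) {
            initCalls++;
            manager.registerStore(this);
        }
    }

    // Caller-side check: skip stores the manager already knows about.
    static void registerStateStores(final ToyStateManager manager, final List<ToyStore> stores) {
        for (final ToyStore store : stores) {
            if (manager.getStore(store.name) != null) {
                continue; // registered in a previous, interrupted round
            }
            store.init(manager);
        }
    }

    public static void main(final String[] args) {
        final ToyStateManager manager = new ToyStateManager();
        final ToyStore a = new ToyStore("a");
        final ToyStore b = new ToyStore("b");
        registerStateStores(manager, Arrays.asList(a));    // interrupted round: only "a" registered
        registerStateStores(manager, Arrays.asList(a, b)); // retry: "a" is skipped, "b" is initialized
        System.out.println("a.initCalls = " + a.initCalls + ", b.initCalls = " + b.initCalls);
    }
}
```

Because the check happens before `init`, a store's `init` method runs at most once per registration, so the sketch does not rely on `init` being idempotent.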

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
##########
@@ -266,7 +266,8 @@ public void registerStore(final StateStore store, final StateRestoreCallback sta
         }
 
         if (stores.containsKey(storeName)) {
-            throw new IllegalArgumentException(format("%sStore %s has already been registered.", logPrefix, storeName));
+            log.warn("State Store {} has already been registered, which could be due to a half-way registration" +
+                "in the previous round", storeName);

Review comment:
       nit: we could make the warn log entry clearer that we did not override the registered store, e.g. "Skipped registering state store {} since it has already existed in the state manager, ..."







[GitHub] [kafka] guozhangwang merged pull request #8681: KAFKA-10010: Should make state store registration idempotent

Posted by GitBox <gi...@apache.org>.
guozhangwang merged pull request #8681:
URL: https://github.com/apache/kafka/pull/8681


   





[GitHub] [kafka] ableegoldman commented on a change in pull request #8681: KAFKA-10010: Should make state store registration idempotent

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8681:
URL: https://github.com/apache/kafka/pull/8681#discussion_r427006280



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
##########
@@ -266,7 +266,8 @@ public void registerStore(final StateStore store, final StateRestoreCallback sta
         }
 
         if (stores.containsKey(storeName)) {
-            throw new IllegalArgumentException(format("%sStore %s has already been registered.", logPrefix, storeName));
+            log.warn("State Store {} has already been registered, which could be due to a half-way registration" +

Review comment:
       Re: your concern, I don't think we can assume that a user's state store's `init` method is idempotent. AFAIK nothing should change that's relevant to the state store registration, but if something does (eg TaskCorrupted) we'd have to wipe out everything and start it all again anyways







[GitHub] [kafka] ableegoldman commented on a change in pull request #8681: KAFKA-10010: Should make state store registration idempotent

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8681:
URL: https://github.com/apache/kafka/pull/8681#discussion_r427003560



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
##########
@@ -266,7 +266,8 @@ public void registerStore(final StateStore store, final StateRestoreCallback sta
         }
 
         if (stores.containsKey(storeName)) {
-            throw new IllegalArgumentException(format("%sStore %s has already been registered.", logPrefix, storeName));
+            log.warn("State Store {} has already been registered, which could be due to a half-way registration" +

Review comment:
       Nah, I think we should actually keep this (although `IllegalStateException` seems to make more sense, can we change it?) -- we should just make sure we don't reach it 
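
One possible reading of this suggestion, sketched with a hypothetical `GuardSketch` class rather than the real `ProcessorStateManager`: the guard stays in place but throws `IllegalStateException`, on the assumption that the argument itself is valid and it is the manager's state that makes the call illegal.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model; GuardSketch is an illustrative name, not a Kafka class.
class GuardSketch {
    private final Map<String, Object> stores = new HashMap<>();

    void registerStore(final String storeName, final Object store) {
        if (stores.containsKey(storeName)) {
            // The store name is a valid argument; the manager is simply in the wrong state.
            throw new IllegalStateException(String.format("Store %s has already been registered.", storeName));
        }
        stores.put(storeName, store);
    }

    public static void main(final String[] args) {
        final GuardSketch manager = new GuardSketch();
        manager.registerStore("store-a", new Object());
        try {
            manager.registerStore("store-a", new Object());
        } catch (final IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```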







[GitHub] [kafka] abbccdda commented on a change in pull request #8681: KAFKA-10010: Should make state store registration idempotent

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #8681:
URL: https://github.com/apache/kafka/pull/8681#discussion_r426998545



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
##########
@@ -266,7 +266,8 @@ public void registerStore(final StateStore store, final StateRestoreCallback sta
         }
 
         if (stores.containsKey(storeName)) {
-            throw new IllegalArgumentException(format("%sStore %s has already been registered.", logPrefix, storeName));
+            log.warn("State Store {} has already been registered, which could be due to a half-way registration" +

Review comment:
       So are we still required to remove the illegal argument exception here? My concern is that the latest state store initialization might differ from the previous iteration, so it's safer to just go through the entire procedure once more.







[GitHub] [kafka] guozhangwang commented on pull request #8681: KAFKA-10010: Should close standby task during HandleLostAll

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #8681:
URL: https://github.com/apache/kafka/pull/8681#issuecomment-629840150


   I left a comment on the JIRA ticket.





[GitHub] [kafka] abbccdda commented on pull request #8681: KAFKA-10010: Should close standby task during HandleLostAll

Posted by GitBox <gi...@apache.org>.
abbccdda commented on pull request #8681:
URL: https://github.com/apache/kafka/pull/8681#issuecomment-629937609


   All test failures are due to a flaky test:
   org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta
   

