Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/12/16 03:23:44 UTC

[GitHub] [kafka] showuon commented on a change in pull request #9733: KAFKA-10017: fix 2 issues in EosBetaUpgradeIntegrationTest

showuon commented on a change in pull request #9733:
URL: https://github.com/apache/kafka/pull/9733#discussion_r543882023



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
##########
@@ -256,8 +261,8 @@ public void shouldUpgradeFromEosAlphaToEosBeta() throws Exception {
             streams2Alpha.cleanUp();
             streams2Alpha.start();
             assignmentListener.waitForNextStableAssignment(MAX_WAIT_TIME_MS);

Review comment:
       @mjsax
   The stack trace is below. The line numbers don't match the current master branch, but the method names show where it fails. The test calls `keysFromInstance` to read all data from the state store. The store provider checks that the stream thread is in `RUNNING` state, but the thread is still rebalancing (`PARTITIONS_ASSIGNED`).
   ```
   Stacktrace
   
   org.apache.kafka.streams.errors.InvalidStateStoreException: Cannot get state store store because the stream thread is PARTITIONS_ASSIGNED, not RUNNING
   	at org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider.stores(StreamThreadStateStoreProvider.java:81)
   	at org.apache.kafka.streams.state.internals.WrappingStoreProvider.stores(WrappingStoreProvider.java:50)
   	at org.apache.kafka.streams.state.internals.CompositeReadOnlyKeyValueStore.all(CompositeReadOnlyKeyValueStore.java:119)
   	at org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.keysFromInstance(EosBetaUpgradeIntegrationTest.java:1112)
   	at org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta(EosBetaUpgradeIntegrationTest.java:494)
   	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
   	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
   ```
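   To show why the query fails mid-rebalance, here's a simplified model of the guard that throws in the trace. Stores are only handed out while the owning thread is `RUNNING`. The `ThreadState` enum, the local `InvalidStateStoreException`, and `stores` are stand-ins written for this sketch, not Kafka's classes, so it runs without Kafka on the classpath. It also explains the odd "store store" in the message: the store in this test is named `store`.

   ```java
   import java.util.Collections;
   import java.util.List;

   public class StoreQueryCheck {
       // Simplified subset of stream-thread states (local model, not Kafka's StreamThread.State).
       enum ThreadState { STARTING, PARTITIONS_ASSIGNED, RUNNING }

       // Local stand-in for org.apache.kafka.streams.errors.InvalidStateStoreException.
       static class InvalidStateStoreException extends RuntimeException {
           InvalidStateStoreException(final String message) {
               super(message);
           }
       }

       // Models the check hit in the stack trace: any state other than RUNNING throws.
       static List<String> stores(final String storeName, final ThreadState state) {
           if (state != ThreadState.RUNNING) {
               throw new InvalidStateStoreException("Cannot get state store " + storeName
                   + " because the stream thread is " + state + ", not RUNNING");
           }
           return Collections.singletonList(storeName);
       }

       public static void main(final String[] args) {
           System.out.println(stores("store", ThreadState.RUNNING)); // [store]
           try {
               stores("store", ThreadState.PARTITIONS_ASSIGNED); // thread is mid-rebalance
           } catch (final InvalidStateStoreException e) {
               System.out.println(e.getMessage());
           }
       }
   }
   ```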
   
   
   And yes, we do make sure the rebalance has completed before checking the running state. But, as I mentioned, two rebalances happen here: one for adding the new member, and one for the leader re-joining the group while it is Stable. We only wait for one of them to complete, so another rebalance can start afterwards. The state transition logs of the two streams instances look like this:
   ```
   stateTransitions1:
   [KeyValue(CREATED, REBALANCING), KeyValue(REBALANCING, RUNNING), KeyValue(RUNNING, REBALANCING), KeyValue(REBALANCING, RUNNING)]  
   
   stateTransitions2: 
   [KeyValue(RUNNING, REBALANCING), KeyValue(REBALANCING, RUNNING), KeyValue(RUNNING, REBALANCING), KeyValue(REBALANCING, RUNNING)]
   ```
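   To make the "two rebalances" point concrete, here's a minimal sketch that replays the two logs above as data. It counts completed rebalances, where each `REBALANCING -> RUNNING` transition marks the end of one. The `State` enum and `Transition` class are local stand-ins for `KafkaStreams.State` and `KeyValue<State, State>`, assumed here only so the sketch runs without Kafka.

   ```java
   import java.util.Arrays;
   import java.util.List;

   public class RebalanceCount {
       // Local model of the KafkaStreams.State values seen in the logs (not Kafka's enum).
       enum State { CREATED, REBALANCING, RUNNING }

       // Local stand-in for KeyValue<State, State>: one recorded state transition.
       static final class Transition {
           final State from;
           final State to;

           Transition(final State from, final State to) {
               this.from = from;
               this.to = to;
           }
       }

       static Transition t(final State from, final State to) {
           return new Transition(from, to);
       }

       // The two logs from the comment, replayed as data.
       static final List<Transition> STATE_TRANSITIONS_1 = Arrays.asList(
           t(State.CREATED, State.REBALANCING), t(State.REBALANCING, State.RUNNING),
           t(State.RUNNING, State.REBALANCING), t(State.REBALANCING, State.RUNNING));
       static final List<Transition> STATE_TRANSITIONS_2 = Arrays.asList(
           t(State.RUNNING, State.REBALANCING), t(State.REBALANCING, State.RUNNING),
           t(State.RUNNING, State.REBALANCING), t(State.REBALANCING, State.RUNNING));

       // Each REBALANCING -> RUNNING transition marks one completed rebalance.
       static int completedRebalances(final List<Transition> transitions) {
           int count = 0;
           for (final Transition tr : transitions) {
               if (tr.from == State.REBALANCING && tr.to == State.RUNNING) {
                   count++;
               }
           }
           return count;
       }

       public static void main(final String[] args) {
           System.out.println(completedRebalances(STATE_TRANSITIONS_1)); // 2
           System.out.println(completedRebalances(STATE_TRANSITIONS_2)); // 2
           // Waiting for a single rebalance returns here, with one more still to come:
           System.out.println(completedRebalances(STATE_TRANSITIONS_1.subList(0, 2))); // 1
       }
   }
   ```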
   As you can see, the test can move on to the next step right after the second transition (`KeyValue(REBALANCING, RUNNING)`), even though another rebalance is about to start. That's why I'll explicitly wait for the transition pair `[KeyValue(RUNNING, REBALANCING), KeyValue(REBALANCING, RUNNING)]`.
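   Here's a minimal sketch of that wait condition. It looks for the two transitions as consecutive entries using `java.util.Collections.indexOfSubList`, which compares elements with `equals` (as `KeyValue` does). `containsRebalanceCycle`, `NEW_INSTANCE_LOG`, and the local `State`/`Transition` types are hypothetical names for illustration, not the PR's actual code. For the newly started instance (stateTransitions1), the first rebalance begins at `CREATED`, so the pair can only match its second rebalance.

   ```java
   import java.util.Arrays;
   import java.util.Collections;
   import java.util.List;
   import java.util.Objects;

   public class TransitionPairWait {
       // Local model of KafkaStreams.State (not Kafka's enum).
       enum State { CREATED, REBALANCING, RUNNING }

       // Local stand-in for KeyValue<State, State>, with value equality.
       static final class Transition {
           final State from;
           final State to;

           Transition(final State from, final State to) {
               this.from = from;
               this.to = to;
           }

           @Override
           public boolean equals(final Object o) {
               if (this == o) {
                   return true;
               }
               if (!(o instanceof Transition)) {
                   return false;
               }
               final Transition other = (Transition) o;
               return from == other.from && to == other.to;
           }

           @Override
           public int hashCode() {
               return Objects.hash(from, to);
           }
       }

       // One full rebalance cycle of an already-running instance.
       static final List<Transition> REBALANCE_CYCLE = Arrays.asList(
           new Transition(State.RUNNING, State.REBALANCING),
           new Transition(State.REBALANCING, State.RUNNING));

       // Log of the newly started instance (stateTransitions1 in the comment).
       static final List<Transition> NEW_INSTANCE_LOG = Arrays.asList(
           new Transition(State.CREATED, State.REBALANCING),
           new Transition(State.REBALANCING, State.RUNNING),
           new Transition(State.RUNNING, State.REBALANCING),
           new Transition(State.REBALANCING, State.RUNNING));

       // True once the recorded transitions contain the cycle as consecutive entries.
       static boolean containsRebalanceCycle(final List<Transition> transitions) {
           return Collections.indexOfSubList(transitions, REBALANCE_CYCLE) >= 0;
       }

       public static void main(final String[] args) {
           // After the first rebalance only: not yet safe to query the stores.
           System.out.println(containsRebalanceCycle(NEW_INSTANCE_LOG.subList(0, 2))); // false
           // After the second rebalance completes: the cycle is present.
           System.out.println(containsRebalanceCycle(NEW_INSTANCE_LOG)); // true
       }
   }
   ```

   In the test, a predicate like this would be polled until it holds or times out, for example with Kafka's test helper `TestUtils.waitForCondition`.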




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org