Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/12/04 09:10:18 UTC

[GitHub] [kafka] mjsax opened a new pull request #9688: KAFKA-10017: fix flaky EOS-beta upgrade test

mjsax opened a new pull request #9688:
URL: https://github.com/apache/kafka/pull/9688


   Call for review @abbccdda @ableegoldman @guozhangwang 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on a change in pull request #9688: KAFKA-10017: fix flaky EOS-beta upgrade test

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #9688:
URL: https://github.com/apache/kafka/pull/9688#discussion_r535949960



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
##########
@@ -440,15 +419,18 @@ public void shouldUpgradeFromEosAlphaToEosBeta() throws Exception {
             waitForRunning(stateTransitions1);
             waitForRunning(stateTransitions2);
 
-            final Set<Long> committedKeys = mkSet(0L, 1L, 2L, 3L);
+            final Set<Long> newlyCommittedKeys;

Review comment:
       This is the first fix, i.e., how we compute those keys.







[GitHub] [kafka] mjsax commented on a change in pull request #9688: KAFKA-10017: fix flaky EOS-beta upgrade test

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #9688:
URL: https://github.com/apache/kafka/pull/9688#discussion_r535950501



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
##########
@@ -457,33 +439,46 @@ public void shouldUpgradeFromEosAlphaToEosBeta() throws Exception {
             // phase 6: (complete second batch of data; crash: let second client fail on commit)
             // expected end state per output partition (C == COMMIT; A == ABORT; ---> indicate the changes):
             //
-            // stop case:
-            //   p-0: 10 rec + C + 5 rec + C ---> 5 rec + C
-            //   p-1: 10 rec + C + 5 rec + C ---> 5 rec + C
-            //   p-2: 10 rec + C + 5 rec + C ---> 5 rec + C
-            //   p-3: 10 rec + C + 5 rec + C ---> 5 rec + C
-            // crash case:
-            //   p-0: 10 rec + C + 4 rec + A + 5 rec + C ---> 5 rec + C
-            //   p-1: 10 rec + C + 5 rec + A + 5 rec + C ---> 5 rec + C
-            //   p-2: 10 rec + C + 5 rec + C ---> 5 rec + A + 5 rec + C
-            //   p-3: 10 rec + C + 5 rec + C ---> 5 rec + A + 5 rec + C
+            // stop case: (both client commit regularly)
+            //            (depending on the task movement in phase 5, we may or may not get newly committed data;
+            //             we show the case for which p-2 and p-3 are newly committed below)
+            //   p-0: 10 rec + C   +   5 rec + C ---> 5 rec + C
+            //   p-1: 10 rec + C   +   5 rec + C ---> 5 rec + C
+            //   p-2: 10 rec + C   +   5 rec     ---> 5 rec + C
+            //   p-3: 10 rec + C   +   5 rec     ---> 5 rec + C
+            // crash case: (second/alpha client fails and both TX are aborted)
+            //             (first/beta client reprocessed the 10 records and commits TX)
+            //   p-0: 10 rec + C   +   4 rec + A + 5 rec + C ---> 5 rec + C
+            //   p-1: 10 rec + C   +   5 rec + A + 5 rec + C ---> 5 rec + C
+            //   p-2: 10 rec + C   +   5 rec + C             ---> 5 rec + A + 5 rec + C
+            //   p-3: 10 rec + C   +   5 rec + C             ---> 5 rec + A + 5 rec + C
             commitCounterClient1.set(0);
 
             if (!injectError) {
-                final List<KeyValue<Long, Long>> committedInputDataDuringUpgrade =
-                    prepareData(15L, 20L, 0L, 1L, 2L, 3L);
-                writeInputData(committedInputDataDuringUpgrade);
+                final List<KeyValue<Long, Long>> finishSecondBatch = prepareData(15L, 20L, 0L, 1L, 2L, 3L);
+                writeInputData(finishSecondBatch);
 
+                final List<KeyValue<Long, Long>> committedInputDataDuringUpgrade = uncommittedInputDataBeforeFirstUpgrade

Review comment:
       This is the second fix: depending on the task movement, we have a different set of committed records.
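   The hunk is truncated here, so here is a minimal, self-contained sketch of the idea (hypothetical plain `Long` keys standing in for the test's `KeyValue` lists; not the PR's exact code):

   ```java
   import java.util.Arrays;
   import java.util.HashSet;
   import java.util.List;
   import java.util.Set;
   import java.util.stream.Collectors;

   public class CommittedKeysSketch {
       public static void main(final String[] args) {
           // hypothetical stand-ins for the test's data structures
           final List<Long> uncommittedKeysBeforeUpgrade = Arrays.asList(0L, 1L, 2L, 3L);
           final Set<Long> keysFirstClientAlpha = new HashSet<>(Arrays.asList(0L, 1L));
           final Set<Long> newlyCommittedKeys = new HashSet<>(Arrays.asList(2L));

           // same filtering idea as the fix: only keys that are not already covered by
           // the first client or by the observed task movement are expected to be
           // committed during the upgrade
           final List<Long> committedDuringUpgrade = uncommittedKeysBeforeUpgrade
               .stream()
               .filter(key -> !keysFirstClientAlpha.contains(key))
               .filter(key -> !newlyCommittedKeys.contains(key))
               .collect(Collectors.toList());

           System.out.println(committedDuringUpgrade); // prints [3]
       }
   }
   ```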







[GitHub] [kafka] mjsax commented on a change in pull request #9688: KAFKA-10017: fix flaky EOS-beta upgrade test

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #9688:
URL: https://github.com/apache/kafka/pull/9688#discussion_r535951079



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
##########
@@ -457,33 +439,46 @@ public void shouldUpgradeFromEosAlphaToEosBeta() throws Exception {
             // phase 6: (complete second batch of data; crash: let second client fail on commit)
             // expected end state per output partition (C == COMMIT; A == ABORT; ---> indicate the changes):
             //
-            // stop case:
-            //   p-0: 10 rec + C + 5 rec + C ---> 5 rec + C
-            //   p-1: 10 rec + C + 5 rec + C ---> 5 rec + C
-            //   p-2: 10 rec + C + 5 rec + C ---> 5 rec + C
-            //   p-3: 10 rec + C + 5 rec + C ---> 5 rec + C
-            // crash case:
-            //   p-0: 10 rec + C + 4 rec + A + 5 rec + C ---> 5 rec + C
-            //   p-1: 10 rec + C + 5 rec + A + 5 rec + C ---> 5 rec + C
-            //   p-2: 10 rec + C + 5 rec + C ---> 5 rec + A + 5 rec + C
-            //   p-3: 10 rec + C + 5 rec + C ---> 5 rec + A + 5 rec + C
+            // stop case: (both client commit regularly)
+            //            (depending on the task movement in phase 5, we may or may not get newly committed data;
+            //             we show the case for which p-2 and p-3 are newly committed below)
+            //   p-0: 10 rec + C   +   5 rec + C ---> 5 rec + C
+            //   p-1: 10 rec + C   +   5 rec + C ---> 5 rec + C
+            //   p-2: 10 rec + C   +   5 rec     ---> 5 rec + C
+            //   p-3: 10 rec + C   +   5 rec     ---> 5 rec + C
+            // crash case: (second/alpha client fails and both TX are aborted)
+            //             (first/beta client reprocessed the 10 records and commits TX)
+            //   p-0: 10 rec + C   +   4 rec + A + 5 rec + C ---> 5 rec + C
+            //   p-1: 10 rec + C   +   5 rec + A + 5 rec + C ---> 5 rec + C
+            //   p-2: 10 rec + C   +   5 rec + C             ---> 5 rec + A + 5 rec + C
+            //   p-3: 10 rec + C   +   5 rec + C             ---> 5 rec + A + 5 rec + C
             commitCounterClient1.set(0);
 
             if (!injectError) {
-                final List<KeyValue<Long, Long>> committedInputDataDuringUpgrade =
-                    prepareData(15L, 20L, 0L, 1L, 2L, 3L);
-                writeInputData(committedInputDataDuringUpgrade);
+                final List<KeyValue<Long, Long>> finishSecondBatch = prepareData(15L, 20L, 0L, 1L, 2L, 3L);
+                writeInputData(finishSecondBatch);
 
+                final List<KeyValue<Long, Long>> committedInputDataDuringUpgrade = uncommittedInputDataBeforeFirstUpgrade
+                    .stream()
+                    .filter(pair -> !keysFirstClientAlpha.contains(pair.key))
+                    .filter(pair -> !newlyCommittedKeys.contains(pair.key))
+                    .collect(Collectors.toList());
+                committedInputDataDuringUpgrade.addAll(
+                    finishSecondBatch
+                );
+
+                expectedUncommittedResult.addAll(
+                    computeExpectedResult(finishSecondBatch, uncommittedState)

Review comment:
       For this, we needed to preserve the old `uncommittedState` further above.







[GitHub] [kafka] mjsax commented on a change in pull request #9688: KAFKA-10017: fix flaky EOS-beta upgrade test

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #9688:
URL: https://github.com/apache/kafka/pull/9688#discussion_r535951938



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
##########
@@ -792,24 +790,35 @@ public void shouldUpgradeFromEosAlphaToEosBeta() throws Exception {
             // expected end state per output partition (C == COMMIT; A == ABORT; ---> indicate the changes):
             //
             // stop case:
-            //   p-0: 10 rec + C + 5 rec + C + 5 rec + C + 5 rec + C ---> 5 rec + C
-            //   p-1: 10 rec + C + 5 rec + C + 5 rec + C + 5 rec + C ---> 5 rec + C
-            //   p-2: 10 rec + C + 5 rec + C + 5 rec + C + 5 rec + C ---> 5 rec + C
-            //   p-3: 10 rec + C + 5 rec + C + 5 rec + C + 5 rec + C ---> 5 rec + C
+            //   p-0: 10 rec + C   +   5 rec + C + 5 rec + C   +   5 rec + C ---> 5 rec + C
+            //   p-1: 10 rec + C   +   5 rec + C + 5 rec + C   +   5 rec + C ---> 5 rec + C
+            //   p-2: 10 rec + C   +   5 rec + C + 5 rec + C   +   5 rec + C ---> 5 rec + C
+            //   p-3: 10 rec + C   +   5 rec + C + 5 rec + C   +   5 rec + C ---> 5 rec + C
             // crash case:  (we just assumes that we inject the error for p-2; in reality it might be a different partition)
-            //   p-0: 10 rec + C + 4 rec + A + 5 rec + C + 5 rec + C + 10 rec + A + 10 rec + C + 5 rec + C ---> 5 rec + C
-            //   p-1: 10 rec + C + 5 rec + A + 5 rec + C + 5 rec + C + 10 rec + A + 10 rec + C + 5 rec + C ---> 5 rec + C
-            //   p-2: 10 rec + C + 5 rec + C + 5 rec + A + 5 rec + C + 10 rec + C + 4 rec + A + 5 rec + C ---> 5 rec + C
-            //   p-3: 10 rec + C + 5 rec + C + 5 rec + A + 5 rec + C + 10 rec + C + 5 rec + A + 5 rec + C ---> 5 rec + C
+            //   p-0: 10 rec + C   +   4 rec + A + 5 rec + C + 5 rec + C   +   10 rec + A + 10 rec + C   +   5 rec + C             ---> 5 rec + C
+            //   p-1: 10 rec + C   +   5 rec + A + 5 rec + C + 5 rec + C   +   10 rec + A + 10 rec + C   +   5 rec + C             ---> 5 rec + C
+            //   p-2: 10 rec + C   +   5 rec + C + 5 rec + A + 5 rec + C   +   10 rec + C                +   4 rec + A + 5 rec + C ---> 5 rec + C
+            //   p-3: 10 rec + C   +   5 rec + C + 5 rec + A + 5 rec + C   +   10 rec + C                +   5 rec + A + 5 rec + C ---> 5 rec + C
             commitCounterClient1.set(-1);
             commitCounterClient2.set(-1);
 
-            final List<KeyValue<Long, Long>> committedInputDataAfterUpgrade =
+            final List<KeyValue<Long, Long>> finishLastBatch =
                 prepareData(35L, 40L, 0L, 1L, 2L, 3L);
-            writeInputData(committedInputDataAfterUpgrade);
+            writeInputData(finishLastBatch);
+
+            final Set<Long> uncommittedKeys = mkSet(0L, 1L, 2L, 3L);
+            uncommittedKeys.removeAll(keysSecondClientAlphaTwo);
+            uncommittedKeys.removeAll(newlyCommittedKeys);
+            final List<KeyValue<Long, Long>> committedInputDataDuringUpgrade = uncommittedInputDataBeforeSecondUpgrade

Review comment:
       Similar to above: we need to be more flexible (i.e., depend on the actual task movement).







[GitHub] [kafka] mjsax commented on pull request #9688: KAFKA-10017: fix flaky EOS-beta upgrade test

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #9688:
URL: https://github.com/apache/kafka/pull/9688#issuecomment-742911941


   Merged to `trunk` and cherry-picked to `2.7`.





[GitHub] [kafka] mjsax merged pull request #9688: KAFKA-10017: fix flaky EOS-beta upgrade test

Posted by GitBox <gi...@apache.org>.
mjsax merged pull request #9688:
URL: https://github.com/apache/kafka/pull/9688


   





[GitHub] [kafka] mjsax commented on a change in pull request #9688: KAFKA-10017: fix flaky EOS-beta upgrade test

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #9688:
URL: https://github.com/apache/kafka/pull/9688#discussion_r535953082



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
##########
@@ -1074,19 +1077,27 @@ private void addAllKeys(final Set<Long> allKeys, final List<KeyValue<Long, Long>
     }
 
     private Set<Long> keysFromInstance(final KafkaStreams streams) throws Exception {
-        final ReadOnlyKeyValueStore<Long, Long> store = getStore(

Review comment:
       This is another fix (we did see some errors when getting the state stores, too).
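   The replacement code isn't shown in this hunk, so here is only a hedged, self-contained sketch of the retry idea (hypothetical helper, not the PR's actual fix): instead of failing on the first transient `InvalidStateStoreException`, keep retrying until the store is queryable again.

   ```java
   import java.time.Duration;
   import org.apache.kafka.streams.KafkaStreams;
   import org.apache.kafka.streams.StoreQueryParameters;
   import org.apache.kafka.streams.errors.InvalidStateStoreException;
   import org.apache.kafka.streams.state.QueryableStoreTypes;
   import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

   public class StoreRetrySketch {
       // hypothetical helper: retry the store lookup (e.g., right after a rebalance,
       // while the instance is still REBALANCING) until a timeout expires
       public static ReadOnlyKeyValueStore<Long, Long> waitForStore(final KafkaStreams streams,
                                                                    final String storeName,
                                                                    final Duration timeout) throws InterruptedException {
           final long deadline = System.currentTimeMillis() + timeout.toMillis();
           while (true) {
               try {
                   return streams.store(StoreQueryParameters.fromNameAndType(
                       storeName, QueryableStoreTypes.<Long, Long>keyValueStore()));
               } catch (final InvalidStateStoreException storeNotReadyYet) {
                   if (System.currentTimeMillis() > deadline) {
                       throw storeNotReadyYet;
                   }
                   Thread.sleep(100L);
               }
           }
       }
   }
   ```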







[GitHub] [kafka] guozhangwang commented on a change in pull request #9688: KAFKA-10017: fix flaky EOS-beta upgrade test

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #9688:
URL: https://github.com/apache/kafka/pull/9688#discussion_r538062610



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
##########
@@ -540,18 +535,18 @@ public void shouldUpgradeFromEosAlphaToEosBeta() throws Exception {
             }
 
             // 7. only for crash case:
-            //     7a. restart the second client in eos-alpha mode and wait until rebalance stabilizes
+            //     7a. restart the failed second client in eos-alpha mode and wait until rebalance stabilizes
             //     7b. write third batch of input data
             //         * fail the first (i.e., eos-beta) client during commit
             //         * the eos-alpha client should not pickup the pending offsets
             //         * verify uncommitted and committed result
             //     7c. restart the first client in eos-beta mode and wait until rebalance stabilizes
             //
             // crash case:
-            //   p-0: 10 rec + C + 4 rec + A + 5 rec + C + 5 rec + C ---> 10 rec + A + 10 rec + C
-            //   p-1: 10 rec + C + 5 rec + A + 5 rec + C + 5 rec + C ---> 10 rec + A + 10 rec + C
-            //   p-2: 10 rec + C + 5 rec + C + 5 rec + A + 5 rec + C ---> 10 rec + C
-            //   p-3: 10 rec + C + 5 rec + C + 5 rec + A + 5 rec + C ---> 10 rec + C
+            //   p-0: 10 rec + C   +   4 rec + A + 5 rec + C + 5 rec + C ---> 10 rec + A + 10 rec + C

Review comment:
       Are these changes intentional?

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
##########
@@ -457,33 +439,46 @@ public void shouldUpgradeFromEosAlphaToEosBeta() throws Exception {
             // phase 6: (complete second batch of data; crash: let second client fail on commit)
             // expected end state per output partition (C == COMMIT; A == ABORT; ---> indicate the changes):
             //
-            // stop case:
-            //   p-0: 10 rec + C + 5 rec + C ---> 5 rec + C
-            //   p-1: 10 rec + C + 5 rec + C ---> 5 rec + C
-            //   p-2: 10 rec + C + 5 rec + C ---> 5 rec + C
-            //   p-3: 10 rec + C + 5 rec + C ---> 5 rec + C
-            // crash case:
-            //   p-0: 10 rec + C + 4 rec + A + 5 rec + C ---> 5 rec + C
-            //   p-1: 10 rec + C + 5 rec + A + 5 rec + C ---> 5 rec + C
-            //   p-2: 10 rec + C + 5 rec + C ---> 5 rec + A + 5 rec + C
-            //   p-3: 10 rec + C + 5 rec + C ---> 5 rec + A + 5 rec + C
+            // stop case: (both client commit regularly)
+            //            (depending on the task movement in phase 5, we may or may not get newly committed data;
+            //             we show the case for which p-2 and p-3 are newly committed below)
+            //   p-0: 10 rec + C   +   5 rec + C ---> 5 rec + C
+            //   p-1: 10 rec + C   +   5 rec + C ---> 5 rec + C
+            //   p-2: 10 rec + C   +   5 rec     ---> 5 rec + C
+            //   p-3: 10 rec + C   +   5 rec     ---> 5 rec + C
+            // crash case: (second/alpha client fails and both TX are aborted)
+            //             (first/beta client reprocessed the 10 records and commits TX)
+            //   p-0: 10 rec + C   +   4 rec + A + 5 rec + C ---> 5 rec + C
+            //   p-1: 10 rec + C   +   5 rec + A + 5 rec + C ---> 5 rec + C
+            //   p-2: 10 rec + C   +   5 rec + C             ---> 5 rec + A + 5 rec + C
+            //   p-3: 10 rec + C   +   5 rec + C             ---> 5 rec + A + 5 rec + C
             commitCounterClient1.set(0);
 
             if (!injectError) {
-                final List<KeyValue<Long, Long>> committedInputDataDuringUpgrade =
-                    prepareData(15L, 20L, 0L, 1L, 2L, 3L);
-                writeInputData(committedInputDataDuringUpgrade);
+                final List<KeyValue<Long, Long>> finishSecondBatch = prepareData(15L, 20L, 0L, 1L, 2L, 3L);
+                writeInputData(finishSecondBatch);
 
+                final List<KeyValue<Long, Long>> committedInputDataDuringUpgrade = uncommittedInputDataBeforeFirstUpgrade

Review comment:
       Nice catch.
   
   Reminds me though: why might the second rebalance not be deterministic in migrating tasks back? I thought our algorithm should produce deterministic results? cc @ableegoldman

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
##########
@@ -540,18 +535,18 @@ public void shouldUpgradeFromEosAlphaToEosBeta() throws Exception {
             }
 
             // 7. only for crash case:
-            //     7a. restart the second client in eos-alpha mode and wait until rebalance stabilizes
+            //     7a. restart the failed second client in eos-alpha mode and wait until rebalance stabilizes

Review comment:
       nit: second failed client?

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
##########
@@ -792,24 +790,35 @@ public void shouldUpgradeFromEosAlphaToEosBeta() throws Exception {
             // expected end state per output partition (C == COMMIT; A == ABORT; ---> indicate the changes):
             //
             // stop case:
-            //   p-0: 10 rec + C + 5 rec + C + 5 rec + C + 5 rec + C ---> 5 rec + C
-            //   p-1: 10 rec + C + 5 rec + C + 5 rec + C + 5 rec + C ---> 5 rec + C
-            //   p-2: 10 rec + C + 5 rec + C + 5 rec + C + 5 rec + C ---> 5 rec + C
-            //   p-3: 10 rec + C + 5 rec + C + 5 rec + C + 5 rec + C ---> 5 rec + C
+            //   p-0: 10 rec + C   +   5 rec + C + 5 rec + C   +   5 rec + C ---> 5 rec + C
+            //   p-1: 10 rec + C   +   5 rec + C + 5 rec + C   +   5 rec + C ---> 5 rec + C
+            //   p-2: 10 rec + C   +   5 rec + C + 5 rec + C   +   5 rec + C ---> 5 rec + C
+            //   p-3: 10 rec + C   +   5 rec + C + 5 rec + C   +   5 rec + C ---> 5 rec + C
             // crash case:  (we just assumes that we inject the error for p-2; in reality it might be a different partition)
-            //   p-0: 10 rec + C + 4 rec + A + 5 rec + C + 5 rec + C + 10 rec + A + 10 rec + C + 5 rec + C ---> 5 rec + C
-            //   p-1: 10 rec + C + 5 rec + A + 5 rec + C + 5 rec + C + 10 rec + A + 10 rec + C + 5 rec + C ---> 5 rec + C
-            //   p-2: 10 rec + C + 5 rec + C + 5 rec + A + 5 rec + C + 10 rec + C + 4 rec + A + 5 rec + C ---> 5 rec + C
-            //   p-3: 10 rec + C + 5 rec + C + 5 rec + A + 5 rec + C + 10 rec + C + 5 rec + A + 5 rec + C ---> 5 rec + C
+            //   p-0: 10 rec + C   +   4 rec + A + 5 rec + C + 5 rec + C   +   10 rec + A + 10 rec + C   +   5 rec + C             ---> 5 rec + C
+            //   p-1: 10 rec + C   +   5 rec + A + 5 rec + C + 5 rec + C   +   10 rec + A + 10 rec + C   +   5 rec + C             ---> 5 rec + C
+            //   p-2: 10 rec + C   +   5 rec + C + 5 rec + A + 5 rec + C   +   10 rec + C                +   4 rec + A + 5 rec + C ---> 5 rec + C
+            //   p-3: 10 rec + C   +   5 rec + C + 5 rec + A + 5 rec + C   +   10 rec + C                +   5 rec + A + 5 rec + C ---> 5 rec + C
             commitCounterClient1.set(-1);
             commitCounterClient2.set(-1);
 
-            final List<KeyValue<Long, Long>> committedInputDataAfterUpgrade =
+            final List<KeyValue<Long, Long>> finishLastBatch =
                 prepareData(35L, 40L, 0L, 1L, 2L, 3L);
-            writeInputData(committedInputDataAfterUpgrade);
+            writeInputData(finishLastBatch);
+
+            final Set<Long> uncommittedKeys = mkSet(0L, 1L, 2L, 3L);
+            uncommittedKeys.removeAll(keysSecondClientAlphaTwo);
+            uncommittedKeys.removeAll(newlyCommittedKeys);
+            final List<KeyValue<Long, Long>> committedInputDataDuringUpgrade = uncommittedInputDataBeforeSecondUpgrade

Review comment:
       @ableegoldman is it related to the UUID randomness? If yes, please ignore my other question above.







[GitHub] [kafka] mjsax commented on a change in pull request #9688: KAFKA-10017: fix flaky EOS-beta upgrade test

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #9688:
URL: https://github.com/apache/kafka/pull/9688#discussion_r535948426



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
##########
@@ -105,8 +100,8 @@
     public boolean injectError;
 
     private static final int NUM_BROKERS = 3;
-    private static final int MAX_POLL_INTERVAL_MS = 100 * 1000;
-    private static final int MAX_WAIT_TIME_MS = 60 * 1000;
+    private static final int MAX_POLL_INTERVAL_MS = (int) Duration.ofSeconds(100L).toMillis();
+    private static final long MAX_WAIT_TIME_MS = Duration.ofMinutes(1L).toMillis();

Review comment:
       Side cleanup







[GitHub] [kafka] mjsax commented on a change in pull request #9688: KAFKA-10017: fix flaky EOS-beta upgrade test

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #9688:
URL: https://github.com/apache/kafka/pull/9688#discussion_r535954201



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
##########
@@ -1161,7 +1161,7 @@ public static void verifyKeyValueTimestamps(final Properties consumerConfig,
     }
 
     private static boolean continueConsuming(final int messagesConsumed, final int maxMessages) {
-        return maxMessages <= 0 || messagesConsumed < maxMessages;
+        return maxMessages > 0 && messagesConsumed < maxMessages;

Review comment:
       We have cases in which we pass in `0`, and for this case the old code looped forever until the timeout hit and the test failed. It seems this logic was wrong from the beginning, and we should stop fetching if `maxMessages <= 0` instead of looping forever.
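   As a tiny standalone illustration of the behavioral difference for `maxMessages == 0` (a sketch, not the test's actual consume loop):

   ```java
   public class ContinueConsumingDemo {
       // old logic: for maxMessages == 0 this always returns true, so the consume
       // loop only ends once the wait time runs out and the test fails
       static boolean continueConsumingOld(final int messagesConsumed, final int maxMessages) {
           return maxMessages <= 0 || messagesConsumed < maxMessages;
       }

       // fixed logic: for maxMessages == 0 we stop fetching immediately
       static boolean continueConsumingNew(final int messagesConsumed, final int maxMessages) {
           return maxMessages > 0 && messagesConsumed < maxMessages;
       }

       public static void main(final String[] args) {
           System.out.println(continueConsumingOld(0, 0)); // true  -> loops until timeout
           System.out.println(continueConsumingNew(0, 0)); // false -> returns right away
       }
   }
   ```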







[GitHub] [kafka] mjsax commented on a change in pull request #9688: KAFKA-10017: fix flaky EOS-beta upgrade test

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #9688:
URL: https://github.com/apache/kafka/pull/9688#discussion_r535949319



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
##########
@@ -319,24 +293,26 @@ public void shouldUpgradeFromEosAlphaToEosBeta() throws Exception {
             //   p-2: 10 rec + C ---> 5 rec (pending)
             //   p-3: 10 rec + C ---> 5 rec (pending)
             // crash case: (we just assumes that we inject the error for p-0; in reality it might be a different partition)
+            //             (we don't crash right away and write one record less)

Review comment:
       Added some more details/explanations and also renamed a few variables below.







[GitHub] [kafka] ableegoldman commented on a change in pull request #9688: KAFKA-10017: fix flaky EOS-beta upgrade test

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9688:
URL: https://github.com/apache/kafka/pull/9688#discussion_r536321595



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
##########
@@ -916,11 +917,12 @@ public void close() {
         properties.put(StreamsConfig.CLIENT_ID_CONFIG, appDir);
         properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, processingGuarantee);
         properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Long.MAX_VALUE);
-        properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.METADATA_MAX_AGE_CONFIG), "1000");
+        properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.METADATA_MAX_AGE_CONFIG), Duration.ofSeconds(1L).toMillis());
         properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest");
-        properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG), 5 * 1000);
-        properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), 5 * 1000 - 1);
+        properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG), (int) Duration.ofSeconds(5L).toMillis());
+        properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), (int) Duration.ofSeconds(5L).minusMillis(1L).toMillis());
         properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), MAX_POLL_INTERVAL_MS);
+        properties.put(StreamsConfig.producerPrefix(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG), (int) Duration.ofMinutes(5L).toMillis());

Review comment:
       5 minutes seems kind of long; the whole test should take only a few minutes and it has 11 phases. Would 1 minute be more reasonable? Or do we actually need this timeout to cover more than one or two phases?

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
##########
@@ -792,24 +790,35 @@ public void shouldUpgradeFromEosAlphaToEosBeta() throws Exception {
             // expected end state per output partition (C == COMMIT; A == ABORT; ---> indicate the changes):
             //
             // stop case:
-            //   p-0: 10 rec + C + 5 rec + C + 5 rec + C + 5 rec + C ---> 5 rec + C
-            //   p-1: 10 rec + C + 5 rec + C + 5 rec + C + 5 rec + C ---> 5 rec + C
-            //   p-2: 10 rec + C + 5 rec + C + 5 rec + C + 5 rec + C ---> 5 rec + C
-            //   p-3: 10 rec + C + 5 rec + C + 5 rec + C + 5 rec + C ---> 5 rec + C
+            //   p-0: 10 rec + C   +   5 rec + C + 5 rec + C   +   5 rec + C ---> 5 rec + C
+            //   p-1: 10 rec + C   +   5 rec + C + 5 rec + C   +   5 rec + C ---> 5 rec + C
+            //   p-2: 10 rec + C   +   5 rec + C + 5 rec + C   +   5 rec + C ---> 5 rec + C
+            //   p-3: 10 rec + C   +   5 rec + C + 5 rec + C   +   5 rec + C ---> 5 rec + C
             // crash case:  (we just assumes that we inject the error for p-2; in reality it might be a different partition)
-            //   p-0: 10 rec + C + 4 rec + A + 5 rec + C + 5 rec + C + 10 rec + A + 10 rec + C + 5 rec + C ---> 5 rec + C
-            //   p-1: 10 rec + C + 5 rec + A + 5 rec + C + 5 rec + C + 10 rec + A + 10 rec + C + 5 rec + C ---> 5 rec + C
-            //   p-2: 10 rec + C + 5 rec + C + 5 rec + A + 5 rec + C + 10 rec + C + 4 rec + A + 5 rec + C ---> 5 rec + C
-            //   p-3: 10 rec + C + 5 rec + C + 5 rec + A + 5 rec + C + 10 rec + C + 5 rec + A + 5 rec + C ---> 5 rec + C
+            //   p-0: 10 rec + C   +   4 rec + A + 5 rec + C + 5 rec + C   +   10 rec + A + 10 rec + C   +   5 rec + C             ---> 5 rec + C
+            //   p-1: 10 rec + C   +   5 rec + A + 5 rec + C + 5 rec + C   +   10 rec + A + 10 rec + C   +   5 rec + C             ---> 5 rec + C
+            //   p-2: 10 rec + C   +   5 rec + C + 5 rec + A + 5 rec + C   +   10 rec + C                +   4 rec + A + 5 rec + C ---> 5 rec + C
+            //   p-3: 10 rec + C   +   5 rec + C + 5 rec + A + 5 rec + C   +   10 rec + C                +   5 rec + A + 5 rec + C ---> 5 rec + C
             commitCounterClient1.set(-1);
             commitCounterClient2.set(-1);
 
-            final List<KeyValue<Long, Long>> committedInputDataAfterUpgrade =
+            final List<KeyValue<Long, Long>> finishLastBatch =
                 prepareData(35L, 40L, 0L, 1L, 2L, 3L);
-            writeInputData(committedInputDataAfterUpgrade);
+            writeInputData(finishLastBatch);
+
+            final Set<Long> uncommittedKeys = mkSet(0L, 1L, 2L, 3L);
+            uncommittedKeys.removeAll(keysSecondClientAlphaTwo);
+            uncommittedKeys.removeAll(newlyCommittedKeys);
+            final List<KeyValue<Long, Long>> committedInputDataDuringUpgrade = uncommittedInputDataBeforeSecondUpgrade

Review comment:
       I'm guessing the root cause of all this is a bad assumption that the assignment would be stable if a stable `CLIENT_ID` was used? I remember we discussed that back when you first wrote this test; I'm sorry for any misinformation I supplied based on my own assumption about how the CLIENT_ID would be used :/

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
##########
@@ -319,24 +293,26 @@ public void shouldUpgradeFromEosAlphaToEosBeta() throws Exception {
             //   p-2: 10 rec + C ---> 5 rec (pending)
             //   p-3: 10 rec + C ---> 5 rec (pending)
             // crash case: (we just assumes that we inject the error for p-0; in reality it might be a different partition)
+            //             (we don't crash right away and write one record less)
             //   p-0: 10 rec + C ---> 4 rec (pending)
             //   p-1: 10 rec + C ---> 5 rec (pending)
             //   p-2: 10 rec + C ---> 5 rec (pending)
             //   p-3: 10 rec + C ---> 5 rec (pending)
             final Set<Long> cleanKeys = mkSet(0L, 1L, 2L, 3L);
-            final Set<Long> keyFilterFirstClient = keysFromInstance(streams1Alpha);
-            final long potentiallyFirstFailingKey = keyFilterFirstClient.iterator().next();
-            cleanKeys.remove(potentiallyFirstFailingKey);
+            final Set<Long> keysFirstClientAlpha = keysFromInstance(streams1Alpha);
+            final long firstFailingKeyForCrashCase = keysFirstClientAlpha.iterator().next();

Review comment:
       Thanks for cleaning up the variable names 🙂 







[GitHub] [kafka] mjsax commented on a change in pull request #9688: KAFKA-10017: fix flaky EOS-beta upgrade test

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #9688:
URL: https://github.com/apache/kafka/pull/9688#discussion_r536339499



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
##########
@@ -916,11 +917,12 @@ public void close() {
         properties.put(StreamsConfig.CLIENT_ID_CONFIG, appDir);
         properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, processingGuarantee);
         properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Long.MAX_VALUE);
-        properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.METADATA_MAX_AGE_CONFIG), "1000");
+        properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.METADATA_MAX_AGE_CONFIG), Duration.ofSeconds(1L).toMillis());
         properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest");
-        properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG), 5 * 1000);
-        properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), 5 * 1000 - 1);
+        properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG), (int) Duration.ofSeconds(5L).toMillis());
+        properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), (int) Duration.ofSeconds(5L).minusMillis(1L).toMillis());
         properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), MAX_POLL_INTERVAL_MS);
+        properties.put(StreamsConfig.producerPrefix(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG), (int) Duration.ofMinutes(5L).toMillis());

Review comment:
       Good catch -- I set it to 5 minutes during debugging (i.e., setting breakpoints). 1 minute should be enough.
   
   > Or do we actually need this timeout to cover more than one or two phases?
   
   Not sure what you mean by this?







[GitHub] [kafka] mjsax commented on a change in pull request #9688: KAFKA-10017: fix flaky EOS-beta upgrade test

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #9688:
URL: https://github.com/apache/kafka/pull/9688#discussion_r535951595



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
##########
@@ -774,15 +769,18 @@ public void shouldUpgradeFromEosAlphaToEosBeta() throws Exception {
             waitForRunning(stateTransitions1);
             waitForRunning(stateTransitions2);
 
-            committedKeys.addAll(mkSet(0L, 1L, 2L, 3L));
+            newlyCommittedKeys.clear();

Review comment:
       Similar fix as above: we compute those keys differently now.







[GitHub] [kafka] mjsax commented on a change in pull request #9688: KAFKA-10017: fix flaky EOS-beta upgrade test

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #9688:
URL: https://github.com/apache/kafka/pull/9688#discussion_r535952481



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
##########
@@ -916,11 +917,12 @@ public void close() {
         properties.put(StreamsConfig.CLIENT_ID_CONFIG, appDir);
         properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, processingGuarantee);
         properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Long.MAX_VALUE);
-        properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.METADATA_MAX_AGE_CONFIG), "1000");
+        properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.METADATA_MAX_AGE_CONFIG), Duration.ofSeconds(1L).toMillis());
         properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest");
-        properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG), 5 * 1000);
-        properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), 5 * 1000 - 1);
+        properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG), (int) Duration.ofSeconds(5L).toMillis());
+        properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), (int) Duration.ofSeconds(5L).minusMillis(1L).toMillis());
         properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), MAX_POLL_INTERVAL_MS);
+        properties.put(StreamsConfig.producerPrefix(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG), (int) Duration.ofMinutes(5L).toMillis());

Review comment:
       I also increased the TX timeout from the too-low default of 10 seconds, to avoid broker-side TX aborts during the test.
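   For reference, a minimal sketch of the config change (the helper name is made up, and the value here is the 1 minute agreed on elsewhere in this thread rather than the 5 minutes shown in the hunk):

   ```java
   import java.time.Duration;
   import java.util.Properties;
   import org.apache.kafka.clients.producer.ProducerConfig;
   import org.apache.kafka.streams.StreamsConfig;

   public class TxTimeoutConfigSketch {
       // hypothetical helper: raise transaction.timeout.ms above Streams' 10-second
       // EOS default so the broker does not abort transactions while the test is slow
       public static Properties withLongerTxTimeout(final Properties properties) {
           properties.put(
               StreamsConfig.producerPrefix(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG),
               (int) Duration.ofMinutes(1L).toMillis());
           return properties;
       }
   }
   ```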

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
##########
@@ -1019,7 +1021,8 @@ private void verifyUncommitted(final List<KeyValue<Long, Long>> expectedResult)
                     )
                 ),
                 MULTI_PARTITION_OUTPUT_TOPIC,
-                numberOfRecords
+                numberOfRecords,
+                MAX_WAIT_TIME_MS

Review comment:
       Increase wait time here, too.







[GitHub] [kafka] mjsax commented on a change in pull request #9688: KAFKA-10017: fix flaky EOS-beta upgrade test

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #9688:
URL: https://github.com/apache/kafka/pull/9688#discussion_r536338913



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
##########
@@ -792,24 +790,35 @@ public void shouldUpgradeFromEosAlphaToEosBeta() throws Exception {
             // expected end state per output partition (C == COMMIT; A == ABORT; ---> indicate the changes):
             //
             // stop case:
-            //   p-0: 10 rec + C + 5 rec + C + 5 rec + C + 5 rec + C ---> 5 rec + C
-            //   p-1: 10 rec + C + 5 rec + C + 5 rec + C + 5 rec + C ---> 5 rec + C
-            //   p-2: 10 rec + C + 5 rec + C + 5 rec + C + 5 rec + C ---> 5 rec + C
-            //   p-3: 10 rec + C + 5 rec + C + 5 rec + C + 5 rec + C ---> 5 rec + C
+            //   p-0: 10 rec + C   +   5 rec + C + 5 rec + C   +   5 rec + C ---> 5 rec + C
+            //   p-1: 10 rec + C   +   5 rec + C + 5 rec + C   +   5 rec + C ---> 5 rec + C
+            //   p-2: 10 rec + C   +   5 rec + C + 5 rec + C   +   5 rec + C ---> 5 rec + C
+            //   p-3: 10 rec + C   +   5 rec + C + 5 rec + C   +   5 rec + C ---> 5 rec + C
             // crash case:  (we just assumes that we inject the error for p-2; in reality it might be a different partition)
-            //   p-0: 10 rec + C + 4 rec + A + 5 rec + C + 5 rec + C + 10 rec + A + 10 rec + C + 5 rec + C ---> 5 rec + C
-            //   p-1: 10 rec + C + 5 rec + A + 5 rec + C + 5 rec + C + 10 rec + A + 10 rec + C + 5 rec + C ---> 5 rec + C
-            //   p-2: 10 rec + C + 5 rec + C + 5 rec + A + 5 rec + C + 10 rec + C + 4 rec + A + 5 rec + C ---> 5 rec + C
-            //   p-3: 10 rec + C + 5 rec + C + 5 rec + A + 5 rec + C + 10 rec + C + 5 rec + A + 5 rec + C ---> 5 rec + C
+            //   p-0: 10 rec + C   +   4 rec + A + 5 rec + C + 5 rec + C   +   10 rec + A + 10 rec + C   +   5 rec + C             ---> 5 rec + C
+            //   p-1: 10 rec + C   +   5 rec + A + 5 rec + C + 5 rec + C   +   10 rec + A + 10 rec + C   +   5 rec + C             ---> 5 rec + C
+            //   p-2: 10 rec + C   +   5 rec + C + 5 rec + A + 5 rec + C   +   10 rec + C                +   4 rec + A + 5 rec + C ---> 5 rec + C
+            //   p-3: 10 rec + C   +   5 rec + C + 5 rec + A + 5 rec + C   +   10 rec + C                +   5 rec + A + 5 rec + C ---> 5 rec + C
             commitCounterClient1.set(-1);
             commitCounterClient2.set(-1);
 
-            final List<KeyValue<Long, Long>> committedInputDataAfterUpgrade =
+            final List<KeyValue<Long, Long>> finishLastBatch =
                 prepareData(35L, 40L, 0L, 1L, 2L, 3L);
-            writeInputData(committedInputDataAfterUpgrade);
+            writeInputData(finishLastBatch);
+
+            final Set<Long> uncommittedKeys = mkSet(0L, 1L, 2L, 3L);
+            uncommittedKeys.removeAll(keysSecondClientAlphaTwo);
+            uncommittedKeys.removeAll(newlyCommittedKeys);
+            final List<KeyValue<Long, Long>> committedInputDataDuringUpgrade = uncommittedInputDataBeforeSecondUpgrade

Review comment:
       Yes, the test assumed a more stable task->thread mapping during the assignment. But it turns out that the task assignment may "flip" (I'm not sure about the details).







[GitHub] [kafka] ableegoldman commented on a change in pull request #9688: KAFKA-10017: fix flaky EOS-beta upgrade test

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9688:
URL: https://github.com/apache/kafka/pull/9688#discussion_r536406055



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
##########
@@ -916,11 +917,12 @@ public void close() {
         properties.put(StreamsConfig.CLIENT_ID_CONFIG, appDir);
         properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, processingGuarantee);
         properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Long.MAX_VALUE);
-        properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.METADATA_MAX_AGE_CONFIG), "1000");
+        properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.METADATA_MAX_AGE_CONFIG), Duration.ofSeconds(1L).toMillis());
         properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest");
-        properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG), 5 * 1000);
-        properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), 5 * 1000 - 1);
+        properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG), (int) Duration.ofSeconds(5L).toMillis());
+        properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), (int) Duration.ofSeconds(5L).minusMillis(1L).toMillis());
         properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), MAX_POLL_INTERVAL_MS);
+        properties.put(StreamsConfig.producerPrefix(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG), (int) Duration.ofMinutes(5L).toMillis());

Review comment:
       Was just thinking about how long a transaction might possibly be open. 1 minute SGTM







[GitHub] [kafka] ableegoldman commented on a change in pull request #9688: KAFKA-10017: fix flaky EOS-beta upgrade test

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9688:
URL: https://github.com/apache/kafka/pull/9688#discussion_r538757741



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
##########
@@ -540,18 +535,18 @@ public void shouldUpgradeFromEosAlphaToEosBeta() throws Exception {
             }
 
             // 7. only for crash case:
-            //     7a. restart the second client in eos-alpha mode and wait until rebalance stabilizes
+            //     7a. restart the failed second client in eos-alpha mode and wait until rebalance stabilizes

Review comment:
       I think "failed second client" is correct. It's the 2nd client, which has failed, not the 2nd client to have failed (English is confusing 😣 )







[GitHub] [kafka] mjsax commented on a change in pull request #9688: KAFKA-10017: fix flaky EOS-beta upgrade test

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #9688:
URL: https://github.com/apache/kafka/pull/9688#discussion_r535948890



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
##########
@@ -152,27 +147,6 @@
     private final AtomicInteger commitCounterClient2 = new AtomicInteger(-1);
     private final AtomicInteger commitRequested = new AtomicInteger(0);
 
-    // Note: this pattern only works when we just have a single instance running with a single thread
-    // If we want to extend the test or reuse this CommitPunctuator we should tighten it up
-    private final AtomicBoolean requestCommit = new AtomicBoolean(false);
-    private static class CommitPunctuator implements Punctuator {

Review comment:
       This punctuator was an attempt to stabilize the test, but without success. Removing it, as this should be a proper fix now.







[GitHub] [kafka] mjsax commented on pull request #9688: KAFKA-10017: fix flaky EOS-beta upgrade test

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #9688:
URL: https://github.com/apache/kafka/pull/9688#issuecomment-741389532


   > BTW should we re-enable this test in the same PR?
   
   The test is enabled... But the test failed on the 2.6 branch PR -- Seems there is still something going on.





[GitHub] [kafka] mjsax commented on a change in pull request #9688: KAFKA-10017: fix flaky EOS-beta upgrade test

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #9688:
URL: https://github.com/apache/kafka/pull/9688#discussion_r535953242



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
##########
@@ -1132,7 +1143,7 @@ public ErrorInjector(final Map<String, Object> configs) {
         }
 
         @Override
-        public void commitTransaction() throws ProducerFencedException {
+        public void commitTransaction() {

Review comment:
       Side cleanup







[GitHub] [kafka] mjsax commented on a change in pull request #9688: KAFKA-10017: fix flaky EOS-beta upgrade test

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #9688:
URL: https://github.com/apache/kafka/pull/9688#discussion_r538926128



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
##########
@@ -540,18 +535,18 @@ public void shouldUpgradeFromEosAlphaToEosBeta() throws Exception {
             }
 
             // 7. only for crash case:
-            //     7a. restart the second client in eos-alpha mode and wait until rebalance stabilizes
+            //     7a. restart the failed second client in eos-alpha mode and wait until rebalance stabilizes
             //     7b. write third batch of input data
             //         * fail the first (i.e., eos-beta) client during commit
             //         * the eos-alpha client should not pickup the pending offsets
             //         * verify uncommitted and committed result
             //     7c. restart the first client in eos-beta mode and wait until rebalance stabilizes
             //
             // crash case:
-            //   p-0: 10 rec + C + 4 rec + A + 5 rec + C + 5 rec + C ---> 10 rec + A + 10 rec + C
-            //   p-1: 10 rec + C + 5 rec + A + 5 rec + C + 5 rec + C ---> 10 rec + A + 10 rec + C
-            //   p-2: 10 rec + C + 5 rec + C + 5 rec + A + 5 rec + C ---> 10 rec + C
-            //   p-3: 10 rec + C + 5 rec + C + 5 rec + A + 5 rec + C ---> 10 rec + C
+            //   p-0: 10 rec + C   +   4 rec + A + 5 rec + C + 5 rec + C ---> 10 rec + A + 10 rec + C

Review comment:
       Yes. I wanted to improve the readability of the comment -- the additional blanks separate the main phases of the test (each main phase writes 10 records per partition that should eventually be committed).







[GitHub] [kafka] ableegoldman commented on a change in pull request #9688: KAFKA-10017: fix flaky EOS-beta upgrade test

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9688:
URL: https://github.com/apache/kafka/pull/9688#discussion_r538759641



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
##########
@@ -792,24 +790,35 @@ public void shouldUpgradeFromEosAlphaToEosBeta() throws Exception {
             // expected end state per output partition (C == COMMIT; A == ABORT; ---> indicate the changes):
             //
             // stop case:
-            //   p-0: 10 rec + C + 5 rec + C + 5 rec + C + 5 rec + C ---> 5 rec + C
-            //   p-1: 10 rec + C + 5 rec + C + 5 rec + C + 5 rec + C ---> 5 rec + C
-            //   p-2: 10 rec + C + 5 rec + C + 5 rec + C + 5 rec + C ---> 5 rec + C
-            //   p-3: 10 rec + C + 5 rec + C + 5 rec + C + 5 rec + C ---> 5 rec + C
+            //   p-0: 10 rec + C   +   5 rec + C + 5 rec + C   +   5 rec + C ---> 5 rec + C
+            //   p-1: 10 rec + C   +   5 rec + C + 5 rec + C   +   5 rec + C ---> 5 rec + C
+            //   p-2: 10 rec + C   +   5 rec + C + 5 rec + C   +   5 rec + C ---> 5 rec + C
+            //   p-3: 10 rec + C   +   5 rec + C + 5 rec + C   +   5 rec + C ---> 5 rec + C
             // crash case:  (we just assumes that we inject the error for p-2; in reality it might be a different partition)
-            //   p-0: 10 rec + C + 4 rec + A + 5 rec + C + 5 rec + C + 10 rec + A + 10 rec + C + 5 rec + C ---> 5 rec + C
-            //   p-1: 10 rec + C + 5 rec + A + 5 rec + C + 5 rec + C + 10 rec + A + 10 rec + C + 5 rec + C ---> 5 rec + C
-            //   p-2: 10 rec + C + 5 rec + C + 5 rec + A + 5 rec + C + 10 rec + C + 4 rec + A + 5 rec + C ---> 5 rec + C
-            //   p-3: 10 rec + C + 5 rec + C + 5 rec + A + 5 rec + C + 10 rec + C + 5 rec + A + 5 rec + C ---> 5 rec + C
+            //   p-0: 10 rec + C   +   4 rec + A + 5 rec + C + 5 rec + C   +   10 rec + A + 10 rec + C   +   5 rec + C             ---> 5 rec + C
+            //   p-1: 10 rec + C   +   5 rec + A + 5 rec + C + 5 rec + C   +   10 rec + A + 10 rec + C   +   5 rec + C             ---> 5 rec + C
+            //   p-2: 10 rec + C   +   5 rec + C + 5 rec + A + 5 rec + C   +   10 rec + C                +   4 rec + A + 5 rec + C ---> 5 rec + C
+            //   p-3: 10 rec + C   +   5 rec + C + 5 rec + A + 5 rec + C   +   10 rec + C                +   5 rec + A + 5 rec + C ---> 5 rec + C
             commitCounterClient1.set(-1);
             commitCounterClient2.set(-1);
 
-            final List<KeyValue<Long, Long>> committedInputDataAfterUpgrade =
+            final List<KeyValue<Long, Long>> finishLastBatch =
                 prepareData(35L, 40L, 0L, 1L, 2L, 3L);
-            writeInputData(committedInputDataAfterUpgrade);
+            writeInputData(finishLastBatch);
+
+            final Set<Long> uncommittedKeys = mkSet(0L, 1L, 2L, 3L);
+            uncommittedKeys.removeAll(keysSecondClientAlphaTwo);
+            uncommittedKeys.removeAll(newlyCommittedKeys);
+            final List<KeyValue<Long, Long>> committedInputDataDuringUpgrade = uncommittedInputDataBeforeSecondUpgrade

Review comment:
       Yes, I think so



