Posted to commits@kafka.apache.org by gu...@apache.org on 2020/07/06 21:05:21 UTC

[kafka] branch trunk updated: KAFKA-10017: fix flaky EosBetaUpgradeIntegrationTest (#8963)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new cd2f5f0  KAFKA-10017: fix flaky EosBetaUpgradeIntegrationTest (#8963)
cd2f5f0 is described below

commit cd2f5f030b353dfcaa7c3c8e18f69c681ea8e0f1
Author: A. Sophie Blee-Goldman <so...@confluent.io>
AuthorDate: Mon Jul 6 14:04:36 2020 -0700

    KAFKA-10017: fix flaky EosBetaUpgradeIntegrationTest (#8963)
    
    The current failures we're seeing with this test are due to faulty assumptions the test makes, not to any real bug in eos-beta (at least, from what I've seen so far).
    
    The test relies on tightly controlling commits, which it does by setting the commit interval to MAX_VALUE and manually requesting commits on the context. In two phases, the test assumes that any pending data will be committed after a rebalance. But we actually take care to avoid unnecessary commits: with eos-alpha we only commit tasks that are revoked, while with eos-beta we must commit all tasks whenever any are revoked, but only if the revoked tasks themselves actually need a commit.
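    
    As a rough illustration (a hypothetical sketch, not the test's actual code;
    the Transformer wiring is elided), the commit-control pattern looks like:
    
        import java.util.Properties;
        import org.apache.kafka.streams.StreamsConfig;
    
        // Interval-based commits are effectively disabled, so the only commits
        // are the ones the test requests manually on the ProcessorContext.
        final Properties props = new Properties();
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_BETA);
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Long.MAX_VALUE);
    
        // ... and inside the test's Transformer, commits are requested on demand:
        // context.commit();  // Streams commits at its next opportunity, not immediately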
    
    The failure we see occurs when we try to verify the committed data after a second client is started and the group rebalances. The already-running client has to give up two tasks to the newly started client, but those tasks may not need to be committed, in which case none of the tasks are. We are then left with an open transaction on the partitions from which we try to read committed data.
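    
    A minimal sketch of why that open transaction blocks the verification
    (hypothetical; assumes a plain verification consumer):
    
        import java.util.Properties;
        import org.apache.kafka.clients.consumer.ConsumerConfig;
    
        // With read_committed isolation the consumer only reads up to the last
        // stable offset, so records behind an open transaction stay invisible
        // and the verification times out waiting for them.
        final Properties consumerProps = new Properties();
        consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");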
    
    Reviewers: John Roesler <jo...@confluent.io>, Guozhang Wang <wa...@gmail.com>
---
 .../integration/EosBetaUpgradeIntegrationTest.java | 49 ++++++++++++++++++++--
 .../integration/utils/IntegrationTestUtils.java    |  2 +-
 2 files changed, 46 insertions(+), 5 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
index 6a550e7..cd57acb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.integration;
 
+import java.time.Duration;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Partitioner;
@@ -42,7 +43,10 @@ import org.apache.kafka.streams.integration.utils.IntegrationTestUtils.StableAss
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.Transformer;
 import org.apache.kafka.streams.kstream.TransformerSupplier;
+import org.apache.kafka.streams.processor.Cancellable;
 import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
@@ -79,6 +83,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
 import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.getStore;
 import static org.apache.kafka.test.TestUtils.waitForCondition;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -147,6 +152,27 @@ public class EosBetaUpgradeIntegrationTest {
     private final AtomicInteger commitCounterClient2 = new AtomicInteger(-1);
     private final AtomicInteger commitRequested = new AtomicInteger(0);
 
+    // Note: this pattern only works while a single instance is running with a single thread.
+    // If we extend the test or reuse this CommitPunctuator, it should be tightened up.
+    private final AtomicBoolean requestCommit = new AtomicBoolean(false);
+    private static class CommitPunctuator implements Punctuator {
+        final ProcessorContext context;
+        final AtomicBoolean requestCommit;
+
+        public CommitPunctuator(final ProcessorContext context, final AtomicBoolean requestCommit) {
+            this.context = context;
+            this.requestCommit = requestCommit;
+        }
+
+        @Override
+        public void punctuate(final long timestamp) {
+            if (requestCommit.get()) {
+                context.commit();
+                requestCommit.set(false);
+            }
+        }
+    }
+
     private Throwable uncaughtException;
 
     private int testNumber = 0;
@@ -401,6 +427,8 @@ public class EosBetaUpgradeIntegrationTest {
             //   p-1: 10 rec + C + 5 rec + A + 5 rec ---> C
             //   p-2: 10 rec + C + 5 rec ---> C
             //   p-3: 10 rec + C + 5 rec ---> C
+            requestCommit.set(true);
+            waitForCondition(() -> !requestCommit.get(), "Punctuator did not request commit for running client");
             commitRequested.set(0);
             stateTransitions1.clear();
             stateTransitions2.clear();
@@ -731,6 +759,8 @@ public class EosBetaUpgradeIntegrationTest {
             //   p-1: 10 rec + C + 5 rec + A + 5 rec + C + 5 rec + C + 10 rec + A + 10 rec + C + 5 rec ---> C
             //   p-2: 10 rec + C + 5 rec + C + 5 rec + A + 5 rec + C + 10 rec + C + 4 rec + A + 5 rec ---> C
             //   p-3: 10 rec + C + 5 rec + C + 5 rec + A + 5 rec + C + 10 rec + C + 5 rec + A + 5 rec ---> C
+            requestCommit.set(true);
+            waitForCondition(() -> !requestCommit.get(), "Punctuator did not request commit for running client");
             commitRequested.set(0);
             stateTransitions1.clear();
             stateTransitions2.clear();
@@ -824,6 +854,7 @@ public class EosBetaUpgradeIntegrationTest {
                     KeyValueStore<Long, Long> state = null;
                     AtomicBoolean crash;
                     AtomicInteger sharedCommit;
+                    Cancellable punctuator;
 
                     @Override
                     public void init(final ProcessorContext context) {
@@ -837,6 +868,11 @@ public class EosBetaUpgradeIntegrationTest {
                             crash = errorInjectedClient2;
                             sharedCommit = commitCounterClient2;
                         }
+                        punctuator = context.schedule(
+                            Duration.ofMillis(100),
+                            PunctuationType.WALL_CLOCK_TIME,
+                            new CommitPunctuator(context, requestCommit)
+                        );
                     }
 
                     @Override
@@ -869,7 +905,9 @@ public class EosBetaUpgradeIntegrationTest {
                     }
 
                     @Override
-                    public void close() { }
+                    public void close() {
+                        punctuator.cancel();
+                    }
                 };
             } }, storeNames)
             .to(MULTI_PARTITION_OUTPUT_TOPIC);
@@ -1036,9 +1074,12 @@ public class EosBetaUpgradeIntegrationTest {
         return expectedResult;
     }
 
-    private Set<Long> keysFromInstance(final KafkaStreams streams) {
-        final ReadOnlyKeyValueStore<Long, Long> store =
-            streams.store(StoreQueryParameters.fromNameAndType(storeName, QueryableStoreTypes.keyValueStore()));
+    private Set<Long> keysFromInstance(final KafkaStreams streams) throws Exception {
+        final ReadOnlyKeyValueStore<Long, Long> store = getStore(
+            MAX_WAIT_TIME_MS,
+            streams,
+            StoreQueryParameters.fromNameAndType(storeName, QueryableStoreTypes.keyValueStore())
+        );
         final Set<Long> keys = new HashSet<>();
         try (final KeyValueIterator<Long, Long> it = store.all()) {
             while (it.hasNext()) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index b8af7e9..d8fb5d2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -1273,7 +1273,7 @@ public class IntegrationTestUtils {
          */
         public void waitForNextStableAssignment(final long maxWaitMs) throws InterruptedException {
             waitForCondition(
-                () -> nextExpectedNumStableAssignments == numStableAssignments(),
+                () -> numStableAssignments() >= nextExpectedNumStableAssignments,
                 maxWaitMs,
                 () -> "Client did not reach " + nextExpectedNumStableAssignments + " stable assignments on time, " +
                     "numStableAssignments was " + numStableAssignments()