You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2020/04/23 00:15:57 UTC

[kafka] branch 2.5 updated: KAFKA-9512: Backport Flaky Test LagFetchIntegrationTest.shouldFetchLagsDuringRestoration (#8534)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.5 by this push:
     new 6eb27fd  KAFKA-9512: Backport Flaky Test LagFetchIntegrationTest.shouldFetchLagsDuringRestoration (#8534)
6eb27fd is described below

commit 6eb27fd8526bad57f2ed9c8752a50960ba01be93
Author: vinoth chandar <vi...@users.noreply.github.com>
AuthorDate: Wed Apr 22 17:15:22 2020 -0700

    KAFKA-9512: Backport Flaky Test LagFetchIntegrationTest.shouldFetchLagsDuringRestoration (#8534)
    
    - Added additional synchronization and increased timeouts to handle flakiness
    - Added some precautionary retries when trying to obtain lag map
    
    Reviewers: Guozhang Wang <wa...@gmail.com>
---
 .../integration/LagFetchIntegrationTest.java       | 77 ++++++++++++++++------
 1 file changed, 57 insertions(+), 20 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java
index e98d647..598f4c2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java
@@ -36,6 +36,7 @@ import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 import kafka.utils.MockTime;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.TopicPartition;
@@ -65,6 +66,8 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 @Category({IntegrationTest.class})
 public class LagFetchIntegrationTest {
@@ -72,7 +75,8 @@ public class LagFetchIntegrationTest {
     @ClassRule
     public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
 
-    private static final long CONSUMER_TIMEOUT_MS = 60000;
+    private static final long WAIT_TIMEOUT_MS = 120000;
+    private static final Logger LOG = LoggerFactory.getLogger(LagFetchIntegrationTest.class);
 
     private final MockTime mockTime = CLUSTER.time;
     private Properties streamsConfiguration;
@@ -111,6 +115,18 @@ public class LagFetchIntegrationTest {
         IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
     }
 
+    private Map<String, Map<Integer, LagInfo>> getFirstNonEmptyLagMap(final KafkaStreams streams) throws InterruptedException {
+        final Map<String, Map<Integer, LagInfo>> offsetLagInfoMap = new HashMap<>();
+        TestUtils.waitForCondition(() -> {
+            final Map<String, Map<Integer, LagInfo>> lagMap = streams.allLocalStorePartitionLags();
+            if (lagMap.size() > 0) {
+                offsetLagInfoMap.putAll(lagMap);
+            }
+            return lagMap.size() > 0;
+        }, WAIT_TIMEOUT_MS, "Should obtain non-empty lag information eventually");
+        return offsetLagInfoMap;
+    }
+
     private void shouldFetchLagsDuringRebalancing(final String optimization) throws Exception {
         final CountDownLatch latchTillActiveIsRunning = new CountDownLatch(1);
         final CountDownLatch latchTillStandbyIsRunning = new CountDownLatch(1);
@@ -166,8 +182,9 @@ public class LagFetchIntegrationTest {
 
         try {
             // First start up the active.
-            Map<String, Map<Integer, LagInfo>> offsetLagInfoMap = activeStreams.allLocalStorePartitionLags();
-            assertThat(offsetLagInfoMap.size(), equalTo(0));
+            TestUtils.waitForCondition(() -> activeStreams.allLocalStorePartitionLags().size() == 0,
+                WAIT_TIMEOUT_MS,
+                "Should see empty lag map before streams is started.");
             activeStreams.start();
             latchTillActiveIsRunning.await(60, TimeUnit.SECONDS);
 
@@ -175,9 +192,9 @@ public class LagFetchIntegrationTest {
                 consumerConfiguration,
                 outputTopicName,
                 5,
-                CONSUMER_TIMEOUT_MS);
+                WAIT_TIMEOUT_MS);
             // Check the active reports proper lag values.
-            offsetLagInfoMap = activeStreams.allLocalStorePartitionLags();
+            Map<String, Map<Integer, LagInfo>> offsetLagInfoMap = getFirstNonEmptyLagMap(activeStreams);
             assertThat(offsetLagInfoMap.size(), equalTo(1));
             assertThat(offsetLagInfoMap.keySet(), equalTo(mkSet(stateStoreName)));
             assertThat(offsetLagInfoMap.get(stateStoreName).size(), equalTo(1));
@@ -189,7 +206,7 @@ public class LagFetchIntegrationTest {
             // start up the standby & make it pause right after it has partition assigned
             standbyStreams.start();
             latchTillStandbyHasPartitionsAssigned.await(60, TimeUnit.SECONDS);
-            offsetLagInfoMap = standbyStreams.allLocalStorePartitionLags();
+            offsetLagInfoMap = getFirstNonEmptyLagMap(standbyStreams);
             assertThat(offsetLagInfoMap.size(), equalTo(1));
             assertThat(offsetLagInfoMap.keySet(), equalTo(mkSet(stateStoreName)));
             assertThat(offsetLagInfoMap.get(stateStoreName).size(), equalTo(1));
@@ -202,6 +219,7 @@ public class LagFetchIntegrationTest {
 
             // wait till the lag goes down to 0, on the standby
             TestUtils.waitForCondition(() -> standbyStreams.allLocalStorePartitionLags().get(stateStoreName).get(0).offsetLag() == 0,
+                WAIT_TIMEOUT_MS,
                 "Standby should eventually catchup and have zero lag.");
         } finally {
             for (final KafkaStreams streams : streamsList) {
@@ -246,8 +264,9 @@ public class LagFetchIntegrationTest {
 
         try {
             // First start up the active.
-            Map<String, Map<Integer, LagInfo>> offsetLagInfoMap = streams.allLocalStorePartitionLags();
-            assertThat(offsetLagInfoMap.size(), equalTo(0));
+            TestUtils.waitForCondition(() -> streams.allLocalStorePartitionLags().size() == 0,
+                WAIT_TIMEOUT_MS,
+                "Should see empty lag map before streams is started.");
 
             // Get the instance to fully catch up and reach RUNNING state
             startApplicationAndWaitUntilRunning(Collections.singletonList(streams), Duration.ofSeconds(60));
@@ -255,17 +274,23 @@ public class LagFetchIntegrationTest {
                 consumerConfiguration,
                 outputTopicName,
                 5,
-                CONSUMER_TIMEOUT_MS);
+                WAIT_TIMEOUT_MS);
 
             // check for proper lag values.
-            offsetLagInfoMap = streams.allLocalStorePartitionLags();
-            assertThat(offsetLagInfoMap.size(), equalTo(1));
-            assertThat(offsetLagInfoMap.keySet(), equalTo(mkSet(stateStoreName)));
-            assertThat(offsetLagInfoMap.get(stateStoreName).size(), equalTo(1));
-            final LagInfo zeroLagInfo = offsetLagInfoMap.get(stateStoreName).get(0);
-            assertThat(zeroLagInfo.currentOffsetPosition(), equalTo(5L));
-            assertThat(zeroLagInfo.endOffsetPosition(), equalTo(5L));
-            assertThat(zeroLagInfo.offsetLag(), equalTo(0L));
+            final AtomicReference<LagInfo> zeroLagRef = new AtomicReference<>();
+            TestUtils.waitForCondition(() -> {
+                final Map<String, Map<Integer, LagInfo>> offsetLagInfoMap = streams.allLocalStorePartitionLags();
+                assertThat(offsetLagInfoMap.size(), equalTo(1));
+                assertThat(offsetLagInfoMap.keySet(), equalTo(mkSet(stateStoreName)));
+                assertThat(offsetLagInfoMap.get(stateStoreName).size(), equalTo(1));
+
+                final LagInfo zeroLagInfo = offsetLagInfoMap.get(stateStoreName).get(0);
+                assertThat(zeroLagInfo.currentOffsetPosition(), equalTo(5L));
+                assertThat(zeroLagInfo.endOffsetPosition(), equalTo(5L));
+                assertThat(zeroLagInfo.offsetLag(), equalTo(0L));
+                zeroLagRef.set(zeroLagInfo);
+                return true;
+            }, WAIT_TIMEOUT_MS, "Eventually should reach zero lag.");
 
             // Kill instance, delete state to force restoration.
             assertThat("Streams instance did not close within timeout", streams.close(Duration.ofSeconds(60)));
@@ -277,12 +302,17 @@ public class LagFetchIntegrationTest {
             // wait till the lag goes down to 0
             final KafkaStreams restartedStreams = new KafkaStreams(builder.build(), props);
             // set a state restoration listener to track progress of restoration
+            final CountDownLatch restorationEndLatch = new CountDownLatch(1);
             final Map<String, Map<Integer, LagInfo>> restoreStartLagInfo = new HashMap<>();
             final Map<String, Map<Integer, LagInfo>> restoreEndLagInfo = new HashMap<>();
             restartedStreams.setGlobalStateRestoreListener(new StateRestoreListener() {
                 @Override
                 public void onRestoreStart(final TopicPartition topicPartition, final String storeName, final long startingOffset, final long endingOffset) {
-                    restoreStartLagInfo.putAll(restartedStreams.allLocalStorePartitionLags());
+                    try {
+                        restoreStartLagInfo.putAll(getFirstNonEmptyLagMap(restartedStreams));
+                    } catch (final Exception e) {
+                        LOG.error("Exception while trying to obtain lag map", e);
+                    }
                 }
 
                 @Override
@@ -291,19 +321,26 @@ public class LagFetchIntegrationTest {
 
                 @Override
                 public void onRestoreEnd(final TopicPartition topicPartition, final String storeName, final long totalRestored) {
-                    restoreEndLagInfo.putAll(restartedStreams.allLocalStorePartitionLags());
+                    try {
+                        restoreEndLagInfo.putAll(getFirstNonEmptyLagMap(restartedStreams));
+                    } catch (final Exception e) {
+                        LOG.error("Exception while trying to obtain lag map", e);
+                    }
+                    restorationEndLatch.countDown();
                 }
             });
 
             restartedStreams.start();
+            restorationEndLatch.await(WAIT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
             TestUtils.waitForCondition(() -> restartedStreams.allLocalStorePartitionLags().get(stateStoreName).get(0).offsetLag() == 0,
+                WAIT_TIMEOUT_MS,
                 "Standby should eventually catchup and have zero lag.");
             final LagInfo fullLagInfo = restoreStartLagInfo.get(stateStoreName).get(0);
             assertThat(fullLagInfo.currentOffsetPosition(), equalTo(0L));
             assertThat(fullLagInfo.endOffsetPosition(), equalTo(5L));
             assertThat(fullLagInfo.offsetLag(), equalTo(5L));
 
-            assertThat(zeroLagInfo, equalTo(restoreEndLagInfo.get(stateStoreName).get(0)));
+            assertThat(restoreEndLagInfo.get(stateStoreName).get(0), equalTo(zeroLagRef.get()));
         } finally {
             streams.close();
         }