You are viewing a plain-text version of this content. The link to the canonical version was a hyperlink and is not preserved in this plain-text rendering.
Posted to commits@kafka.apache.org by gu...@apache.org on 2020/04/23 00:15:57 UTC
[kafka] branch 2.5 updated: KAFKA-9512: Backport Flaky Test LagFetchIntegrationTest.shouldFetchLagsDuringRestoration (#8534)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.5 by this push:
new 6eb27fd KAFKA-9512: Backport Flaky Test LagFetchIntegrationTest.shouldFetchLagsDuringRestoration (#8534)
6eb27fd is described below
commit 6eb27fd8526bad57f2ed9c8752a50960ba01be93
Author: vinoth chandar <vi...@users.noreply.github.com>
AuthorDate: Wed Apr 22 17:15:22 2020 -0700
KAFKA-9512: Backport Flaky Test LagFetchIntegrationTest.shouldFetchLagsDuringRestoration (#8534)
- Added more synchronization and increased timeouts to reduce flakiness
- Added some precautionary retries when trying to obtain the lag map
Reviewers: Guozhang Wang <wa...@gmail.com>
---
.../integration/LagFetchIntegrationTest.java | 77 ++++++++++++++++------
1 file changed, 57 insertions(+), 20 deletions(-)
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java
index e98d647..598f4c2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java
@@ -36,6 +36,7 @@ import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import kafka.utils.MockTime;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.TopicPartition;
@@ -65,6 +66,8 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
@Category({IntegrationTest.class})
public class LagFetchIntegrationTest {
@@ -72,7 +75,8 @@ public class LagFetchIntegrationTest {
@ClassRule
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
- private static final long CONSUMER_TIMEOUT_MS = 60000;
+ private static final long WAIT_TIMEOUT_MS = 120000;
+ private static final Logger LOG = LoggerFactory.getLogger(LagFetchIntegrationTest.class);
private final MockTime mockTime = CLUSTER.time;
private Properties streamsConfiguration;
@@ -111,6 +115,18 @@ public class LagFetchIntegrationTest {
IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
}
+ private Map<String, Map<Integer, LagInfo>> getFirstNonEmptyLagMap(final KafkaStreams streams) throws InterruptedException {
+ final Map<String, Map<Integer, LagInfo>> offsetLagInfoMap = new HashMap<>();
+ TestUtils.waitForCondition(() -> {
+ final Map<String, Map<Integer, LagInfo>> lagMap = streams.allLocalStorePartitionLags();
+ if (lagMap.size() > 0) {
+ offsetLagInfoMap.putAll(lagMap);
+ }
+ return lagMap.size() > 0;
+ }, WAIT_TIMEOUT_MS, "Should obtain non-empty lag information eventually");
+ return offsetLagInfoMap;
+ }
+
private void shouldFetchLagsDuringRebalancing(final String optimization) throws Exception {
final CountDownLatch latchTillActiveIsRunning = new CountDownLatch(1);
final CountDownLatch latchTillStandbyIsRunning = new CountDownLatch(1);
@@ -166,8 +182,9 @@ public class LagFetchIntegrationTest {
try {
// First start up the active.
- Map<String, Map<Integer, LagInfo>> offsetLagInfoMap = activeStreams.allLocalStorePartitionLags();
- assertThat(offsetLagInfoMap.size(), equalTo(0));
+ TestUtils.waitForCondition(() -> activeStreams.allLocalStorePartitionLags().size() == 0,
+ WAIT_TIMEOUT_MS,
+ "Should see empty lag map before streams is started.");
activeStreams.start();
latchTillActiveIsRunning.await(60, TimeUnit.SECONDS);
@@ -175,9 +192,9 @@ public class LagFetchIntegrationTest {
consumerConfiguration,
outputTopicName,
5,
- CONSUMER_TIMEOUT_MS);
+ WAIT_TIMEOUT_MS);
// Check the active reports proper lag values.
- offsetLagInfoMap = activeStreams.allLocalStorePartitionLags();
+ Map<String, Map<Integer, LagInfo>> offsetLagInfoMap = getFirstNonEmptyLagMap(activeStreams);
assertThat(offsetLagInfoMap.size(), equalTo(1));
assertThat(offsetLagInfoMap.keySet(), equalTo(mkSet(stateStoreName)));
assertThat(offsetLagInfoMap.get(stateStoreName).size(), equalTo(1));
@@ -189,7 +206,7 @@ public class LagFetchIntegrationTest {
// start up the standby & make it pause right after it has partition assigned
standbyStreams.start();
latchTillStandbyHasPartitionsAssigned.await(60, TimeUnit.SECONDS);
- offsetLagInfoMap = standbyStreams.allLocalStorePartitionLags();
+ offsetLagInfoMap = getFirstNonEmptyLagMap(standbyStreams);
assertThat(offsetLagInfoMap.size(), equalTo(1));
assertThat(offsetLagInfoMap.keySet(), equalTo(mkSet(stateStoreName)));
assertThat(offsetLagInfoMap.get(stateStoreName).size(), equalTo(1));
@@ -202,6 +219,7 @@ public class LagFetchIntegrationTest {
// wait till the lag goes down to 0, on the standby
TestUtils.waitForCondition(() -> standbyStreams.allLocalStorePartitionLags().get(stateStoreName).get(0).offsetLag() == 0,
+ WAIT_TIMEOUT_MS,
"Standby should eventually catchup and have zero lag.");
} finally {
for (final KafkaStreams streams : streamsList) {
@@ -246,8 +264,9 @@ public class LagFetchIntegrationTest {
try {
// First start up the active.
- Map<String, Map<Integer, LagInfo>> offsetLagInfoMap = streams.allLocalStorePartitionLags();
- assertThat(offsetLagInfoMap.size(), equalTo(0));
+ TestUtils.waitForCondition(() -> streams.allLocalStorePartitionLags().size() == 0,
+ WAIT_TIMEOUT_MS,
+ "Should see empty lag map before streams is started.");
// Get the instance to fully catch up and reach RUNNING state
startApplicationAndWaitUntilRunning(Collections.singletonList(streams), Duration.ofSeconds(60));
@@ -255,17 +274,23 @@ public class LagFetchIntegrationTest {
consumerConfiguration,
outputTopicName,
5,
- CONSUMER_TIMEOUT_MS);
+ WAIT_TIMEOUT_MS);
// check for proper lag values.
- offsetLagInfoMap = streams.allLocalStorePartitionLags();
- assertThat(offsetLagInfoMap.size(), equalTo(1));
- assertThat(offsetLagInfoMap.keySet(), equalTo(mkSet(stateStoreName)));
- assertThat(offsetLagInfoMap.get(stateStoreName).size(), equalTo(1));
- final LagInfo zeroLagInfo = offsetLagInfoMap.get(stateStoreName).get(0);
- assertThat(zeroLagInfo.currentOffsetPosition(), equalTo(5L));
- assertThat(zeroLagInfo.endOffsetPosition(), equalTo(5L));
- assertThat(zeroLagInfo.offsetLag(), equalTo(0L));
+ final AtomicReference<LagInfo> zeroLagRef = new AtomicReference<>();
+ TestUtils.waitForCondition(() -> {
+ final Map<String, Map<Integer, LagInfo>> offsetLagInfoMap = streams.allLocalStorePartitionLags();
+ assertThat(offsetLagInfoMap.size(), equalTo(1));
+ assertThat(offsetLagInfoMap.keySet(), equalTo(mkSet(stateStoreName)));
+ assertThat(offsetLagInfoMap.get(stateStoreName).size(), equalTo(1));
+
+ final LagInfo zeroLagInfo = offsetLagInfoMap.get(stateStoreName).get(0);
+ assertThat(zeroLagInfo.currentOffsetPosition(), equalTo(5L));
+ assertThat(zeroLagInfo.endOffsetPosition(), equalTo(5L));
+ assertThat(zeroLagInfo.offsetLag(), equalTo(0L));
+ zeroLagRef.set(zeroLagInfo);
+ return true;
+ }, WAIT_TIMEOUT_MS, "Eventually should reach zero lag.");
// Kill instance, delete state to force restoration.
assertThat("Streams instance did not close within timeout", streams.close(Duration.ofSeconds(60)));
@@ -277,12 +302,17 @@ public class LagFetchIntegrationTest {
// wait till the lag goes down to 0
final KafkaStreams restartedStreams = new KafkaStreams(builder.build(), props);
// set a state restoration listener to track progress of restoration
+ final CountDownLatch restorationEndLatch = new CountDownLatch(1);
final Map<String, Map<Integer, LagInfo>> restoreStartLagInfo = new HashMap<>();
final Map<String, Map<Integer, LagInfo>> restoreEndLagInfo = new HashMap<>();
restartedStreams.setGlobalStateRestoreListener(new StateRestoreListener() {
@Override
public void onRestoreStart(final TopicPartition topicPartition, final String storeName, final long startingOffset, final long endingOffset) {
- restoreStartLagInfo.putAll(restartedStreams.allLocalStorePartitionLags());
+ try {
+ restoreStartLagInfo.putAll(getFirstNonEmptyLagMap(restartedStreams));
+ } catch (final Exception e) {
+ LOG.error("Exception while trying to obtain lag map", e);
+ }
}
@Override
@@ -291,19 +321,26 @@ public class LagFetchIntegrationTest {
@Override
public void onRestoreEnd(final TopicPartition topicPartition, final String storeName, final long totalRestored) {
- restoreEndLagInfo.putAll(restartedStreams.allLocalStorePartitionLags());
+ try {
+ restoreEndLagInfo.putAll(getFirstNonEmptyLagMap(restartedStreams));
+ } catch (final Exception e) {
+ LOG.error("Exception while trying to obtain lag map", e);
+ }
+ restorationEndLatch.countDown();
}
});
restartedStreams.start();
+ restorationEndLatch.await(WAIT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
TestUtils.waitForCondition(() -> restartedStreams.allLocalStorePartitionLags().get(stateStoreName).get(0).offsetLag() == 0,
+ WAIT_TIMEOUT_MS,
"Standby should eventually catchup and have zero lag.");
final LagInfo fullLagInfo = restoreStartLagInfo.get(stateStoreName).get(0);
assertThat(fullLagInfo.currentOffsetPosition(), equalTo(0L));
assertThat(fullLagInfo.endOffsetPosition(), equalTo(5L));
assertThat(fullLagInfo.offsetLag(), equalTo(5L));
- assertThat(zeroLagInfo, equalTo(restoreEndLagInfo.get(stateStoreName).get(0)));
+ assertThat(restoreEndLagInfo.get(stateStoreName).get(0), equalTo(zeroLagRef.get()));
} finally {
streams.close();
}