Posted to commits@kafka.apache.org by mj...@apache.org on 2020/06/19 22:16:17 UTC

[kafka] branch 2.6 updated: KAFKA-9891: add integration tests for EOS and StandbyTask (#8890)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.6 by this push:
     new 69a4ee2  KAFKA-9891: add integration tests for EOS and StandbyTask (#8890)
69a4ee2 is described below

commit 69a4ee2abe6fe977b53b80e547704415fd2b92e0
Author: Matthias J. Sax <ma...@confluent.io>
AuthorDate: Fri Jun 19 09:05:54 2020 -0700

    KAFKA-9891: add integration tests for EOS and StandbyTask (#8890)
    
    Ports the test from #8886 to trunk -- this should be merged to the 2.6 branch.
    
    One open question: in 2.6 and trunk we rely on the active task to wipe out the store if it crashes. However, if there is a hard JVM crash and closeDirty() is never called, the store would not be wiped out. Thus, I am wondering whether we need to fix this (for both active and standby tasks) by adding a check on startup for whether a local store must be wiped out.
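    
    A minimal sketch of such a startup check (illustrative only, not part of this commit; taskStateDir as a handle on the task's state directory and reusing the existing .checkpoint file name are assumptions):
    
        // hypothetical sketch, not actual Kafka Streams code
        import java.io.File;
        import java.io.IOException;
        import org.apache.kafka.common.utils.Utils;
    
        static void maybeWipeDirtyState(final File taskStateDir) throws IOException {
            // if local state exists but no checkpoint file survived a hard
            // crash, the state cannot be trusted under EOS, so wipe it and
            // let restoration rebuild the store from the changelog
            final File checkpointFile = new File(taskStateDir, ".checkpoint");
            final String[] localFiles = taskStateDir.list();
            if (!checkpointFile.exists() && localFiles != null && localFiles.length > 0) {
                Utils.delete(taskStateDir);
            }
        }
    
    This is roughly the cleanup the dirty-close path performs today; the sketch only moves the decision to startup so that it would also cover hard crashes.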
    
    The current test passes, as we do a proper cleanup after the exception is thrown.
    
    Reviewers: Guozhang Wang <wa...@gmail.com>
---
 .../apache/kafka/streams/StoreQueryParameters.java |   2 +-
 .../integration/StandbyTaskEOSIntegrationTest.java | 230 ++++++++++++++++++++-
 2 files changed, 224 insertions(+), 8 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/StoreQueryParameters.java b/streams/src/main/java/org/apache/kafka/streams/StoreQueryParameters.java
index 332ac1a..3789e6f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StoreQueryParameters.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StoreQueryParameters.java
@@ -38,7 +38,7 @@ public class StoreQueryParameters<T> {
     }
 
     public static <T> StoreQueryParameters<T> fromNameAndType(final String storeName,
-                                                          final QueryableStoreType<T>  queryableStoreType) {
+                                                              final QueryableStoreType<T>  queryableStoreType) {
         return new StoreQueryParameters<T>(storeName, queryableStoreType, null, false);
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java
index bab5c9f..27741a5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java
@@ -18,18 +18,25 @@ package org.apache.kafka.streams.integration;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StoreQueryParameters;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Transformer;
+import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.StateDirectory;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.QueryableStoreTypes;
+import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
 import org.apache.kafka.test.TestUtils;
 import org.junit.Before;
@@ -41,7 +48,6 @@ import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
 import java.io.File;
-import java.io.IOException;
 import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collection;
@@ -49,8 +55,10 @@ import java.util.Collections;
 import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
 import static org.apache.kafka.test.TestUtils.waitForCondition;
 import static org.junit.Assert.assertTrue;
 
@@ -61,6 +69,10 @@ import static org.junit.Assert.assertTrue;
 @RunWith(Parameterized.class)
 public class StandbyTaskEOSIntegrationTest {
 
+    private final static long REBALANCE_TIMEOUT = Duration.ofMinutes(2L).toMillis();
+    private final static int KEY_0 = 0;
+    private final static int KEY_1 = 1;
+
     @Parameterized.Parameters(name = "{0}")
     public static Collection<String[]> data() {
         return Arrays.asList(new String[][] {
@@ -72,8 +84,12 @@ public class StandbyTaskEOSIntegrationTest {
     @Parameterized.Parameter
     public String eosConfig;
 
+    private final AtomicBoolean skipRecord = new AtomicBoolean(false);
+
     private String appId;
     private String inputTopic;
+    private String storeName;
+    private String outputTopic;
 
     @ClassRule
     public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(3);
@@ -86,8 +102,11 @@ public class StandbyTaskEOSIntegrationTest {
         final String safeTestName = safeUniqueTestName(getClass(), testName);
         appId = "app-" + safeTestName;
         inputTopic = "input-" + safeTestName;
-        CLUSTER.deleteTopicsAndWait(inputTopic, appId + "-KSTREAM-AGGREGATE-STATE-STORE-0000000001-changelog");
+        outputTopic = "output-" + safeTestName;
+        storeName = "store-" + safeTestName;
+        CLUSTER.deleteTopicsAndWait(inputTopic, outputTopic, appId + "-KSTREAM-AGGREGATE-STATE-STORE-0000000001-changelog");
         CLUSTER.createTopic(inputTopic, 1, 3);
+        CLUSTER.createTopic(outputTopic, 1, 3);
     }
 
     @Test
@@ -95,13 +114,16 @@ public class StandbyTaskEOSIntegrationTest {
         IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
             inputTopic,
             Collections.singletonList(
-                new KeyValue<>(0, 0)),
+                new KeyValue<>(0, 0)
+            ),
             TestUtils.producerConfig(
                 CLUSTER.bootstrapServers(),
                 IntegerSerializer.class,
                 IntegerSerializer.class,
-                new Properties()),
-            10L);
+                new Properties()
+            ),
+            10L
+        );
 
         final String stateDirPath = TestUtils.tempDirectory(appId).getPath();
 
@@ -109,7 +131,7 @@ public class StandbyTaskEOSIntegrationTest {
 
         try (
             final KafkaStreams streamInstanceOne = buildStreamWithDirtyStateDir(stateDirPath + "/" + appId + "-1/", instanceLatch);
-            final KafkaStreams streamInstanceTwo = buildStreamWithDirtyStateDir(stateDirPath + "/" + appId + "-2/", instanceLatch);
+            final KafkaStreams streamInstanceTwo = buildStreamWithDirtyStateDir(stateDirPath + "/" + appId + "-2/", instanceLatch)
         ) {
             streamInstanceOne.start();
 
@@ -132,7 +154,7 @@ public class StandbyTaskEOSIntegrationTest {
     }
 
     private KafkaStreams buildStreamWithDirtyStateDir(final String stateDirPath,
-                                                      final CountDownLatch recordProcessLatch) throws IOException {
+                                                      final CountDownLatch recordProcessLatch) throws Exception {
 
         final StreamsBuilder builder = new StreamsBuilder();
         final TaskId taskId = new TaskId(0, 0);
@@ -158,6 +180,198 @@ public class StandbyTaskEOSIntegrationTest {
         return new KafkaStreams(builder.build(), props);
     }
 
+    @Test
+    public void shouldWipeOutStandbyStateDirectoryIfCheckpointIsMissing() throws Exception {
+        final String base = TestUtils.tempDirectory(appId).getPath();
+
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+            inputTopic,
+            Collections.singletonList(
+                new KeyValue<>(KEY_0, 0)
+            ),
+            TestUtils.producerConfig(
+                CLUSTER.bootstrapServers(),
+                IntegerSerializer.class,
+                IntegerSerializer.class,
+                new Properties()
+            ),
+            10L
+        );
+
+        try (
+            final KafkaStreams streamInstanceOne = buildWithDeduplicationTopology(base + "-1");
+            final KafkaStreams streamInstanceTwo = buildWithDeduplicationTopology(base + "-2");
+            final KafkaStreams streamInstanceOneRecovery = buildWithDeduplicationTopology(base + "-1")
+        ) {
+            // start first instance and wait for processing
+            startApplicationAndWaitUntilRunning(Collections.singletonList(streamInstanceOne), Duration.ofSeconds(30));
+            IntegrationTestUtils.waitUntilMinRecordsReceived(
+                TestUtils.consumerConfig(
+                    CLUSTER.bootstrapServers(),
+                    IntegerDeserializer.class,
+                    IntegerDeserializer.class
+                ),
+                outputTopic,
+                1
+            );
+
+            // start second instance and wait for standby replication
+            startApplicationAndWaitUntilRunning(Collections.singletonList(streamInstanceTwo), Duration.ofSeconds(30));
+            waitForCondition(
+                () -> streamInstanceTwo.store(
+                    StoreQueryParameters.fromNameAndType(
+                        storeName,
+                        QueryableStoreTypes.<Integer, Integer>keyValueStore()
+                    ).enableStaleStores()
+                ).get(KEY_0) != null,
+                REBALANCE_TIMEOUT,
+                "Could not get key from standby store"
+            );
+            // sanity check that first instance is still active
+            waitForCondition(
+                () -> streamInstanceOne.store(
+                    StoreQueryParameters.fromNameAndType(
+                        storeName,
+                        QueryableStoreTypes.<Integer, Integer>keyValueStore()
+                    )
+                ).get(KEY_0) != null,
+                "Could not get key from main store"
+            );
+
+            // inject poison pill and wait for crash of first instance and recovery on second instance
+            IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+                inputTopic,
+                Collections.singletonList(
+                    new KeyValue<>(KEY_1, 0)
+                ),
+                TestUtils.producerConfig(
+                    CLUSTER.bootstrapServers(),
+                    IntegerSerializer.class,
+                    IntegerSerializer.class,
+                    new Properties()
+                ),
+                10L
+            );
+            waitForCondition(
+                () -> streamInstanceOne.state() == KafkaStreams.State.ERROR,
+                "Stream instance 1 did not go into error state"
+            );
+            streamInstanceOne.close();
+
+            IntegrationTestUtils.waitUntilMinRecordsReceived(
+                TestUtils.consumerConfig(
+                    CLUSTER.bootstrapServers(),
+                    IntegerDeserializer.class,
+                    IntegerDeserializer.class
+                ),
+                outputTopic,
+                2
+            );
+
+            // "restart" first client and wait for standby recovery
+            // (could actually also be active, but it does not matter as long as we enable "stale stores")
+            startApplicationAndWaitUntilRunning(
+                Collections.singletonList(streamInstanceOneRecovery),
+                Duration.ofSeconds(30)
+            );
+            waitForCondition(
+                () -> streamInstanceOneRecovery.store(
+                    StoreQueryParameters.fromNameAndType(
+                        storeName,
+                        QueryableStoreTypes.<Integer, Integer>keyValueStore()
+                    ).enableStaleStores()
+                ).get(KEY_0) != null,
+                "Could not get key from recovered standby store"
+            );
+
+            streamInstanceTwo.close();
+            waitForCondition(
+                () -> streamInstanceOneRecovery.store(
+                    StoreQueryParameters.fromNameAndType(
+                        storeName,
+                        QueryableStoreTypes.<Integer, Integer>keyValueStore()
+                    )
+                ).get(KEY_0) != null,
+                REBALANCE_TIMEOUT,
+                "Could not get key from recovered main store"
+            );
+
+            // re-inject poison pill and wait for crash of first instance
+            skipRecord.set(false);
+            IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+                inputTopic,
+                Collections.singletonList(
+                    new KeyValue<>(KEY_1, 0)
+                ),
+                TestUtils.producerConfig(
+                    CLUSTER.bootstrapServers(),
+                    IntegerSerializer.class,
+                    IntegerSerializer.class,
+                    new Properties()
+                ),
+                10L
+            );
+            waitForCondition(
+                () -> streamInstanceOneRecovery.state() == KafkaStreams.State.ERROR,
+                "Stream instance 1 did not go into error state"
+            );
+        }
+    }
+
+    private KafkaStreams buildWithDeduplicationTopology(final String stateDirPath) {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        builder.addStateStore(Stores.keyValueStoreBuilder(
+            Stores.persistentKeyValueStore(storeName),
+            Serdes.Integer(),
+            Serdes.Integer())
+        );
+        builder.<Integer, Integer>stream(inputTopic)
+            .transform(
+                () -> new Transformer<Integer, Integer, KeyValue<Integer, Integer>>() {
+                    private KeyValueStore<Integer, Integer> store;
+
+                    @SuppressWarnings("unchecked")
+                    @Override
+                    public void init(final ProcessorContext context) {
+                        store = (KeyValueStore<Integer, Integer>) context.getStateStore(storeName);
+                    }
+
+                    @Override
+                    public KeyValue<Integer, Integer> transform(final Integer key, final Integer value) {
+                        if (skipRecord.get()) {
+                            // we only forward the record so we can verify the skipping by reading the output topic;
+                            // the goal of skipping is to not modify the state store
+                            return KeyValue.pair(key, value);
+                        }
+
+                        if (store.get(key) != null) {
+                            return null;
+                        }
+
+                        store.put(key, value);
+                        store.flush();
+
+                        if (key == KEY_1) {
+                            // after error injection, we need to avoid a consecutive error after rebalancing
+                            skipRecord.set(true);
+                            throw new RuntimeException("Injected test error");
+                        }
+
+                        return KeyValue.pair(key, value);
+                    }
+
+                    @Override
+                    public void close() { }
+                },
+                storeName
+            )
+            .to(outputTopic);
+
+        return new KafkaStreams(builder.build(), props(stateDirPath));
+    }
+
+
     private Properties props(final String stateDirPath) {
         final Properties streamsConfiguration = new Properties();
         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
@@ -169,6 +383,8 @@ public class StandbyTaskEOSIntegrationTest {
         streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
         streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
         streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
+        // need to set to zero to get predictable active/standby task assignments
+        streamsConfiguration.put(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, 0);
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
 
         return streamsConfiguration;
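
For local verification, the new test can be run on its own with the standard Gradle test filter from a Kafka checkout (assuming the usual build setup documented in the project README):

    ./gradlew streams:test --tests StandbyTaskEOSIntegrationTest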