Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/04/25 16:54:42 UTC

[GitHub] [kafka] abbccdda opened a new pull request #8553: KAFKA-9891: Add integration test to replicate standby task failover

abbccdda opened a new pull request #8553:
URL: https://github.com/apache/kafka/pull/8553


   This integration test mimics the scenario where the active task writes an invalid record and the standby task replicates it under at_least_once. Under EOS this should not happen, because the data is never committed.
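
The difference between the two guarantees can be illustrated with a toy model. This is only a sketch and not how Kafka Streams is implemented: `ChangelogRecord`, `visibleKeys`, and the `committed` flag are hypothetical names invented for illustration, and marking `key_one` as committed is an assumption made to keep the contrast visible. The model also reduces read-committed isolation to a per-record filter, which is simpler than the real consumer behaviour.

```java
import java.util.ArrayList;
import java.util.List;

public class StandbyVisibilitySketch {

    /** Hypothetical changelog entry: a key plus whether its write was committed. */
    static final class ChangelogRecord {
        final String key;
        final boolean committed;

        ChangelogRecord(String key, boolean committed) {
            this.key = key;
            this.committed = committed;
        }
    }

    /**
     * Returns the keys a standby would replicate in this toy model.
     * readCommitted = false stands in for at_least_once,
     * readCommitted = true stands in for exactly_once.
     */
    static List<String> visibleKeys(List<ChangelogRecord> changelog, boolean readCommitted) {
        List<String> keys = new ArrayList<>();
        for (ChangelogRecord record : changelog) {
            // Without read-committed isolation every written record is visible.
            if (!readCommitted || record.committed) {
                keys.add(record.key);
            }
        }
        return keys;
    }

    public static void main(String[] args) {
        List<ChangelogRecord> changelog = new ArrayList<>();
        // Assumption for illustration: the first record was committed.
        changelog.add(new ChangelogRecord("key_one", true));
        // The active task stored the poison key, then failed before committing.
        changelog.add(new ChangelogRecord("poison_key", false));

        System.out.println("at_least_once standby sees: " + visibleKeys(changelog, false));
        System.out.println("exactly_once standby sees: " + visibleKeys(changelog, true));
    }
}
```

In this model the at_least_once standby ends up holding `poison_key`, so a task that fails over to it and reprocesses the record would trip the duplicate check, while the exactly_once standby never sees the uncommitted write.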
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mateuszjadczykDna commented on a change in pull request #8553: KAFKA-9891: Add integration test to replicate standby task failover

Posted by GitBox <gi...@apache.org>.
mateuszjadczykDna commented on a change in pull request #8553:
URL: https://github.com/apache/kafka/pull/8553#discussion_r416421730



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskFailOverIntegrationTest.java
##########
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StoreQueryParameters;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Transformer;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.QueryableStoreType;
+import org.apache.kafka.streams.state.QueryableStoreTypes;
+import org.apache.kafka.streams.state.ReadOnlyWindowStore;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateBeforeTest;
+import static org.apache.kafka.test.TestUtils.waitForCondition;
+import static org.junit.Assert.fail;
+
+/**
+ * Test the standby task fail over scenario. Never call commit but process a poison key that causes primary task failed.
+ * For at least once, the poison record will be replicated to the standby task state.
+ * In EOS, we should not hit the duplicate processing exception on the poison key, as the
+ * restore consumer is also read committed.
+ */
+@RunWith(Parameterized.class)
+@Category({IntegrationTest.class})
+public class StandbyTaskFailOverIntegrationTest {
+
+    private final Logger log = LoggerFactory.getLogger(StandbyTaskFailOverIntegrationTest.class);
+
+    private static final int NUM_BROKERS = 3;
+    private static final Duration RETENTION = Duration.ofMillis(100_000);
+    private static final Duration WINDOW_SIZE = Duration.ofMillis(100);
+    private static final String STORE_NAME = "dedup-store";
+
+    private final String appId = "test-app";
+    private final String inputTopic = "input";
+    private final String keyOne = "key_one";
+    private final String poisonKey = "poison_key";
+    private final int numThreads = 2;
+    private KafkaStreams streamInstanceOne;
+    private KafkaStreams streamInstanceTwo;
+
+    @ClassRule
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(
+        NUM_BROKERS,
+        Utils.mkProperties(Collections.singletonMap("auto.create.topics.enable", "false"))
+    );
+
+    @Parameterized.Parameter
+    public String eosConfig;
+
+    @Parameterized.Parameters(name = "{0}")
+    public static Collection<String[]> data() {
+        return Arrays.asList(new String[][] {
+            {StreamsConfig.AT_LEAST_ONCE},
+            {StreamsConfig.EXACTLY_ONCE}
+        });
+    }
+
+    @Before
+    public void createTopics() throws Exception {
+        cleanStateBeforeTest(CLUSTER);
+        CLUSTER.createTopic(inputTopic, 1, 1);
+    }
+
+    @Test
+    public void testStandbyTaskFailOver() throws Exception {
+
+        CountDownLatch waitPoisonRecordReplication = new CountDownLatch(1);
+
+        streamInstanceOne = getStreamInstance(1, waitPoisonRecordReplication);
+        streamInstanceTwo = getStreamInstance(2, null);
+
+        CountDownLatch threadDeaths = new CountDownLatch(3);
+        streamInstanceOne.setUncaughtExceptionHandler((t, e) -> {
+            if (e.getMessage().startsWith("Caught a duplicate key") && eosConfig.equals(StreamsConfig.EXACTLY_ONCE)) {
+                fail("Should not hit duplicate key in EOS");
+            }
+            threadDeaths.countDown();
+        });
+        streamInstanceTwo.setUncaughtExceptionHandler((t, e) ->{
+            if (e.getMessage().startsWith("Caught a duplicate key") && eosConfig.equals(StreamsConfig.EXACTLY_ONCE)) {
+                fail("Should not hit duplicate key in EOS");
+            }
+            threadDeaths.countDown();
+        });
+
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+            inputTopic,
+            Collections.singletonList(
+                new KeyValue<>(keyOne, "value")),
+            TestUtils.producerConfig(
+                CLUSTER.bootstrapServers(),
+                StringSerializer.class,
+                StringSerializer.class,
+                new Properties()),
+            10L);
+
+        // Start instance one first to make sure it gets the task assignment.
+        streamInstanceOne.start();
+        waitForCondition(() -> streamInstanceOne.state().equals(KafkaStreams.State.RUNNING),
+            "Stream instance one should be up and running by now");
+
+        log.info("Stream instance one starts up");
+
+        streamInstanceTwo.start();
+        waitForCondition(() -> streamInstanceTwo.state().equals(KafkaStreams.State.RUNNING),
+            "Stream instance two should be up and running by now");
+
+        log.info("Stream instance two starts up, producing the poison record");
+        // Produce the poison record
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+            inputTopic,
+            Collections.singletonList(
+                new KeyValue<>(poisonKey, "value")),
+            TestUtils.producerConfig(
+                CLUSTER.bootstrapServers(),
+                StringSerializer.class,
+                StringSerializer.class,
+                new Properties()),
+            20L);
+
+        if (eosConfig.equals(StreamsConfig.AT_LEAST_ONCE)) {
+            final QueryableStoreType<ReadOnlyWindowStore<String, String>> queryableStoreType = QueryableStoreTypes.windowStore();
+            waitForCondition(() -> {
+                ReadOnlyWindowStore<String, String> instanceTwoWindowStore =
+                    streamInstanceTwo.store(StoreQueryParameters.fromNameAndType(STORE_NAME, queryableStoreType).enableStaleStores());
+
+                final KeyValueIterator<Windowed<String>, String> iterator = instanceTwoWindowStore.all();
+                while (iterator.hasNext()) {
+                    String key = iterator.next().key.key();
+                    if (key.equals(poisonKey)) {
+                        waitPoisonRecordReplication.countDown();
+                        return true;
+                    }
+                }
+                return false;
+            }, "Did not see poison key replicated to instance two");
+        } else {
+            // Wait sufficient time to make sure the data is not replicated.
+            Thread.sleep(3000);
+        }
+
+        threadDeaths.await(15, TimeUnit.SECONDS);
+    }
+
+    private KafkaStreams getStreamInstance(final int instanceId, CountDownLatch waitPoisonRecordReplication) {
+        WindowBytesStoreSupplier storeSupplier = Stores.persistentWindowStore(STORE_NAME,
+            RETENTION,
+            WINDOW_SIZE,
+            true);
+        StoreBuilder<WindowStore<String, String>> storeBuilder = Stores.windowStoreBuilder(storeSupplier, Serdes.String(), Serdes.String())
+                                                                     .withLoggingEnabled(Collections.emptyMap());
+
+        final StreamsBuilder builder = new StreamsBuilder();
+        builder.addStateStore(storeBuilder);
+
+        builder.stream(inputTopic,
+            Consumed.with(Serdes.String(), Serdes.String())).transform(
+            () -> new Transformer<String, String, KeyValue<String, String>>() {
+                private WindowStore<String, String> dedupStore;
+                private ProcessorContext context;
+
+                @Override
+                public void init(ProcessorContext context) {
+                    this.context = context;
+                    this.dedupStore = (WindowStore<String, String>) context.getStateStore(STORE_NAME);
+                }
+
+                @Override
+                public KeyValue<String, String> transform(String key, String value) {
+                    long timestamp = context.timestamp();
+                    final WindowStoreIterator<String> storeIterator = dedupStore.fetch(key, timestamp - WINDOW_SIZE.toMillis(), timestamp);
+                    if (storeIterator.hasNext()) {
+                        throw new IllegalStateException("Caught a duplicate key " + key);
+                    }
+                    dedupStore.put(key, value, timestamp);
+
+                    return new KeyValue<>(key, value);
+                }
+
+                @Override
+                public void close() {
+                }
+            }, STORE_NAME
+        ).peek((key, value) -> {
+            if (key.equals(poisonKey)) {
+                try {
+                    if (waitPoisonRecordReplication != null) {
+                        waitPoisonRecordReplication.await(15_000L, TimeUnit.MILLISECONDS);
+                    }
+                } catch (InterruptedException ignored) {
+                }
+                throw new IllegalStateException("Throw on key_two to trigger rebalance");
+            }
+        });
+
+        return new KafkaStreams(builder.build(),
+            props(numThreads, String.format("/tmp/kafka-streams/instance-%d/", instanceId)));
+    }
+
+    @After
+    public void shutdown() {
+        if (streamInstanceOne != null) {
+            streamInstanceOne.close(Duration.ofSeconds(30L));
+            streamInstanceOne.cleanUp();
+        }
+
+        if (streamInstanceTwo != null) {
+            streamInstanceTwo.close(Duration.ofSeconds(30L));
+            streamInstanceTwo.cleanUp();
+        }
+    }
+
+    private Properties props(final int numThreads, final String stateDirPath) {
+        final Properties streamsConfiguration = new Properties();
+        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
+        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+        streamsConfiguration.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
+        streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numThreads);
+        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, stateDirPath);
+        streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, eosConfig);
+        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
+        streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
+        // Set commit interval long to avoid actually committing the record.

Review comment:
       Why is it actually needed?







[GitHub] [kafka] mateuszjadczykDna commented on a change in pull request #8553: KAFKA-9891: Add integration test to replicate standby task failover

Posted by GitBox <gi...@apache.org>.
mateuszjadczykDna commented on a change in pull request #8553:
URL: https://github.com/apache/kafka/pull/8553#discussion_r416422911



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskFailOverIntegrationTest.java
##########
@@ -0,0 +1,283 @@
+    private Properties props(final int numThreads, final String stateDirPath) {
+        final Properties streamsConfiguration = new Properties();
+        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
+        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+        streamsConfiguration.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
+        streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numThreads);
+        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, stateDirPath);
+        streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, eosConfig);
+        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
+        streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
+        // Set commit interval long to avoid actually committing the record.
+        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Integer.MAX_VALUE);
+        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        streamsConfiguration.put(ProducerConfig.LINGER_MS_CONFIG, 0);
+

Review comment:
       I've noticed there's only a single rebalance, and we need at least 3. Maybe set
       `streamsConfiguration.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000);`
       `streamsConfiguration.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 500);`
       or similar.







[GitHub] [kafka] mateuszjadczykDna commented on a change in pull request #8553: KAFKA-9891: Add integration test to replicate standby task failover

Posted by GitBox <gi...@apache.org>.
mateuszjadczykDna commented on a change in pull request #8553:
URL: https://github.com/apache/kafka/pull/8553#discussion_r416423943



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskFailOverIntegrationTest.java
##########
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StoreQueryParameters;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Transformer;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.QueryableStoreType;
+import org.apache.kafka.streams.state.QueryableStoreTypes;
+import org.apache.kafka.streams.state.ReadOnlyWindowStore;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateBeforeTest;
+import static org.apache.kafka.test.TestUtils.waitForCondition;
+import static org.junit.Assert.fail;
+
+/**
+ * Test the standby task failover scenario: never commit, but process a poison key that causes the active task to fail.
+ * Under at-least-once, the poison record will be replicated to the standby task's state.
+ * Under EOS, we should not hit the duplicate-processing exception on the poison key, because the
+ * restore consumer also reads with read_committed isolation.
+ */
+@RunWith(Parameterized.class)
+@Category({IntegrationTest.class})
+public class StandbyTaskFailOverIntegrationTest {
+
+    private final Logger log = LoggerFactory.getLogger(StandbyTaskFailOverIntegrationTest.class);
+
+    private static final int NUM_BROKERS = 3;
+    private static final Duration RETENTION = Duration.ofMillis(100_000);
+    private static final Duration WINDOW_SIZE = Duration.ofMillis(100);
+    private static final String STORE_NAME = "dedup-store";
+
+    private final String appId = "test-app";
+    private final String inputTopic = "input";
+    private final String keyOne = "key_one";
+    private final String poisonKey = "poison_key";
+    private final int numThreads = 2;
+    private KafkaStreams streamInstanceOne;
+    private KafkaStreams streamInstanceTwo;
+
+    @ClassRule
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(
+        NUM_BROKERS,
+        Utils.mkProperties(Collections.singletonMap("auto.create.topics.enable", "false"))
+    );
+
+    @Parameterized.Parameter
+    public String eosConfig;
+
+    @Parameterized.Parameters(name = "{0}")
+    public static Collection<String[]> data() {
+        return Arrays.asList(new String[][] {
+            {StreamsConfig.AT_LEAST_ONCE},
+            {StreamsConfig.EXACTLY_ONCE}
+        });
+    }
+
+    @Before
+    public void createTopics() throws Exception {
+        cleanStateBeforeTest(CLUSTER);
+        CLUSTER.createTopic(inputTopic, 1, 1);
+    }
+
+    @Test
+    public void testStandbyTaskFailOver() throws Exception {
+
+        CountDownLatch waitPoisonRecordReplication = new CountDownLatch(1);
+
+        streamInstanceOne = getStreamInstance(1, waitPoisonRecordReplication);
+        streamInstanceTwo = getStreamInstance(2, null);
+
+        CountDownLatch threadDeaths = new CountDownLatch(3);
+        streamInstanceOne.setUncaughtExceptionHandler((t, e) -> {
+            if (e.getMessage().startsWith("Caught a duplicate key") && eosConfig.equals(StreamsConfig.EXACTLY_ONCE)) {
+                fail("Should not hit duplicate key in EOS");
+            }
+            threadDeaths.countDown();
+        });
+        streamInstanceTwo.setUncaughtExceptionHandler((t, e) -> {
+            if (e.getMessage().startsWith("Caught a duplicate key") && eosConfig.equals(StreamsConfig.EXACTLY_ONCE)) {
+                fail("Should not hit duplicate key in EOS");
+            }
+            threadDeaths.countDown();
+        });
+
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+            inputTopic,
+            Collections.singletonList(
+                new KeyValue<>(keyOne, "value")),
+            TestUtils.producerConfig(
+                CLUSTER.bootstrapServers(),
+                StringSerializer.class,
+                StringSerializer.class,
+                new Properties()),
+            10L);
+
+        // Start instance one first to make sure it gets the task assignment.
+        streamInstanceOne.start();
+        waitForCondition(() -> streamInstanceOne.state().equals(KafkaStreams.State.RUNNING),
+            "Stream instance one should be up and running by now");
+
+        log.info("Stream instance one starts up");
+
+        streamInstanceTwo.start();
+        waitForCondition(() -> streamInstanceTwo.state().equals(KafkaStreams.State.RUNNING),
+            "Stream instance two should be up and running by now");
+
+        log.info("Stream instance two starts up, producing the poison record");
+        // Produce the poison record
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+            inputTopic,
+            Collections.singletonList(
+                new KeyValue<>(poisonKey, "value")),
+            TestUtils.producerConfig(
+                CLUSTER.bootstrapServers(),
+                StringSerializer.class,
+                StringSerializer.class,
+                new Properties()),
+            20L);
+
+        if (eosConfig.equals(StreamsConfig.AT_LEAST_ONCE)) {
+            final QueryableStoreType<ReadOnlyWindowStore<String, String>> queryableStoreType = QueryableStoreTypes.windowStore();
+            waitForCondition(() -> {
+                ReadOnlyWindowStore<String, String> instanceTwoWindowStore =
+                    streamInstanceTwo.store(StoreQueryParameters.fromNameAndType(STORE_NAME, queryableStoreType).enableStaleStores());
+
+                final KeyValueIterator<Windowed<String>, String> iterator = instanceTwoWindowStore.all();
+                while (iterator.hasNext()) {
+                    String key = iterator.next().key.key();
+                    if (key.equals(poisonKey)) {
+                        waitPoisonRecordReplication.countDown();
+                        return true;
+                    }
+                }
+                return false;
+            }, "Did not see poison key replicated to instance two");
+        } else {
+            // Wait long enough to make sure the data is not replicated.
+            Thread.sleep(3000);
+        }
+
+        threadDeaths.await(15, TimeUnit.SECONDS);

Review comment:
       Should we assert at the end that all threads went down?
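
       A minimal sketch of what such an assertion could look like. `CountDownLatch.await(long, TimeUnit)` returns `true` only if the count reached zero before the timeout, so its return value can be checked instead of discarded. The class name `ThreadDeathAssertionSketch` and the helper `allThreadsDied` are hypothetical and not part of this PR; the worker threads below merely stand in for dying stream threads.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class ThreadDeathAssertionSketch {

    // Hypothetical helper (not in the PR): returns true only if the latch
    // reached zero before the timeout elapsed.
    public static boolean allThreadsDied(final CountDownLatch threadDeaths,
                                         final long timeout,
                                         final TimeUnit unit) {
        try {
            return threadDeaths.await(timeout, unit);
        } catch (final InterruptedException e) {
            // Restore the interrupt flag and report the wait as unsuccessful.
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(final String[] args) {
        final CountDownLatch threadDeaths = new CountDownLatch(2);

        // Stand-ins for stream threads whose uncaught exception handler
        // counts the latch down when the thread dies.
        for (int i = 0; i < 2; i++) {
            new Thread(threadDeaths::countDown).start();
        }

        // Fail loudly instead of silently ignoring a timeout.
        if (!allThreadsDied(threadDeaths, 15, TimeUnit.SECONDS)) {
            throw new AssertionError("Not all threads went down in time");
        }
        System.out.println("all threads went down");
    }
}
```

       In the test itself, the same idea would presumably reduce to wrapping the existing `threadDeaths.await(15, TimeUnit.SECONDS)` call in JUnit's `assertTrue`.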







[GitHub] [kafka] mateuszjadczykDna commented on a change in pull request #8553: KAFKA-9891: Add integration test to replicate standby task failover

Posted by GitBox <gi...@apache.org>.
mateuszjadczykDna commented on a change in pull request #8553:
URL: https://github.com/apache/kafka/pull/8553#discussion_r416423660



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskFailOverIntegrationTest.java
##########
@@ -0,0 +1,283 @@
+    @Test
+    public void testStandbyTaskFailOver() throws Exception {
+
+        CountDownLatch waitPoisonRecordReplication = new CountDownLatch(1);
+
+        streamInstanceOne = getStreamInstance(1, waitPoisonRecordReplication);
+        streamInstanceTwo = getStreamInstance(2, null);
+
+        CountDownLatch threadDeaths = new CountDownLatch(3);

Review comment:
       Eventually all 4 threads should be down (2 instances with 2 threads each), so this latch should count 4, not 3.
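
       One way to avoid a hard-coded count is to derive it from the number of instances and threads. This is only a sketch: it assumes that `numThreads` is what each instance uses for its stream thread count, which the quoted portion of the diff does not show. The class `ThreadDeathLatchSizeSketch` and the helper `latchForAllThreads` are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;

public class ThreadDeathLatchSizeSketch {

    // Hypothetical helper (not in the PR): one latch count per stream thread
    // across all instances.
    public static CountDownLatch latchForAllThreads(final int numInstances,
                                                    final int numThreadsPerInstance) {
        return new CountDownLatch(numInstances * numThreadsPerInstance);
    }

    public static void main(final String[] args) {
        final int numInstances = 2; // streamInstanceOne and streamInstanceTwo
        final int numThreads = 2;   // mirrors the numThreads field in the test (assumed per-instance)

        final CountDownLatch threadDeaths = latchForAllThreads(numInstances, numThreads);
        System.out.println(threadDeaths.getCount()); // prints 4
    }
}
```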







[GitHub] [kafka] mjsax commented on pull request #8553: KAFKA-9891: Add integration test to replicate standby task failover

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #8553:
URL: https://github.com/apache/kafka/pull/8553#issuecomment-646873238


   The other PRs got merged. Closing this PR.





[GitHub] [kafka] mjsax closed pull request #8553: KAFKA-9891: Add integration test to replicate standby task failover

Posted by GitBox <gi...@apache.org>.
mjsax closed pull request #8553:
URL: https://github.com/apache/kafka/pull/8553


   

