Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/06/17 22:06:44 UTC

[GitHub] [kafka] mjsax opened a new pull request #8890: KAFKA-9891: add integration tests for EOS and StandbyTask

mjsax opened a new pull request #8890:
URL: https://github.com/apache/kafka/pull/8890


   Ports the test from #8886 to `trunk`; this should be merged to the `2.6` branch.
   
   One open question: in `2.6` and `trunk` we rely on the active tasks to wipe out the store if a task crashes. However, if there is a hard JVM crash and `closeDirty()` is never called, the store would not be wiped out. Thus, I am wondering if we need to fix this (for both active and standby tasks) and check on _startup_ whether a local store must be wiped out.
   
   The current test passes, as we do a proper cleanup after the exception is thrown.
   
   Call for review @guozhangwang @abbccdda 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on pull request #8890: KAFKA-9891: add integration tests for EOS and StandbyTask

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #8890:
URL: https://github.com/apache/kafka/pull/8890#issuecomment-645655100


   I thought we actually do this check on startup: in `ProcessorStateManager#initializeStoreOffsetsFromCheckpoint` we throw a `TaskCorruptedException` if the state directory is non-empty but some offsets are missing from the checkpoint file.
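A simplified sketch of the startup check described in this comment, written in plain Java so it compiles without Kafka on the classpath. `CheckpointValidator` and the local `TaskCorruptedException` are hypothetical stand-ins, not Kafka's actual `ProcessorStateManager` code; the sketch only models the rule as stated (a non-empty state directory plus missing checkpoint offsets means the local state cannot be trusted).

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical stand-in for Kafka's TaskCorruptedException (not the real class).
class TaskCorruptedException extends RuntimeException {
    TaskCorruptedException(final String message) {
        super(message);
    }
}

class CheckpointValidator {
    // Simplified rule: if the state directory already holds data but the checkpoint
    // file has no offset for some store changelog, the local state may contain
    // writes from a run that never closed cleanly, so it must not be reused.
    static void validate(final boolean stateDirNonEmpty,
                         final Set<String> storeChangelogs,
                         final Map<String, Long> checkpointedOffsets) {
        if (!stateDirNonEmpty) {
            return; // nothing on disk: the store is simply restored from its changelog
        }
        final Set<String> missing = new TreeSet<>();
        for (final String changelog : storeChangelogs) {
            if (!checkpointedOffsets.containsKey(changelog)) {
                missing.add(changelog);
            }
        }
        if (!missing.isEmpty()) {
            // A caller would react by wiping the local state and restoring from scratch.
            throw new TaskCorruptedException(
                "State directory is non-empty but the checkpoint has no offset for " + missing);
        }
    }
}
```

In the hard-crash scenario from the PR description, `closeDirty()` never runs and the state directory survives; assuming no checkpoint covering the uncommitted writes was written before the crash, a check of this shape turns the leftover state into a corruption signal at the next startup instead of silently reusing it.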





[GitHub] [kafka] mjsax commented on a change in pull request #8890: KAFKA-9891: add integration tests for EOS and StandbyTask

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #8890:
URL: https://github.com/apache/kafka/pull/8890#discussion_r443021214



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java
##########
@@ -162,6 +376,8 @@ private Properties props(final String stateDirPath) {
         streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
         streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
         streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
+        // need to set to zero to get predictable active/standby task assignments
+        streamsConfiguration.put(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, 0);

Review comment:
       Yes. It's for the first phase of the test. We start the first instance and let it process the first record. As there is not enough capacity, no standby is scheduled. When we then start the second instance, the "lag=0" setting ensures that the standby is placed on instance two. With the default setting, we don't know which instance will get the active and which will get the standby. Thus, when we inject the poison pill, we know that instance one will fail, as it hosts the active.
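The effect of the setting can be sketched with a simplified model of the "caught-up" rule from KIP-441: a client counts as caught up on a task if its changelog lag is at most `acceptable.recovery.lag` (the config behind `StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG`, default 10000 records), and when at least one client is caught up, the active task is placed on one of the caught-up clients. `RecoveryLagSketch`, its methods, and the lag values are hypothetical; this is not the assignor's actual code.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

class RecoveryLagSketch {
    // Default of "acceptable.recovery.lag" (StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG).
    static final long DEFAULT_ACCEPTABLE_RECOVERY_LAG = 10_000L;

    // The two settings from the test's props() that matter here, using the raw config names.
    static Properties testProps() {
        final Properties props = new Properties();
        props.put("commit.interval.ms", 1000);
        props.put("acceptable.recovery.lag", 0L);
        return props;
    }

    // Simplified caught-up rule: returns, in input order, the clients whose lag on a
    // task is within the bound. When this list is non-empty, the active task is
    // placed on one of these clients.
    static List<String> caughtUpClients(final Map<String, Long> lagPerClient,
                                        final long acceptableRecoveryLag) {
        final List<String> caughtUp = new ArrayList<>();
        for (final Map.Entry<String, Long> entry : lagPerClient.entrySet()) {
            if (entry.getValue() <= acceptableRecoveryLag) {
                caughtUp.add(entry.getKey());
            }
        }
        return caughtUp;
    }

    public static void main(final String[] args) {
        final Map<String, Long> lags = new LinkedHashMap<>();
        lags.put("instance-1", 0L); // processed the first record, full local state
        lags.put("instance-2", 2L); // hypothetical small lag: just joined, empty state
        System.out.println(caughtUpClients(lags, 0L));                              // [instance-1]
        System.out.println(caughtUpClients(lags, DEFAULT_ACCEPTABLE_RECOVERY_LAG)); // [instance-1, instance-2]
    }
}
```

With `0`, only instance one qualifies, so the active stays there and instance two receives the standby; with the default, both instances qualify and the assignor is free to place the active on either one, which is why the test could not otherwise rely on instance one failing.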







[GitHub] [kafka] mjsax commented on a change in pull request #8890: KAFKA-9891: add integration tests for EOS and StandbyTask

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #8890:
URL: https://github.com/apache/kafka/pull/8890#discussion_r443024395



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java
##########
@@ -162,6 +376,8 @@ private Properties props(final String stateDirPath) {
         streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
         streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
         streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
+        // need to set to zero to get predictable active/standby task assignments
+        streamsConfiguration.put(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, 0);

Review comment:
       Well, it makes the test more complex, because all the following code depends on instance one failing.







[GitHub] [kafka] mjsax commented on pull request #8890: KAFKA-9891: add integration tests for EOS and StandbyTask

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #8890:
URL: https://github.com/apache/kafka/pull/8890#issuecomment-645714779


   I looked into the code, and what @ableegoldman says is correct; I missed this before. So I guess we are good. I would update the ticket to list only `2.5.0` under "affects version".





[GitHub] [kafka] vvcephei commented on a change in pull request #8890: KAFKA-9891: add integration tests for EOS and StandbyTask

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #8890:
URL: https://github.com/apache/kafka/pull/8890#discussion_r443022016



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java
##########
@@ -162,6 +376,8 @@ private Properties props(final String stateDirPath) {
         streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
         streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
         streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
+        // need to set to zero to get predictable active/standby task assignments
+        streamsConfiguration.put(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, 0);

Review comment:
       Right, this was my suggestion:
   > Could we instead just wait for one of the instances to crash, but not worry about which?







[GitHub] [kafka] vvcephei commented on a change in pull request #8890: KAFKA-9891: add integration tests for EOS and StandbyTask

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #8890:
URL: https://github.com/apache/kafka/pull/8890#discussion_r443028417



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java
##########
@@ -162,6 +376,8 @@ private Properties props(final String stateDirPath) {
         streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
         streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
         streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
+        // need to set to zero to get predictable active/standby task assignments
+        streamsConfiguration.put(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, 0);

Review comment:
       Ok, it's your call. I think this might make the tests flaky, but I guess we can figure that out later.







[GitHub] [kafka] guozhangwang commented on pull request #8890: KAFKA-9891: add integration tests for EOS and StandbyTask

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #8890:
URL: https://github.com/apache/kafka/pull/8890#issuecomment-645711202


   > Thus, I am wondering, if we would need to fix this (for both active and standby tasks) and do a check on _startup_ if a local store must be wiped out?
   
   I think this is covered in trunk already? Maybe we can have a quick chat to confirm.





[GitHub] [kafka] vvcephei commented on a change in pull request #8890: KAFKA-9891: add integration tests for EOS and StandbyTask

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #8890:
URL: https://github.com/apache/kafka/pull/8890#discussion_r442998201



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java
##########
@@ -151,6 +173,198 @@ private KafkaStreams buildStreamWithDirtyStateDir(final String stateDirPath,
         return new KafkaStreams(builder.build(), props);
     }
 
+    @Test
+    public void shouldWipeOutStandbyStateDirectoryIfCheckpointIsMissing() throws Exception {
+        final String base = TestUtils.tempDirectory(appId).getPath();
+
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+            inputTopic,
+            Collections.singletonList(
+                new KeyValue<>(KEY_0, 0)
+            ),
+            TestUtils.producerConfig(
+                CLUSTER.bootstrapServers(),
+                IntegerSerializer.class,
+                IntegerSerializer.class,
+                new Properties()
+            ),
+            10L
+        );
+
+        try (
+            final KafkaStreams streamInstanceOne = buildWithDeduplicationTopology(base + "-1");
+            final KafkaStreams streamInstanceTwo = buildWithDeduplicationTopology(base + "-2");
+            final KafkaStreams streamInstanceOneRecovery = buildWithDeduplicationTopology(base + "-1")
+        ) {
+            // start first instance and wait for processing
+            startApplicationAndWaitUntilRunning(Collections.singletonList(streamInstanceOne), Duration.ofSeconds(30));
+            IntegrationTestUtils.waitUntilMinRecordsReceived(
+                TestUtils.consumerConfig(
+                    CLUSTER.bootstrapServers(),
+                    IntegerDeserializer.class,
+                    IntegerDeserializer.class
+                ),
+                outputTopic,
+                1
+            );
+
+            // start second instance and wait for standby replication
+            startApplicationAndWaitUntilRunning(Collections.singletonList(streamInstanceTwo), Duration.ofSeconds(30));
+            waitForCondition(
+                () -> streamInstanceTwo.store(
+                    StoreQueryParameters.fromNameAndType(
+                        storeName,
+                        QueryableStoreTypes.<Integer, Integer>keyValueStore()
+                    ).enableStaleStores()
+                ).get(KEY_0) != null,
+                REBALANCE_TIMEOUT,
+                "Could not get key from standby store"
+            );
+            // sanity check that first instance is still active
+            waitForCondition(
+                () -> streamInstanceOne.store(
+                    StoreQueryParameters.fromNameAndType(
+                        storeName,
+                        QueryableStoreTypes.<Integer, Integer>keyValueStore()
+                    )
+                ).get(KEY_0) != null,
+                "Could not get key from main store"
+            );
+
+            // inject poison pill and wait for crash of first instance and recovery on second instance
+            IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+                inputTopic,
+                Collections.singletonList(
+                    new KeyValue<>(KEY_1, 0)
+                ),
+                TestUtils.producerConfig(
+                    CLUSTER.bootstrapServers(),
+                    IntegerSerializer.class,
+                    IntegerSerializer.class,
+                    new Properties()
+                ),
+                10L
+            );
+            waitForCondition(
+                () -> streamInstanceOne.state() == KafkaStreams.State.ERROR,
+                "Stream instance 1 did not go into error state"
+            );
+            streamInstanceOne.close();
+
+            IntegrationTestUtils.waitUntilMinRecordsReceived(
+                TestUtils.consumerConfig(
+                    CLUSTER.bootstrapServers(),
+                    IntegerDeserializer.class,
+                    IntegerDeserializer.class
+                ),
+                outputTopic,
+                2
+            );
+
+            // "restart" first client and wait for standby recovery
+            // (could actually also be active, but it does not matter as long as we enable "state stores"

Review comment:
       Did you mean "stale"?

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java
##########
@@ -162,6 +376,8 @@ private Properties props(final String stateDirPath) {
         streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
         streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
         streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
+        // need to set to zero to get predictable active/standby task assignments
+        streamsConfiguration.put(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, 0);

Review comment:
       Do we really need this? It seems like the only thing that depends on knowing which instance gets the active is waiting for the crash after the poison pill. Could we instead just wait for one of the instances to crash, but not worry about which?
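A minimal sketch of the suggested alternative: poll every instance and return whichever reaches `ERROR` first, so the rest of the test can treat the other instance as the survivor. It uses a local `State` enum and plain `Supplier`s so it compiles on its own; in the real test the suppliers would presumably be method references such as `streamInstanceOne::state` (`KafkaStreams#state()`). The helper name `waitForAnyError` is hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

class WaitForAnyCrash {
    // Local stand-in for KafkaStreams.State so the sketch compiles without Kafka.
    enum State { RUNNING, REBALANCING, ERROR }

    // Polls all instances until one reports ERROR and returns its index;
    // fails with an AssertionError if none does before the timeout.
    static int waitForAnyError(final List<Supplier<State>> instances,
                               final long timeoutMs,
                               final long pollIntervalMs) {
        final long deadline = System.currentTimeMillis() + timeoutMs;
        while (true) {
            for (int i = 0; i < instances.size(); i++) {
                if (instances.get(i).get() == State.ERROR) {
                    return i;
                }
            }
            if (System.currentTimeMillis() >= deadline) {
                throw new AssertionError("No instance went into ERROR state within " + timeoutMs + " ms");
            }
            try {
                Thread.sleep(pollIntervalMs);
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new AssertionError("Interrupted while waiting for an instance to fail", e);
            }
        }
    }

    public static void main(final String[] args) {
        final Supplier<State> instanceOne = () -> State.RUNNING;
        final Supplier<State> instanceTwo = () -> State.ERROR;
        final int failed = waitForAnyError(Arrays.asList(instanceOne, instanceTwo), 1_000L, 50L);
        final int survivor = 1 - failed; // with two instances, the other one keeps running
        System.out.println("failed=" + failed + ", survivor=" + survivor);
    }
}
```

The trade-off raised in the replies still applies: every later step (closing the failed instance, reading from the survivor's store, restarting the failed one) has to be written in terms of the returned index rather than fixed instance names.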







[GitHub] [kafka] mjsax commented on pull request #8890: KAFKA-9891: add integration tests for EOS and StandbyTask

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #8890:
URL: https://github.com/apache/kafka/pull/8890#issuecomment-646878784


   Cherry-picked to `2.6`.





[GitHub] [kafka] guozhangwang merged pull request #8890: KAFKA-9891: add integration tests for EOS and StandbyTask

Posted by GitBox <gi...@apache.org>.
guozhangwang merged pull request #8890:
URL: https://github.com/apache/kafka/pull/8890


   

