You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/09/09 20:06:07 UTC

[GitHub] [kafka] wcarlson5 commented on a change in pull request #9273: init changes for shutdownRequest

wcarlson5 commented on a change in pull request #9273:
URL: https://github.com/apache/kafka/pull/9273#discussion_r485857758



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -1436,6 +1447,16 @@ public void onAssignment(final Assignment assignment, final ConsumerGroupMetadat
                 topicToPartitionInfo = getTopicPartitionInfo(partitionsByHost);
                 encodedNextScheduledRebalanceMs = info.nextRebalanceMs();
                 break;
+            case 8:
+                validateActiveTaskEncoding(partitions, info, logPrefix);
+
+                activeTasks = getActiveTasks(partitions, info);
+                partitionsByHost = info.partitionsByHost();
+                standbyPartitionsByHost = info.standbyPartitionByHost();
+                topicToPartitionInfo = getTopicPartitionInfo(partitionsByHost);
+                encodedNextScheduledRebalanceMs = info.nextRebalanceMs();
+                //recive the shutdown then call the request close

Review comment:
       The error code is already read above, so this case just needs to be handled like version 7.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -1053,13 +1067,13 @@ ConsumerRebalanceListener rebalanceListener() {
 
     Consumer<byte[], byte[]> restoreConsumer() {
         return restoreConsumer;
-    };

Review comment:
       extra ; removed

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
##########
@@ -186,6 +186,15 @@ public ByteBuffer encode() {
                     out.writeInt(errCode);
                     out.writeLong(nextRebalanceMs);
                     break;
+                case 8:
+                    out.writeInt(usedVersion);
+                    out.writeInt(commonlySupportedVersion);
+                    encodeActiveAndStandbyTaskAssignment(out);
+                    encodeActiveAndStandbyHostPartitions(out);
+                    out.writeInt(errCode);
+                    out.writeLong(nextRebalanceMs);
+                    out.writeInt(0);

Review comment:
       no error in this case, we only want to trigger shutdown from subscriptionUserData

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/AppShutdownIntegrationTest.java
##########
@@ -0,0 +1,140 @@
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.*;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.*;
+import org.apache.kafka.streams.errors.ShutdownRequestedException;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Named;
+import org.apache.kafka.streams.kstream.SessionWindowedDeserializer;
+import org.apache.kafka.streams.kstream.TimeWindowedDeserializer;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.kafka.common.utils.Utils.*;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertThrows;
+
+
+@Category(IntegrationTest.class)
+public class AppShutdownIntegrationTest {
+    @ClassRule
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
+
+    @Rule
+    public TestName testName = new TestName();
+
+
+    @Test
+    public void shouldSendShutDownSignal() throws Exception {
+        //
+        //
+        // Also note that this is an integration test because so many components have to come together to
+        // ensure these configurations wind up where they belong, and any number of future code changes
+        // could break this change.
+
+        final String testId = safeUniqueTestName(getClass(), testName);
+        final String appId = "appId_" + testId;
+        final String inputTopic = "input" + testId;
+
+        IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, inputTopic);
+
+
+        final StreamsBuilder builder = new StreamsBuilder();
+
+
+        final List<KeyValue<Object, Object>> processorValueCollector = new ArrayList<>();
+
+        builder.stream(inputTopic).process(() -> new ShutdownProcessor(processorValueCollector), Named.as("process"));
+
+        final Properties properties = mkObjectProperties(
+                mkMap(
+                        mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()),
+                        mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, appId),
+                        mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()),
+                        mkEntry(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, "5"),
+                        mkEntry(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, "6"),
+                        mkEntry(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, "7"),
+                        mkEntry(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG, "480000")
+                )
+        );
+
+
+        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
+            final CountDownLatch latch = new CountDownLatch(1);
+
+            kafkaStreams.start();
+            produceMessages(0L, inputTopic);
+
+            latch.await(10, TimeUnit.SECONDS);
+
+            assertThat(processorValueCollector.size(), equalTo(1));
+        }
+    }
+
+    private void produceMessages(final long timestamp, final String streamOneInput) throws ShutdownRequestedException {
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+                streamOneInput,
+                Arrays.asList(
+                        new KeyValue<>(1, "A"),
+                        new KeyValue<>(2, "B"),
+                        new KeyValue<>(3, "C"),
+                        new KeyValue<>(4, "D"),
+                        new KeyValue<>(5, "E")),
+                TestUtils.producerConfig(
+                        CLUSTER.bootstrapServers(),
+                        IntegerSerializer.class,
+                        StringSerializer.class,
+                        new Properties()),
+                timestamp);
+    }
+}
+
+
+class ShutdownProcessor extends AbstractProcessor<Object, Object> {
+    final List<KeyValue<Object, Object>> valueList;
+
+    ShutdownProcessor(final List<KeyValue<Object, Object>> valueList) {
+        this.valueList = valueList;
+    }
+
+    @Override
+    public void init(final ProcessorContext context) {
+//        throw new ShutdownRequestedException("integration test");
+    }
+
+    @Override
+    public void process(final Object key, final Object value) {
+        valueList.add(new KeyValue<>(key, value));
+//        throw new ShutdownRequestedException("integration test");

Review comment:
       uncomment for test to pass, still will not cause a shutdown until request close is added to the rebalance listener

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -1369,7 +1381,6 @@ public void onAssignment(final Assignment assignment, final ConsumerGroupMetadat
 
         final AssignmentInfo info = AssignmentInfo.decode(assignment.userData());
         if (info.errCode() != AssignorError.NONE.code()) {
-            // set flag to shutdown streams app
             assignmentErrorCode.set(info.errCode());

Review comment:
       records the error to be handled in onPartitionsAssigned with the other possible error

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
##########
@@ -360,6 +369,16 @@ public static AssignmentInfo decode(final ByteBuffer data) {
                     assignmentInfo.errCode = in.readInt();
                     assignmentInfo.nextRebalanceMs = in.readLong();
                     break;
+                case 8:
+                    commonlySupportedVersion = in.readInt();
+                    assignmentInfo = new AssignmentInfo(usedVersion, commonlySupportedVersion);
+                    decodeActiveTasks(assignmentInfo, in);
+                    decodeStandbyTasks(assignmentInfo, in);
+                    decodeActiveAndStandbyHostPartitions(assignmentInfo, in);
+                    assignmentInfo.errCode = in.readInt();
+                    assignmentInfo.nextRebalanceMs = in.readLong();
+                    in.readInt();

Review comment:
       I don't know if this is necessary, but it's best to get the error out of the buffer

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/AppShutdownIntegrationTest.java
##########
@@ -0,0 +1,140 @@
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.*;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.*;
+import org.apache.kafka.streams.errors.ShutdownRequestedException;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Named;
+import org.apache.kafka.streams.kstream.SessionWindowedDeserializer;
+import org.apache.kafka.streams.kstream.TimeWindowedDeserializer;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.kafka.common.utils.Utils.*;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertThrows;
+
+
+@Category(IntegrationTest.class)
+public class AppShutdownIntegrationTest {
+    @ClassRule
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
+
+    @Rule
+    public TestName testName = new TestName();
+
+
+    @Test
+    public void shouldSendShutDownSignal() throws Exception {
+        //
+        //
+        // Also note that this is an integration test because so many components have to come together to
+        // ensure these configurations wind up where they belong, and any number of future code changes
+        // could break this change.
+
+        final String testId = safeUniqueTestName(getClass(), testName);
+        final String appId = "appId_" + testId;
+        final String inputTopic = "input" + testId;
+
+        IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, inputTopic);
+
+
+        final StreamsBuilder builder = new StreamsBuilder();
+
+
+        final List<KeyValue<Object, Object>> processorValueCollector = new ArrayList<>();
+
+        builder.stream(inputTopic).process(() -> new ShutdownProcessor(processorValueCollector), Named.as("process"));
+
+        final Properties properties = mkObjectProperties(
+                mkMap(
+                        mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()),
+                        mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, appId),
+                        mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()),
+                        mkEntry(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, "5"),
+                        mkEntry(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, "6"),
+                        mkEntry(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, "7"),
+                        mkEntry(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG, "480000")
+                )
+        );
+
+
+        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
+            final CountDownLatch latch = new CountDownLatch(1);
+
+            kafkaStreams.start();
+            produceMessages(0L, inputTopic);
+
+            latch.await(10, TimeUnit.SECONDS);

Review comment:
       should replace with a shutdown hook latch with counter instead

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -1229,4 +1233,13 @@ boolean needsInitializationOrRestoration() {
     public void setPartitionResetter(final java.util.function.Consumer<Set<TopicPartition>> resetter) {
         this.resetter = resetter;
     }
+
+    public void flagForShutdownRequest(){
+        this.shutdownRequested.set(2);

Review comment:
       error code for INCOMPLETE_SOURCE_TOPIC_METADATA is 1, SHUTDOWN_REQUESTED is 2

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -566,10 +567,22 @@ void runLoop() {
                 }
             } catch (final TaskMigratedException e) {
                 handleTaskMigrated(e);
+            } catch (final ShutdownRequestedException e){
+                handleShutdownRequest(e);

Review comment:
       The interruption in this location caused ...
   
   
   ```
   TaskManager
   	MetadataState:
   	Tasks:
    died (org.apache.zookeeper.server.NIOServerCnxnFactory:92)
   java.lang.IllegalStateException: Illegal state SUSPENDED while completing restoration for active task 0_0
   	at org.apache.kafka.streams.processor.internals.StreamTask.completeRestoration(StreamTask.java:245)
   	at org.apache.kafka.streams.processor.internals.TaskManager.tryToCompleteRestoration(TaskManager.java:487)
   	at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:675)
   	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:554)
   	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:513)
   ```
   
   Not sure that is okay; the test still runs. If it is shutting down maybe it does matter, and I'm not sure where else to put it

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java
##########
@@ -54,6 +54,9 @@ public void onPartitionsAssigned(final Collection<TopicPartition> partitions) {
         if (assignmentErrorCode.get() == AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.code()) {
             log.error("Received error code {}", assignmentErrorCode.get());
             throw new MissingSourceTopicException("One or more source topics were missing during rebalance");
+        }else if(assignmentErrorCode.get() == AssignorError.SHUTDOWN_REQUESTED.code()){
+            //throw new ShutdownRequestedException("onPartition assigned"); //TODO: receive request and call requestClose()
+            //requestClose();

Review comment:
       until the shut down application from the stream thread functionality is added, this will not shut down the app




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org