Posted to commits@kafka.apache.org by mj...@apache.org on 2022/07/21 14:35:40 UTC

[kafka] branch trunk updated: KAFKA-14076: Fix issues with KafkaStreams.CloseOptions (#12408)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ff7cbf264c KAFKA-14076: Fix issues with KafkaStreams.CloseOptions (#12408)
ff7cbf264c is described below

commit ff7cbf264c0cb79ae56209ac381c51ef588ab97a
Author: James Hughes <jh...@confluent.io>
AuthorDate: Thu Jul 21 10:35:29 2022 -0400

    KAFKA-14076: Fix issues with KafkaStreams.CloseOptions (#12408)
    
    - the static memberId that was used was incorrect
    - need to remove all threads/members from the group
    - need to use the admin client correctly
    
    Add test to verify fixes.
    
    Reviewers: Matthias J. Sax <ma...@confluent.io>
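
For context, the repaired API is used as follows. This is a minimal sketch, not code
from the commit: the topology, application id, bootstrap address, and instance id are
illustrative placeholders.

    import java.time.Duration;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.KafkaStreams.CloseOptions;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;

    public class CloseOptionsExample {
        public static void main(final String[] args) {
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");            // illustrative
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative
            // Static membership: leaveGroup(true) removes static members via the admin client.
            props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "instance-1");    // illustrative

            final StreamsBuilder builder = new StreamsBuilder();
            builder.stream("input").to("output"); // trivial placeholder topology

            final KafkaStreams streams = new KafkaStreams(builder.build(), props);
            streams.start();

            // The fixed path: close and leave the consumer group right away, rather than
            // waiting for the broker-side session timeout to expire the static members.
            streams.close(new CloseOptions().leaveGroup(true).timeout(Duration.ofSeconds(30)));
        }
    }

Before this fix, close(CloseOptions) looked up a single group.instance.id from the global
consumer config and issued one removal, so with multiple stream threads some static members
stayed in the group until the session timeout expired.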
---
 .../org/apache/kafka/streams/KafkaStreams.java     |  69 ++++---
 .../KafkaStreamsCloseOptionsIntegrationTest.java   | 198 +++++++++++++++++++++
 2 files changed, 231 insertions(+), 36 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 05d99dd172..02b576c186 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -75,13 +75,11 @@ import org.apache.kafka.streams.state.HostInfo;
 import org.apache.kafka.streams.state.internals.GlobalStateStoreProvider;
 import org.apache.kafka.streams.state.internals.QueryableStoreProvider;
 import org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider;
-
 import org.slf4j.Logger;
 
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.function.BiConsumer;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -102,6 +100,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
@@ -1350,10 +1349,10 @@ public class KafkaStreams implements AutoCloseable {
      * This will block until all threads have stopped.
      */
     public void close() {
-        close(Long.MAX_VALUE);
+        close(Long.MAX_VALUE, false);
     }
 
-    private Thread shutdownHelper(final boolean error) {
+    private Thread shutdownHelper(final boolean error, final long timeoutMs, final boolean leaveGroup) {
         stateDirCleaner.shutdownNow();
         if (rocksDBMetricsRecordingService != null) {
             rocksDBMetricsRecordingService.shutdownNow();
@@ -1385,6 +1384,10 @@ public class KafkaStreams implements AutoCloseable {
                 }
             });
 
+            if (leaveGroup) {
+                processStreamThread(streamThreadLeaveConsumerGroup(timeoutMs));
+            }
+
             log.info("Shutdown {} stream threads complete", numStreamThreads);
 
             if (globalStreamThread != null) {
@@ -1419,7 +1422,7 @@ public class KafkaStreams implements AutoCloseable {
         }, clientId + "-CloseThread");
     }
 
-    private boolean close(final long timeoutMs) {
+    private boolean close(final long timeoutMs, final boolean leaveGroup) {
         if (state.hasCompletedShutdown()) {
             log.info("Streams client is already in the terminal {} state, all resources are closed and the client has stopped.", state);
             return true;
@@ -1444,7 +1447,8 @@ public class KafkaStreams implements AutoCloseable {
             log.error("Failed to transition to PENDING_SHUTDOWN, current state is {}", state);
             throw new StreamsException("Failed to shut down while in state " + state);
         } else {
-            final Thread shutdownThread = shutdownHelper(false);
+
+            final Thread shutdownThread = shutdownHelper(false, timeoutMs, leaveGroup);
 
             shutdownThread.setDaemon(true);
             shutdownThread.start();
@@ -1463,7 +1467,7 @@ public class KafkaStreams implements AutoCloseable {
         if (!setState(State.PENDING_ERROR)) {
             log.info("Skipping shutdown since we are already in " + state());
         } else {
-            final Thread shutdownThread = shutdownHelper(true);
+            final Thread shutdownThread = shutdownHelper(true, -1, false);
 
             shutdownThread.setDaemon(true);
             shutdownThread.start();
@@ -1491,7 +1495,7 @@ public class KafkaStreams implements AutoCloseable {
 
         log.debug("Stopping Streams client with timeoutMillis = {} ms.", timeoutMs);
 
-        return close(timeoutMs);
+        return close(timeoutMs, false);
     }
 
     /**
@@ -1505,48 +1509,41 @@ public class KafkaStreams implements AutoCloseable {
      * @throws IllegalArgumentException if {@code timeout} can't be represented as {@code long milliseconds}
      */
     public synchronized boolean close(final CloseOptions options) throws IllegalArgumentException {
+        Objects.requireNonNull(options, "options cannot be null");
         final String msgPrefix = prepareMillisCheckFailMsgPrefix(options.timeout, "timeout");
         final long timeoutMs = validateMillisecondDuration(options.timeout, msgPrefix);
         if (timeoutMs < 0) {
             throw new IllegalArgumentException("Timeout can't be negative.");
         }
+        log.debug("Stopping Streams client with timeoutMillis = {} ms.", timeoutMs);
+        return close(timeoutMs, options.leaveGroup);
+    }
 
-        final long startMs = time.milliseconds();
-
-        final boolean closeStatus = close(timeoutMs);
-
-        final Optional<String> groupInstanceId = clientSupplier
-                .getConsumer(applicationConfigs.getGlobalConsumerConfigs(clientId))
-                .groupMetadata()
-                .groupInstanceId();
-
-        final long remainingTimeMs = Math.max(0, timeoutMs - (time.milliseconds() - startMs));
-
-        if (options.leaveGroup && groupInstanceId.isPresent()) {
-            log.debug("Sending leave group trigger to removing instance from consumer group");
-            //removing instance from consumer group
-
-            final MemberToRemove memberToRemove = new MemberToRemove(groupInstanceId.get());
-
-            final Collection<MemberToRemove> membersToRemove = Collections.singletonList(memberToRemove);
+    private Consumer<StreamThread> streamThreadLeaveConsumerGroup(final long remainingTimeMs) {
+        return thread -> {
+            final Optional<String> groupInstanceId = thread.getGroupInstanceID();
+            if (groupInstanceId.isPresent()) {
+                log.debug("Sending leave-group trigger to remove instance from consumer group: {}.",
+                    groupInstanceId.get());
+                final MemberToRemove memberToRemove = new MemberToRemove(groupInstanceId.get());
+                final Collection<MemberToRemove> membersToRemove = Collections.singletonList(memberToRemove);
 
-            final RemoveMembersFromConsumerGroupResult removeMembersFromConsumerGroupResult = adminClient
+                final RemoveMembersFromConsumerGroupResult removeMembersFromConsumerGroupResult = adminClient
                     .removeMembersFromConsumerGroup(
                         applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG),
                         new RemoveMembersFromConsumerGroupOptions(membersToRemove)
                     );
 
-            try {
-                removeMembersFromConsumerGroupResult.memberResult(memberToRemove).get(remainingTimeMs, TimeUnit.MILLISECONDS);
-            } catch (final Exception e) {
-                log.error("Could not remove static member {} from consumer group {} due to a: {}", groupInstanceId.get(),
+                try {
+                    removeMembersFromConsumerGroupResult.memberResult(memberToRemove)
+                        .get(remainingTimeMs, TimeUnit.MILLISECONDS);
+                } catch (final Exception e) {
+                    log.error("Could not remove static member {} from consumer group {} due to: {}",
+                        groupInstanceId.get(),
                         applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG), e);
+                }
             }
-        }
-
-        log.debug("Stopping Streams client with timeoutMillis = {} ms.", timeoutMs);
-
-        return closeStatus;
+        };
     }
 
     /**
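
For reference, the per-member removal that streamThreadLeaveConsumerGroup performs can be
sketched standalone as below. The admin calls mirror the ones in the diff; the group id,
instance id, bootstrap address, and timeout are illustrative placeholders.

    import java.util.Collection;
    import java.util.Collections;
    import java.util.Properties;
    import java.util.concurrent.TimeUnit;

    import org.apache.kafka.clients.CommonClientConfigs;
    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.MemberToRemove;
    import org.apache.kafka.clients.admin.RemoveMembersFromConsumerGroupOptions;
    import org.apache.kafka.clients.admin.RemoveMembersFromConsumerGroupResult;

    public class RemoveStaticMemberSketch {
        public static void main(final String[] args) throws Exception {
            final Properties props = new Properties();
            props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative

            try (final Admin admin = Admin.create(props)) {
                // One MemberToRemove per static member, i.e. per stream thread's group.instance.id.
                final MemberToRemove member = new MemberToRemove("instance-1"); // illustrative
                final Collection<MemberToRemove> members = Collections.singletonList(member);

                final RemoveMembersFromConsumerGroupResult result = admin.removeMembersFromConsumerGroup(
                    "my-app", // the group id is the Streams application.id
                    new RemoveMembersFromConsumerGroupOptions(members));

                // Block on the per-member future, as the fixed KafkaStreams code does.
                result.memberResult(member).get(30_000L, TimeUnit.MILLISECONDS);
            }
        }
    }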
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java
new file mode 100644
index 0000000000..8d3cb8e879
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import kafka.server.KafkaConfig;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KafkaStreams.CloseOptions;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static java.util.Collections.singletonList;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForEmptyConsumerGroup;
+
+@Category({IntegrationTest.class})
+public class KafkaStreamsCloseOptionsIntegrationTest {
+    @Rule
+    public Timeout globalTimeout = Timeout.seconds(600);
+    @Rule
+    public final TestName testName = new TestName();
+    private static MockTime mockTime;
+
+    @Rule
+    public final TemporaryFolder testFolder = new TemporaryFolder(TestUtils.tempDirectory());
+
+    protected static final String INPUT_TOPIC = "inputTopic";
+    protected static final String OUTPUT_TOPIC = "outputTopic";
+
+    protected Properties streamsConfig;
+    protected static KafkaStreams streams;
+    protected static Admin adminClient;
+    protected Properties commonClientConfig;
+    private Properties producerConfig;
+    protected Properties resultConsumerConfig;
+
+    public static final EmbeddedKafkaCluster CLUSTER;
+
+    static {
+        final Properties brokerProps = new Properties();
+        brokerProps.setProperty(KafkaConfig.GroupMaxSessionTimeoutMsProp(), Integer.toString(Integer.MAX_VALUE));
+        CLUSTER = new EmbeddedKafkaCluster(1, brokerProps);
+    }
+
+    @BeforeClass
+    public static void startCluster() throws IOException {
+        CLUSTER.start();
+    }
+
+    @AfterClass
+    public static void closeCluster() {
+        CLUSTER.stop();
+    }
+
+    @Before
+    public void before() throws Exception {
+        mockTime = CLUSTER.time;
+
+        final String appID = IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
+
+        commonClientConfig = new Properties();
+        commonClientConfig.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+
+        streamsConfig = new Properties();
+        streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, testFolder.getRoot().getPath());
+        streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
+        streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        streamsConfig.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+        streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
+        streamsConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
+        streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        // In this test, we set the SESSION_TIMEOUT_MS_CONFIG high in order to show that the call to
+        // `close(CloseOptions)` can remove the application from the consumer group successfully.
+        streamsConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE);
+        streamsConfig.putAll(commonClientConfig);
+
+        producerConfig = new Properties();
+        producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
+        producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
+        producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        producerConfig.putAll(commonClientConfig);
+
+        resultConsumerConfig = new Properties();
+        resultConsumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, appID + "-result-consumer");
+        resultConsumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        resultConsumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
+        resultConsumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        resultConsumerConfig.putAll(commonClientConfig);
+
+        if (adminClient == null) {
+            adminClient = Admin.create(commonClientConfig);
+        }
+
+        CLUSTER.deleteAllTopicsAndWait(120_000L);
+        CLUSTER.createTopic(INPUT_TOPIC, 2, 1);
+        CLUSTER.createTopic(OUTPUT_TOPIC, 2, 1);
+
+        add10InputElements();
+    }
+
+    @After
+    public void after() throws Exception {
+        if (streams != null) {
+            streams.close(Duration.ofSeconds(30));
+        }
+    }
+
+    @Test
+    public void testCloseOptions() throws Exception {
+        final String appID = IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
+        streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
+        streamsConfig.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "someGroupInstance");
+        // Test with two threads to show that each thread's member is removed from the consumer group.
+        streamsConfig.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
+
+        // RUN
+        streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig);
+        IntegrationTestUtils.startApplicationAndWaitUntilRunning(singletonList(streams), Duration.ofSeconds(30));
+        IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);
+
+        streams.close(new CloseOptions().leaveGroup(true).timeout(Duration.ofSeconds(30)));
+        waitForEmptyConsumerGroup(adminClient, appID, 0);
+    }
+
+    protected Topology setupTopologyWithoutIntermediateUserTopic() {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final KStream<Long, String> input = builder.stream(INPUT_TOPIC);
+
+        input.to(OUTPUT_TOPIC, Produced.with(Serdes.Long(), Serdes.String()));
+        return builder.build();
+    }
+
+    private void add10InputElements() {
+        final List<KeyValue<Long, String>> records = Arrays.asList(KeyValue.pair(0L, "aaa"),
+            KeyValue.pair(1L, "bbb"),
+            KeyValue.pair(0L, "ccc"),
+            KeyValue.pair(1L, "ddd"),
+            KeyValue.pair(0L, "eee"),
+            KeyValue.pair(1L, "fff"),
+            KeyValue.pair(0L, "ggg"),
+            KeyValue.pair(1L, "hhh"),
+            KeyValue.pair(0L, "iii"),
+            KeyValue.pair(1L, "jjj"));
+
+        for (final KeyValue<Long, String> record : records) {
+            mockTime.sleep(10);
+            IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(record), producerConfig, mockTime.milliseconds());
+        }
+    }
+}
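
The test's waitForEmptyConsumerGroup helper comes from Kafka's own integration-test
utilities. Outside that harness, one way to check that close(CloseOptions) actually left
the group empty is to describe the group with the admin client; a small sketch, again with
an illustrative group id and bootstrap address:

    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.CommonClientConfigs;
    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.ConsumerGroupDescription;

    public class VerifyGroupEmptySketch {
        public static void main(final String[] args) throws Exception {
            final Properties props = new Properties();
            props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative

            try (final Admin admin = Admin.create(props)) {
                final ConsumerGroupDescription description = admin
                    .describeConsumerGroups(Collections.singletonList("my-app")) // illustrative group id
                    .describedGroups()
                    .get("my-app")
                    .get();
                // After close(new CloseOptions().leaveGroup(true)...), no members should remain.
                System.out.println("Members left in group: " + description.members().size());
            }
        }
    }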