You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2022/07/21 14:35:40 UTC
[kafka] branch trunk updated: KAFKA-14076: Fix issues with KafkaStreams.CloseOptions (#12408)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new ff7cbf264c KAFKA-14076: Fix issues with KafkaStreams.CloseOptions (#12408)
ff7cbf264c is described below
commit ff7cbf264c0cb79ae56209ac381c51ef588ab97a
Author: James Hughes <jh...@confluent.io>
AuthorDate: Thu Jul 21 10:35:29 2022 -0400
KAFKA-14076: Fix issues with KafkaStreams.CloseOptions (#12408)
- used static memberId was incorrect
- need to remove all threads/members from the group
- need to use admit client correctly
Add test to verify fixes.
Reviewers: Matthias J. Sax <ma...@confluent.io>
---
.../org/apache/kafka/streams/KafkaStreams.java | 69 ++++---
.../KafkaStreamsCloseOptionsIntegrationTest.java | 198 +++++++++++++++++++++
2 files changed, 231 insertions(+), 36 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 05d99dd172..02b576c186 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -75,13 +75,11 @@ import org.apache.kafka.streams.state.HostInfo;
import org.apache.kafka.streams.state.internals.GlobalStateStoreProvider;
import org.apache.kafka.streams.state.internals.QueryableStoreProvider;
import org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider;
-
import org.slf4j.Logger;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.function.BiConsumer;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -102,6 +100,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;
@@ -1350,10 +1349,10 @@ public class KafkaStreams implements AutoCloseable {
* This will block until all threads have stopped.
*/
public void close() {
- close(Long.MAX_VALUE);
+ close(Long.MAX_VALUE, false);
}
- private Thread shutdownHelper(final boolean error) {
+ private Thread shutdownHelper(final boolean error, final long timeoutMs, final boolean leaveGroup) {
stateDirCleaner.shutdownNow();
if (rocksDBMetricsRecordingService != null) {
rocksDBMetricsRecordingService.shutdownNow();
@@ -1385,6 +1384,10 @@ public class KafkaStreams implements AutoCloseable {
}
});
+ if (leaveGroup) {
+ processStreamThread(streamThreadLeaveConsumerGroup(timeoutMs));
+ }
+
log.info("Shutdown {} stream threads complete", numStreamThreads);
if (globalStreamThread != null) {
@@ -1419,7 +1422,7 @@ public class KafkaStreams implements AutoCloseable {
}, clientId + "-CloseThread");
}
- private boolean close(final long timeoutMs) {
+ private boolean close(final long timeoutMs, final boolean leaveGroup) {
if (state.hasCompletedShutdown()) {
log.info("Streams client is already in the terminal {} state, all resources are closed and the client has stopped.", state);
return true;
@@ -1444,7 +1447,8 @@ public class KafkaStreams implements AutoCloseable {
log.error("Failed to transition to PENDING_SHUTDOWN, current state is {}", state);
throw new StreamsException("Failed to shut down while in state " + state);
} else {
- final Thread shutdownThread = shutdownHelper(false);
+
+ final Thread shutdownThread = shutdownHelper(false, timeoutMs, leaveGroup);
shutdownThread.setDaemon(true);
shutdownThread.start();
@@ -1463,7 +1467,7 @@ public class KafkaStreams implements AutoCloseable {
if (!setState(State.PENDING_ERROR)) {
log.info("Skipping shutdown since we are already in " + state());
} else {
- final Thread shutdownThread = shutdownHelper(true);
+ final Thread shutdownThread = shutdownHelper(true, -1, false);
shutdownThread.setDaemon(true);
shutdownThread.start();
@@ -1491,7 +1495,7 @@ public class KafkaStreams implements AutoCloseable {
log.debug("Stopping Streams client with timeoutMillis = {} ms.", timeoutMs);
- return close(timeoutMs);
+ return close(timeoutMs, false);
}
/**
@@ -1505,48 +1509,41 @@ public class KafkaStreams implements AutoCloseable {
* @throws IllegalArgumentException if {@code timeout} can't be represented as {@code long milliseconds}
*/
public synchronized boolean close(final CloseOptions options) throws IllegalArgumentException {
+ Objects.requireNonNull(options, "options cannot be null");
final String msgPrefix = prepareMillisCheckFailMsgPrefix(options.timeout, "timeout");
final long timeoutMs = validateMillisecondDuration(options.timeout, msgPrefix);
if (timeoutMs < 0) {
throw new IllegalArgumentException("Timeout can't be negative.");
}
+ log.debug("Stopping Streams client with timeoutMillis = {} ms.", timeoutMs);
+ return close(timeoutMs, options.leaveGroup);
+ }
- final long startMs = time.milliseconds();
-
- final boolean closeStatus = close(timeoutMs);
-
- final Optional<String> groupInstanceId = clientSupplier
- .getConsumer(applicationConfigs.getGlobalConsumerConfigs(clientId))
- .groupMetadata()
- .groupInstanceId();
-
- final long remainingTimeMs = Math.max(0, timeoutMs - (time.milliseconds() - startMs));
-
- if (options.leaveGroup && groupInstanceId.isPresent()) {
- log.debug("Sending leave group trigger to removing instance from consumer group");
- //removing instance from consumer group
-
- final MemberToRemove memberToRemove = new MemberToRemove(groupInstanceId.get());
-
- final Collection<MemberToRemove> membersToRemove = Collections.singletonList(memberToRemove);
+ private Consumer<StreamThread> streamThreadLeaveConsumerGroup(final long remainingTimeMs) {
+ return thread -> {
+ final Optional<String> groupInstanceId = thread.getGroupInstanceID();
+ if (groupInstanceId.isPresent()) {
+ log.debug("Sending leave group trigger to removing instance from consumer group: {}.",
+ groupInstanceId.get());
+ final MemberToRemove memberToRemove = new MemberToRemove(groupInstanceId.get());
+ final Collection<MemberToRemove> membersToRemove = Collections.singletonList(memberToRemove);
- final RemoveMembersFromConsumerGroupResult removeMembersFromConsumerGroupResult = adminClient
+ final RemoveMembersFromConsumerGroupResult removeMembersFromConsumerGroupResult = adminClient
.removeMembersFromConsumerGroup(
applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG),
new RemoveMembersFromConsumerGroupOptions(membersToRemove)
);
- try {
- removeMembersFromConsumerGroupResult.memberResult(memberToRemove).get(remainingTimeMs, TimeUnit.MILLISECONDS);
- } catch (final Exception e) {
- log.error("Could not remove static member {} from consumer group {} due to a: {}", groupInstanceId.get(),
+ try {
+ removeMembersFromConsumerGroupResult.memberResult(memberToRemove)
+ .get(remainingTimeMs, TimeUnit.MILLISECONDS);
+ } catch (final Exception e) {
+ log.error("Could not remove static member {} from consumer group {} due to a: {}",
+ groupInstanceId.get(),
applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG), e);
+ }
}
- }
-
- log.debug("Stopping Streams client with timeoutMillis = {} ms.", timeoutMs);
-
- return closeStatus;
+ };
}
/**
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java
new file mode 100644
index 0000000000..8d3cb8e879
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import kafka.server.KafkaConfig;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KafkaStreams.CloseOptions;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static java.util.Collections.singletonList;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForEmptyConsumerGroup;
+
+@Category({IntegrationTest.class})
+public class KafkaStreamsCloseOptionsIntegrationTest {
+ @Rule
+ public Timeout globalTimeout = Timeout.seconds(600);
+ @Rule
+ public final TestName testName = new TestName();
+ private static MockTime mockTime;
+
+ @Rule
+ public final TemporaryFolder testFolder = new TemporaryFolder(TestUtils.tempDirectory());
+
+ protected static final String INPUT_TOPIC = "inputTopic";
+ protected static final String OUTPUT_TOPIC = "outputTopic";
+
+ protected Properties streamsConfig;
+ protected static KafkaStreams streams;
+ protected static Admin adminClient;
+ protected Properties commonClientConfig;
+ private Properties producerConfig;
+ protected Properties resultConsumerConfig;
+
+ public static final EmbeddedKafkaCluster CLUSTER;
+
+ static {
+ final Properties brokerProps = new Properties();
+ brokerProps.setProperty(KafkaConfig.GroupMaxSessionTimeoutMsProp(), Integer.toString(Integer.MAX_VALUE));
+ CLUSTER = new EmbeddedKafkaCluster(1, brokerProps);
+ }
+
+ @BeforeClass
+ public static void startCluster() throws IOException {
+ CLUSTER.start();
+ }
+
+ @AfterClass
+ public static void closeCluster() {
+ CLUSTER.stop();
+ }
+
+ @Before
+ public void before() throws Exception {
+ mockTime = CLUSTER.time;
+
+ final String appID = IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
+
+ commonClientConfig = new Properties();
+ commonClientConfig.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+
+ streamsConfig = new Properties();
+ streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, testFolder.getRoot().getPath());
+ streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
+ streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+ streamsConfig.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+ streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
+ streamsConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
+ streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ // In this test, we set the SESSION_TIMEOUT_MS_CONFIG high in order to show that the call to
+ // `close(CloseOptions)` can remove the application from the Consumder Groups successfully.
+ streamsConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE);
+ streamsConfig.putAll(commonClientConfig);
+
+ producerConfig = new Properties();
+ producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
+ producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
+ producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ producerConfig.putAll(commonClientConfig);
+
+ resultConsumerConfig = new Properties();
+ resultConsumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, appID + "-result-consumer");
+ resultConsumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ resultConsumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
+ resultConsumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ resultConsumerConfig.putAll(commonClientConfig);
+
+ if (adminClient == null) {
+ adminClient = Admin.create(commonClientConfig);
+ }
+
+ CLUSTER.deleteAllTopicsAndWait(120_000L);
+ CLUSTER.createTopic(INPUT_TOPIC, 2, 1);
+ CLUSTER.createTopic(OUTPUT_TOPIC, 2, 1);
+
+ add10InputElements();
+ }
+
+ @After
+ public void after() throws Exception {
+ if (streams != null) {
+ streams.close(Duration.ofSeconds(30));
+ }
+ }
+
+ @Test
+ public void testCloseOptions() throws Exception {
+ final String appID = IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
+ streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
+ streamsConfig.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "someGroupInstance");
+ // Test with two threads to show that each of the threads is being called to remove clients from the CG.
+ streamsConfig.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
+
+ // RUN
+ streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig);
+ IntegrationTestUtils.startApplicationAndWaitUntilRunning(singletonList(streams), Duration.ofSeconds(30));
+ IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);
+
+ streams.close(new CloseOptions().leaveGroup(true).timeout(Duration.ofSeconds(30)));
+ waitForEmptyConsumerGroup(adminClient, appID, 0);
+ }
+
+ protected Topology setupTopologyWithoutIntermediateUserTopic() {
+ final StreamsBuilder builder = new StreamsBuilder();
+
+ final KStream<Long, String> input = builder.stream(INPUT_TOPIC);
+
+ input.to(OUTPUT_TOPIC, Produced.with(Serdes.Long(), Serdes.String()));
+ return builder.build();
+ }
+
+ private void add10InputElements() {
+ final List<KeyValue<Long, String>> records = Arrays.asList(KeyValue.pair(0L, "aaa"),
+ KeyValue.pair(1L, "bbb"),
+ KeyValue.pair(0L, "ccc"),
+ KeyValue.pair(1L, "ddd"),
+ KeyValue.pair(0L, "eee"),
+ KeyValue.pair(1L, "fff"),
+ KeyValue.pair(0L, "ggg"),
+ KeyValue.pair(1L, "hhh"),
+ KeyValue.pair(0L, "iii"),
+ KeyValue.pair(1L, "jjj"));
+
+ for (final KeyValue<Long, String> record : records) {
+ mockTime.sleep(10);
+ IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(record), producerConfig, mockTime.milliseconds());
+ }
+ }
+}