You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/05/04 22:18:49 UTC
[kafka] branch trunk updated: KAFKA-6844: Call shutdown on
GlobalStreamThread after all StreamThreads have stopped (#4950)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new f448e49 KAFKA-6844: Call shutdown on GlobalStreamThread after all StreamThreads have stopped (#4950)
f448e49 is described below
commit f448e49fbe8c3c609f9beffa2f421fc0b875aa3d
Author: Bill Bejeck <bb...@gmail.com>
AuthorDate: Fri May 4 18:18:44 2018 -0400
KAFKA-6844: Call shutdown on GlobalStreamThread after all StreamThreads have stopped (#4950)
Moved the shutdown of GlobalStreamThread to after all StreamThread instances have stopped.
There can be a race condition: shutdown is called on a StreamThread and then on the GlobalStreamThread, but if the StreamThread is delayed in shutting down, the GlobalStreamThread can finish shutting down first.
If the StreamThread then tries to access a GlobalStateStore before it has finished closing, the user can get an exception stating "...Store xxx is currently closed".
Tested by running all current streams tests.
Reviewers: Ted Yu <yu...@gmail.com>, John Roesler <jo...@confluent.io>, Matthias J. Sax <ma...@confluent.io>, Guozhang Wang <wa...@gmail.com>
---
.../org/apache/kafka/streams/KafkaStreams.java | 10 +-
.../integration/GlobalThreadShutDownOrderTest.java | 217 +++++++++++++++++++++
2 files changed, 223 insertions(+), 4 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index eed12f1..56d031b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -876,10 +876,6 @@ public class KafkaStreams {
thread.setStateListener(null);
thread.shutdown();
}
- if (globalStreamThread != null) {
- globalStreamThread.setStateListener(null);
- globalStreamThread.shutdown();
- }
for (final StreamThread thread : threads) {
try {
@@ -890,6 +886,12 @@ public class KafkaStreams {
Thread.currentThread().interrupt();
}
}
+
+ if (globalStreamThread != null) {
+ globalStreamThread.setStateListener(null);
+ globalStreamThread.shutdown();
+ }
+
if (globalStreamThread != null && !globalStreamThread.stillRunning()) {
try {
globalStreamThread.join();
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalThreadShutDownOrderTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalThreadShutDownOrderTest.java
new file mode 100644
index 0000000..c7b63ad
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalThreadShutDownOrderTest.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.Consumed;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.TestCondition;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import kafka.utils.MockTime;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Integration test for KAFKA-6844: a processor must still be able to read a global state store
+ * while its StreamThread is closing, so the GlobalStreamThread has to be shut down last.
+ */
+@Category({IntegrationTest.class})
+public class GlobalThreadShutDownOrderTest {
+
+    private static final int NUM_BROKERS = 1;
+    private static final Properties BROKER_CONFIG;
+
+    static {
+        BROKER_CONFIG = new Properties();
+        BROKER_CONFIG.put("transaction.state.log.replication.factor", (short) 1);
+        BROKER_CONFIG.put("transaction.state.log.min.isr", 1);
+    }
+
+    @ClassRule
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS, BROKER_CONFIG);
+
+    private final MockTime mockTime = CLUSTER.time;
+    private final String globalStore = "globalStore";
+    private StreamsBuilder builder;
+    private Properties streamsConfiguration;
+    private KafkaStreams kafkaStreams;
+    private String globalStoreTopic;
+    private String streamTopic;
+    private KStream<String, Long> stream;
+    // Filled by GlobalStoreProcessor#close(); only read by the test after KafkaStreams#close() was called.
+    private final List<Long> retrievedValuesList = new ArrayList<>();
+    // volatile: set in GlobalStoreProcessor#process() and polled from the test method's TestCondition,
+    // which run on different threads, so the write must be guaranteed to become visible.
+    private volatile boolean firstRecordProcessed;
+
+    @Before
+    public void before() throws InterruptedException {
+        builder = new StreamsBuilder();
+        createTopics();
+        streamsConfiguration = new Properties();
+        final String applicationId = "global-thread-shutdown-test";
+        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
+        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
+        streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+        streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
+        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
+
+        final Consumed<String, Long> stringLongConsumed = Consumed.with(Serdes.String(), Serdes.Long());
+
+        final KeyValueStoreBuilder<String, Long> storeBuilder = new KeyValueStoreBuilder<>(
+            Stores.persistentKeyValueStore(globalStore), Serdes.String(), Serdes.Long(), mockTime);
+
+        // Registers the global store with globalStoreTopic; the processor below looks it up by name.
+        builder.addGlobalStore(storeBuilder,
+                               globalStoreTopic,
+                               Consumed.with(Serdes.String(), Serdes.Long()),
+                               new MockProcessorSupplier());
+
+        stream = builder.stream(streamTopic, stringLongConsumed);
+        stream.process(new ProcessorSupplier<String, Long>() {
+            @Override
+            public Processor<String, Long> get() {
+                return new GlobalStoreProcessor(globalStore);
+            }
+        });
+    }
+
+    @After
+    public void whenShuttingDown() throws IOException {
+        if (kafkaStreams != null) {
+            kafkaStreams.close();
+        }
+        IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
+    }
+
+    // The processor's close() sleeps before every global-store read, so all four reads only succeed if
+    // the global store is still open while the StreamThread shuts down.
+    @Test
+    public void shouldFinishGlobalStoreOperationOnShutDown() throws Exception {
+        kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
+        populateTopics(globalStoreTopic);
+        populateTopics(streamTopic);
+
+        kafkaStreams.start();
+
+        TestUtils.waitForCondition(new TestCondition() {
+            @Override
+            public boolean conditionMet() {
+                return firstRecordProcessed;
+            }
+        }, 5000L, "Has not processed record within 5 seconds");
+
+        kafkaStreams.close(30, TimeUnit.SECONDS);
+
+        final List<Long> expectedRetrievedValues = Arrays.asList(1L, 2L, 3L, 4L);
+        assertEquals(expectedRetrievedValues, retrievedValuesList);
+    }
+
+    private void createTopics() throws InterruptedException {
+        streamTopic = "stream-topic";
+        globalStoreTopic = "global-store-topic";
+        CLUSTER.createTopics(streamTopic);
+        CLUSTER.createTopic(globalStoreTopic);
+    }
+
+    // Synchronously produces the records A=1, B=2, C=3, D=4 to the given topic.
+    private void populateTopics(final String topicName) throws Exception {
+        IntegrationTestUtils.produceKeyValuesSynchronously(
+            topicName,
+            Arrays.asList(
+                new KeyValue<>("A", 1L),
+                new KeyValue<>("B", 2L),
+                new KeyValue<>("C", 3L),
+                new KeyValue<>("D", 4L)),
+            TestUtils.producerConfig(
+                CLUSTER.bootstrapServers(),
+                StringSerializer.class,
+                LongSerializer.class,
+                new Properties()),
+            mockTime);
+    }
+
+    // Reads the global store from close() to exercise store access during StreamThread shutdown.
+    private class GlobalStoreProcessor extends AbstractProcessor<String, Long> {
+
+        private KeyValueStore<String, Long> store;
+        private final String storeName;
+
+        GlobalStoreProcessor(final String storeName) {
+            this.storeName = storeName;
+        }
+
+        @Override
+        @SuppressWarnings("unchecked")
+        public void init(final ProcessorContext context) {
+            super.init(context);
+            store = (KeyValueStore<String, Long>) context.getStateStore(storeName);
+        }
+
+        @Override
+        public void process(final String key, final Long value) {
+            // Signals the waiting test thread that a record has been processed.
+            firstRecordProcessed = true;
+        }
+
+        @Override
+        public void close() {
+            final List<String> keys = Arrays.asList("A", "B", "C", "D");
+            for (final String key : keys) {
+                // need to simulate thread slow in closing
+                Utils.sleep(1000);
+                retrievedValuesList.add(store.get(key));
+            }
+        }
+    }
+
+}
--
To stop receiving notification emails like this one, please contact
guozhang@apache.org.