You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/05/04 22:18:49 UTC

[kafka] branch trunk updated: KAFKA-6844: Call shutdown on GlobalStreamThread after all StreamThreads have stopped (#4950)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f448e49  KAFKA-6844: Call shutdown on GlobalStreamThread after all StreamThreads have stopped (#4950)
f448e49 is described below

commit f448e49fbe8c3c609f9beffa2f421fc0b875aa3d
Author: Bill Bejeck <bb...@gmail.com>
AuthorDate: Fri May 4 18:18:44 2018 -0400

    KAFKA-6844: Call shutdown on GlobalStreamThread after all StreamThreads have stopped (#4950)
    
    Moved the shutdown of GlobalStreamThread to after all StreamThread instances have stopped.
    
    There can be a race condition where shut down is called on a StreamThread then shut down is called on a GlobalStreamThread, but if StreamThread is delayed in shutting down, the GlobalStreamThread can shutdown first.
    If the StreamThread tries to access a GlobalStateStore before closing the user can get an exception stating "..Store xxx is currently closed "
    
    Tested by running all current streams tests.
    
    Reviewers: Ted Yu <yu...@gmail.com>, John Roesler <jo...@confluent.io>, Matthias J. Sax <ma...@confluent.io>, Guozhang Wang <wa...@gmail.com>
---
 .../org/apache/kafka/streams/KafkaStreams.java     |  10 +-
 .../integration/GlobalThreadShutDownOrderTest.java | 217 +++++++++++++++++++++
 2 files changed, 223 insertions(+), 4 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index eed12f1..56d031b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -876,10 +876,6 @@ public class KafkaStreams {
                         thread.setStateListener(null);
                         thread.shutdown();
                     }
-                    if (globalStreamThread != null) {
-                        globalStreamThread.setStateListener(null);
-                        globalStreamThread.shutdown();
-                    }
 
                     for (final StreamThread thread : threads) {
                         try {
@@ -890,6 +886,12 @@ public class KafkaStreams {
                             Thread.currentThread().interrupt();
                         }
                     }
+
+                    if (globalStreamThread != null) {
+                        globalStreamThread.setStateListener(null);
+                        globalStreamThread.shutdown();
+                    }
+
                     if (globalStreamThread != null && !globalStreamThread.stillRunning()) {
                         try {
                             globalStreamThread.join();
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalThreadShutDownOrderTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalThreadShutDownOrderTest.java
new file mode 100644
index 0000000..c7b63ad
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalThreadShutDownOrderTest.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.Consumed;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.TestCondition;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import kafka.utils.MockTime;
+
+import static org.junit.Assert.assertEquals;
+
+@Category({IntegrationTest.class})
+public class GlobalThreadShutDownOrderTest {
+
+    private static final int NUM_BROKERS = 1;
+    private static final Properties BROKER_CONFIG;
+
+    static {
+        BROKER_CONFIG = new Properties();
+        BROKER_CONFIG.put("transaction.state.log.replication.factor", (short) 1);
+        BROKER_CONFIG.put("transaction.state.log.min.isr", 1);
+    }
+
+    @ClassRule
+    public static final EmbeddedKafkaCluster CLUSTER =
+        new EmbeddedKafkaCluster(NUM_BROKERS, BROKER_CONFIG);
+
+    private final MockTime mockTime = CLUSTER.time;
+    private final String globalStore = "globalStore";
+    private StreamsBuilder builder;
+    private Properties streamsConfiguration;
+    private KafkaStreams kafkaStreams;
+    private String globalStoreTopic;
+    private String streamTopic;
+    private KStream<String, Long> stream;
+    private List<Long> retrievedValuesList = new ArrayList<>();
+    private boolean firstRecordProcessed;
+
+    @Before
+    public void before() throws InterruptedException {
+
+        builder = new StreamsBuilder();
+        createTopics();
+        streamsConfiguration = new Properties();
+        final String applicationId = "global-thread-shutdown-test";
+        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
+        streamsConfiguration
+            .put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
+        streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+        streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
+        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
+
+        final Consumed<String, Long> stringLongConsumed = Consumed.with(Serdes.String(), Serdes.Long());
+
+        KeyValueStoreBuilder<String, Long> storeBuilder = new KeyValueStoreBuilder<>(Stores.persistentKeyValueStore(globalStore),
+                                                                                     Serdes.String(),
+                                                                                     Serdes.Long(),
+                                                                                     mockTime);
+
+        builder.addGlobalStore(storeBuilder,
+                               globalStoreTopic,
+                               Consumed.with(Serdes.String(), Serdes.Long()),
+                               new MockProcessorSupplier());
+
+        stream = builder.stream(streamTopic, stringLongConsumed);
+
+        stream.process(new ProcessorSupplier<String, Long>() {
+            @Override
+            public Processor<String, Long> get() {
+                return new GlobalStoreProcessor(globalStore);
+            }
+        });
+
+    }
+
+    @After
+    public void whenShuttingDown() throws IOException {
+        if (kafkaStreams != null) {
+            kafkaStreams.close();
+        }
+        IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
+    }
+
+
+    @Test
+    public void shouldFinishGlobalStoreOperationOnShutDown() throws Exception {
+        kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
+        populateTopics(globalStoreTopic);
+        populateTopics(streamTopic);
+
+        kafkaStreams.start();
+
+        TestUtils.waitForCondition(new TestCondition() {
+            @Override
+            public boolean conditionMet() {
+                return firstRecordProcessed;
+            }
+        }, 5000L, "Has not processed record within 5 seconds");
+
+        kafkaStreams.close(30, TimeUnit.SECONDS);
+
+        List<Long> expectedRetrievedValues = Arrays.asList(1L, 2L, 3L, 4L);
+        assertEquals(expectedRetrievedValues, retrievedValuesList);
+    }
+
+
+    private void createTopics() throws InterruptedException {
+        streamTopic = "stream-topic";
+        globalStoreTopic = "global-store-topic";
+        CLUSTER.createTopics(streamTopic);
+        CLUSTER.createTopic(globalStoreTopic);
+    }
+
+
+    private void populateTopics(String topicName) throws Exception {
+        IntegrationTestUtils.produceKeyValuesSynchronously(
+            topicName,
+            Arrays.asList(
+                new KeyValue<>("A", 1L),
+                new KeyValue<>("B", 2L),
+                new KeyValue<>("C", 3L),
+                new KeyValue<>("D", 4L)),
+            TestUtils.producerConfig(
+                CLUSTER.bootstrapServers(),
+                StringSerializer.class,
+                LongSerializer.class,
+                new Properties()),
+            mockTime);
+    }
+
+
+    private class GlobalStoreProcessor extends AbstractProcessor<String, Long> {
+
+        private KeyValueStore<String, Long> store;
+        private final String storeName;
+
+        GlobalStoreProcessor(final String storeName) {
+
+            this.storeName = storeName;
+        }
+
+        @Override
+        @SuppressWarnings("unchecked")
+        public void init(final ProcessorContext context) {
+            super.init(context);
+            store = (KeyValueStore<String, Long>) context.getStateStore(storeName);
+        }
+
+        @Override
+        public void process(final String key, final Long value) {
+            firstRecordProcessed = true;
+        }
+
+
+        @Override
+        public void close() {
+            List<String> keys = Arrays.asList("A", "B", "C", "D");
+            for (String key : keys) {
+                // need to simulate thread slow in closing
+                Utils.sleep(1000);
+                retrievedValuesList.add(store.get(key));
+            }
+        }
+    }
+
+}

-- 
To stop receiving notification emails like this one, please contact
guozhang@apache.org.