Posted to jira@kafka.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/05/04 16:09:00 UTC

[jira] [Commented] (KAFKA-5697) StreamThread.shutdown() need to interrupt the stream threads to break the loop

    [ https://issues.apache.org/jira/browse/KAFKA-5697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16464072#comment-16464072 ] 

ASF GitHub Bot commented on KAFKA-5697:
---------------------------------------

guozhangwang closed pull request #4930: KAFKA-5697: issue Consumer#wakeup during Streams shutdown
URL: https://github.com/apache/kafka/pull/4930
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
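
In short, the patch has shutdown() call Consumer#wakeup() on any consumer that may
be blocked, and routes the various poll() call sites through a small wrapper that
turns the resulting WakeupException into an empty batch, so each loop falls through
to its normal running-state check. A condensed sketch of that pattern (taken from
the diff below; the enclosing classes, imports, and state handling are simplified):

{code:java}
// ConsumerUtils.poll from the diff: a poll that tolerates a concurrent wakeup()
public static <K, V> ConsumerRecords<K, V> poll(final Consumer<K, V> consumer, final long maxDurationMs) {
    try {
        return consumer.poll(maxDurationMs);
    } catch (final WakeupException e) {
        // wakeup() was called, e.g. during shutdown; return nothing and let the
        // caller's loop observe PENDING_SHUTDOWN on its next state check
        return new ConsumerRecords<>(Collections.<TopicPartition, List<ConsumerRecord<K, V>>>emptyMap());
    }
}

// StreamThread.shutdown() from the diff, abridged: wake both consumers so a
// poll() blocked on an unreachable coordinator returns promptly
public void shutdown() {
    log.info("Informed to shut down");
    setState(State.PENDING_SHUTDOWN);
    consumer.wakeup();
    restoreConsumer.wakeup();
}
{code}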

diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 66a8934d283..2c409d1015d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -67,6 +67,7 @@
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -382,7 +383,13 @@ public void setGlobalStateRestoreListener(final StateRestoreListener globalState
      * @return Map of all metrics.
      */
     public Map<MetricName, ? extends Metric> metrics() {
-        return Collections.unmodifiableMap(metrics.metrics());
+        final Map<MetricName, Metric> result = new LinkedHashMap<>();
+        for (final StreamThread thread : threads) {
+            result.putAll(thread.consumerMetrics());
+        }
+        if (globalStreamThread != null) result.putAll(globalStreamThread.consumerMetrics());
+        result.putAll(metrics.metrics());
+        return Collections.unmodifiableMap(result);
     }
 
     /**
diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/ShutdownException.java b/streams/src/main/java/org/apache/kafka/streams/errors/ShutdownException.java
new file mode 100644
index 00000000000..d404642793c
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/errors/ShutdownException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.errors;
+
+public class ShutdownException extends StreamsException {
+    public ShutdownException(final String message) {
+        super(message);
+    }
+
+    public ShutdownException(final String message, final Throwable throwable) {
+        super(message, throwable);
+    }
+
+    public ShutdownException(final Throwable throwable) {
+        super(throwable);
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ConsumerUtils.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ConsumerUtils.java
new file mode 100644
index 00000000000..8b912579b9a
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ConsumerUtils.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.WakeupException;
+
+import java.util.Collections;
+import java.util.List;
+
+public final class ConsumerUtils {
+    private ConsumerUtils() {}
+
+    public static <K, V> ConsumerRecords<K, V> poll(final Consumer<K, V> consumer, final long maxDurationMs) {
+        try {
+            return consumer.poll(maxDurationMs);
+        } catch (final WakeupException e) {
+            return new ConsumerRecords<>(Collections.<TopicPartition, List<ConsumerRecord<K, V>>>emptyMap());
+        }
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
index e8ec5e9fe5f..017f2da198f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
@@ -23,12 +23,14 @@
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.LockException;
 import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.errors.ShutdownException;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.BatchingStateRestoreCallback;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
@@ -46,6 +48,8 @@
 import java.util.Map;
 import java.util.Set;
 
+import static org.apache.kafka.streams.processor.internals.ConsumerUtils.poll;
+
 /**
  * This class is responsible for the initialization, restoration, closing, flushing etc
  * of Global State Stores. There is only ever 1 instance of this class per Application Instance.
@@ -60,13 +64,15 @@
     private InternalProcessorContext processorContext;
     private final int retries;
     private final long retryBackoffMs;
+    private final IsRunning isRunning;
 
     public GlobalStateManagerImpl(final LogContext logContext,
                                   final ProcessorTopology topology,
                                   final Consumer<byte[], byte[]> globalConsumer,
                                   final StateDirectory stateDirectory,
                                   final StateRestoreListener stateRestoreListener,
-                                  final StreamsConfig config) {
+                                  final StreamsConfig config,
+                                  final IsRunning isRunning) {
         super(stateDirectory.globalStateDir());
 
         this.log = logContext.logger(GlobalStateManagerImpl.class);
@@ -76,6 +82,11 @@ public GlobalStateManagerImpl(final LogContext logContext,
         this.stateRestoreListener = stateRestoreListener;
         this.retries = config.getInt(StreamsConfig.RETRIES_CONFIG);
         this.retryBackoffMs = config.getLong(StreamsConfig.RETRY_BACKOFF_MS_CONFIG);
+        this.isRunning = isRunning;
+    }
+
+    public interface IsRunning {
+        boolean check();
     }
 
     @Override
@@ -200,6 +211,13 @@ public void register(final StateStore store,
             try {
                 partitionInfos = globalConsumer.partitionsFor(sourceTopic);
                 break;
+            } catch (final WakeupException wakeupException) {
+                if (isRunning.check()) {
+                    // note we may decide later that this condition is ok and just let the retry loop continue
+                    throw new IllegalStateException("Got unexpected WakeupException during initialization.", wakeupException);
+                } else {
+                    throw new ShutdownException("Shutting down from fetching partitions");
+                }
             } catch (final TimeoutException retryableException) {
                 if (++attempts > retries) {
                     log.error("Failed to get partitions for topic {} after {} retry attempts due to timeout. " +
@@ -250,19 +268,20 @@ private void restoreState(final StateRestoreCallback stateRestoreCallback,
 
             long offset = globalConsumer.position(topicPartition);
             final Long highWatermark = highWatermarks.get(topicPartition);
-            BatchingStateRestoreCallback
-                stateRestoreAdapter =
-                (BatchingStateRestoreCallback) ((stateRestoreCallback instanceof
-                                                     BatchingStateRestoreCallback)
-                                                ? stateRestoreCallback
-                                                : new WrappedBatchingStateRestoreCallback(stateRestoreCallback));
+            final BatchingStateRestoreCallback stateRestoreAdapter =
+                (BatchingStateRestoreCallback) ((stateRestoreCallback instanceof BatchingStateRestoreCallback)
+                    ? stateRestoreCallback
+                    : new WrappedBatchingStateRestoreCallback(stateRestoreCallback));
 
             stateRestoreListener.onRestoreStart(topicPartition, storeName, offset, highWatermark);
             long restoreCount = 0L;
 
             while (offset < highWatermark) {
+                if (!isRunning.check()) {
+                    throw new ShutdownException("Streams is not running (any more)");
+                }
                 try {
-                    final ConsumerRecords<byte[], byte[]> records = globalConsumer.poll(100);
+                    final ConsumerRecords<byte[], byte[]> records = poll(globalConsumer, 100);
                     final List<KeyValue<byte[], byte[]>> restoreRecords = new ArrayList<>();
                     for (ConsumerRecord<byte[], byte[]> record : records) {
                         if (record.key() != null) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
index 1c348975f2a..4b6bfb1fd65 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
@@ -20,6 +20,8 @@
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.InvalidOffsetException;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.utils.LogContext;
@@ -27,6 +29,7 @@
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.LockException;
+import org.apache.kafka.streams.errors.ShutdownException;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
@@ -35,10 +38,12 @@
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
+import static org.apache.kafka.streams.processor.internals.ConsumerUtils.poll;
 import static org.apache.kafka.streams.processor.internals.GlobalStreamThread.State.DEAD;
 import static org.apache.kafka.streams.processor.internals.GlobalStreamThread.State.PENDING_SHUTDOWN;
 
@@ -103,6 +108,10 @@ public boolean isRunning() {
             return equals(RUNNING);
         }
 
+        public boolean isStarting() {
+            return equals(CREATED);
+        }
+
         @Override
         public boolean isValidTransition(final ThreadStateTransitionValidator newState) {
             final State tmpState = (State) newState;
@@ -170,6 +179,12 @@ public boolean stillRunning() {
         }
     }
 
+    private boolean stillStarting() {
+        synchronized (stateLock) {
+            return state.isStarting();
+        }
+    }
+
     public GlobalStreamThread(final ProcessorTopology topology,
                               final StreamsConfig config,
                               final Consumer<byte[], byte[]> globalConsumer,
@@ -232,7 +247,7 @@ void initialize() {
 
         void pollAndUpdate() {
             try {
-                final ConsumerRecords<byte[], byte[]> received = globalConsumer.poll(pollMs);
+                final ConsumerRecords<byte[], byte[]> received = poll(globalConsumer, pollMs);
                 for (final ConsumerRecord<byte[], byte[]> record : received) {
                     stateMaintainer.update(record);
                 }
@@ -263,7 +278,19 @@ public void close() throws IOException {
 
     @Override
     public void run() {
-        final StateConsumer stateConsumer = initialize();
+        final StateConsumer stateConsumer;
+        try {
+            stateConsumer = initialize();
+        } catch (final ShutdownException e) {
+            log.info("Shutting down from initialization");
+            // Almost certainly, we arrived here because the state is already PENDING_SHUTDOWN, but it's harmless to
+            // just make sure
+            setState(State.PENDING_SHUTDOWN);
+            setState(State.DEAD);
+            streamsMetrics.removeAllThreadLevelSensors();
+            log.info("Shutdown complete");
+            return;
+        }
 
         if (stateConsumer == null) {
             // during initialization, the caller thread would wait for the state consumer
@@ -275,6 +302,7 @@ public void run() {
             setState(State.DEAD);
 
             log.warn("Error happened during initialization of the global state store; this thread has shutdown");
+            streamsMetrics.removeAllThreadLevelSensors();
 
             return;
         }
@@ -314,7 +342,14 @@ private StateConsumer initialize() {
                 globalConsumer,
                 stateDirectory,
                 stateRestoreListener,
-                config);
+                config,
+                new GlobalStateManagerImpl.IsRunning() {
+                    @Override
+                    public boolean check() {
+                        return stillStarting() || stillRunning();
+                    }
+                }
+            );
 
             final GlobalProcessorContextImpl globalProcessorContext = new GlobalProcessorContextImpl(
                 config,
@@ -367,5 +402,10 @@ public void shutdown() {
         // one could call shutdown() multiple times, so ignore subsequent calls
         // if already shutting down or dead
         setState(PENDING_SHUTDOWN);
+        globalConsumer.wakeup();
+    }
+
+    public Map<MetricName, Metric> consumerMetrics() {
+        return Collections.unmodifiableMap(globalConsumer.metrics());
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index 5fcba76570e..9b67fc4263c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -40,6 +40,8 @@
 import java.util.Map;
 import java.util.Set;
 
+import static org.apache.kafka.streams.processor.internals.ConsumerUtils.poll;
+
 public class StoreChangelogReader implements ChangelogReader {
 
     private final Logger log;
@@ -81,7 +83,7 @@ public void register(final StateRestorer restorer) {
 
         final Set<TopicPartition> restoringPartitions = new HashSet<>(needsRestoring.keySet());
         try {
-            final ConsumerRecords<byte[], byte[]> allRecords = restoreConsumer.poll(10);
+            final ConsumerRecords<byte[], byte[]> allRecords = poll(restoreConsumer, 10);
             for (final TopicPartition partition : restoringPartitions) {
                 restorePartition(allRecords, partition, active.restoringTaskFor(partition));
             }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index cc5a07f0a91..32993f6b0be 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -26,6 +26,8 @@
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
@@ -54,6 +56,7 @@
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -62,6 +65,7 @@
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static java.util.Collections.singleton;
+import static org.apache.kafka.streams.processor.internals.ConsumerUtils.poll;
 
 public class StreamThread extends Thread {
 
@@ -824,7 +828,7 @@ long runOnce(final long recordsProcessedBeforeCommit) {
         ConsumerRecords<byte[], byte[]> records = null;
 
         try {
-            records = consumer.poll(pollTimeMs);
+            records = poll(consumer, pollTimeMs);
         } catch (final InvalidOffsetException e) {
             resetInvalidOffsets(e);
         }
@@ -1051,7 +1055,7 @@ private void maybeUpdateStandbyTasks(final long now) {
             }
 
             try {
-                final ConsumerRecords<byte[], byte[]> records = restoreConsumer.poll(0);
+                final ConsumerRecords<byte[], byte[]> records = poll(restoreConsumer, 0);
 
                 if (!records.isEmpty()) {
                     for (final TopicPartition partition : records.partitions()) {
@@ -1116,6 +1120,8 @@ private long computeLatency() {
     public void shutdown() {
         log.info("Informed to shut down");
         final State oldState = setState(State.PENDING_SHUTDOWN);
+        consumer.wakeup();
+        restoreConsumer.wakeup();
         if (oldState == State.CREATED) {
             // The thread may not have been started. Take responsibility for shutting down
             completeShutdown(true);
@@ -1210,4 +1216,13 @@ TaskManager taskManager() {
     Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> standbyRecords() {
         return standbyRecords;
     }
+
+    public Map<MetricName, Metric> consumerMetrics() {
+        final Map<MetricName, ? extends Metric> consumerMetrics = consumer.metrics();
+        final Map<MetricName, ? extends Metric> restoreConsumerMetrics = restoreConsumer.metrics();
+        final LinkedHashMap<MetricName, Metric> result = new LinkedHashMap<>();
+        result.putAll(consumerMetrics);
+        result.putAll(restoreConsumerMetrics);
+        return result;
+    }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index 1dc8602c280..1c33f647ad4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -18,8 +18,6 @@
 
 import org.apache.kafka.clients.producer.MockProducer;
 import org.apache.kafka.common.Cluster;
-import org.apache.kafka.common.Metric;
-import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.config.ConfigException;
@@ -43,6 +41,7 @@
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.ClassRule;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -232,7 +231,43 @@ public boolean conditionMet() {
 
         streams.close();
         assertEquals(streams.state(), KafkaStreams.State.NOT_RUNNING);
+    }
+
+    @Ignore // this test cannot pass as long as GST blocks KS.start()
+    @Test
+    public void testGlobalThreadCloseWithoutConnectingToBroker() {
+        final Properties props = new Properties();
+        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
+        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:1");
+        props.setProperty(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
+        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
+        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, NUM_THREADS);
 
+        final StreamsBuilder builder = new StreamsBuilder();
+        // make sure we have the global state thread running too
+        builder.globalTable("anyTopic");
+        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
+        streams.start();
+        streams.close();
+        // There's nothing to assert... We're testing that this operation actually completes.
+    }
+
+    @Test
+    public void testLocalThreadCloseWithoutConnectingToBroker() {
+        final Properties props = new Properties();
+        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
+        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:1");
+        props.setProperty(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
+        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
+        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, NUM_THREADS);
+
+        final StreamsBuilder builder = new StreamsBuilder();
+        // make sure we have the global state thread running too
+        builder.table("anyTopic");
+        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
+        streams.start();
+        streams.close();
+        // There's nothing to assert... We're testing that this operation actually completes.
     }
 
 
@@ -327,16 +362,6 @@ public void shouldThrowExceptionSettingStateListenerNotInCreateState() {
         }
     }
 
-    @Test
-    public void testNumberDefaultMetrics() {
-        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, "1");
-        final StreamsBuilder builder = new StreamsBuilder();
-        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
-        final Map<MetricName, ? extends Metric> metrics = streams.metrics();
-        // all 22 default StreamThread metrics + 1 metric that keeps track of number of metrics
-        assertEquals(23, metrics.size());
-    }
-
     @Test
     public void testIllegalMetricsConfig() {
         props.setProperty(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "illegalConfig");
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
index 2d9c8c4bdc9..e8ee50764a7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
@@ -55,6 +55,7 @@
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForCompletion;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -118,8 +119,9 @@ private Properties getTopicProperties(final String changelog) {
             final Map<String, Properties> topicConfigs = scala.collection.JavaConversions.mapAsJavaMap(adminZkClient.getAllTopicConfigs());
 
             for (Map.Entry<String, Properties> topicConfig : topicConfigs.entrySet()) {
-                if (topicConfig.getKey().equals(changelog))
+                if (topicConfig.getKey().equals(changelog)) {
                     return topicConfig.getValue();
+                }
             }
 
             return new Properties();
@@ -157,6 +159,7 @@ public void shouldCompactTopicsForKeyValueStoreChangelogs() throws Exception {
         //
         // Step 3: Verify the state changelog topics are compact
         //
+        waitForCompletion(streams, 2, 5000);
         streams.close();
 
         final Properties changelogProps = getTopicProperties(ProcessorStateManager.storeChangelogTopic(appID, "Counts"));
@@ -201,6 +204,7 @@ public void shouldCompactAndDeleteTopicsForWindowStoreChangelogs() throws Except
         //
         // Step 3: Verify the state changelog topics are compact
         //
+        waitForCompletion(streams, 2, 5000);
         streams.close();
         final Properties properties = getTopicProperties(ProcessorStateManager.storeChangelogTopic(appID, "CountWindows"));
         final List<String> policies = Arrays.asList(properties.getProperty(LogConfig.CleanupPolicyProp()).split(","));
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index e8cd59e0925..ec97f125ece 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -28,10 +28,12 @@
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.requests.UpdateMetadataRequest;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.test.TestCondition;
@@ -49,6 +51,8 @@
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 
+import static org.apache.kafka.streams.processor.internals.ConsumerUtils.poll;
+
 /**
  * Utility functions to make integration testing more convenient.
  */
@@ -158,6 +162,40 @@ public static void purgeLocalStreamsState(final Properties streamsConfiguration)
         produceKeyValuesSynchronously(topic, keyedRecords, producerConfig, time, enableTransactions);
     }
 
+    /**
+     * Wait for streams to "finish", based on the consumer lag metric.
+     *
+     * Caveats:
+     * - Inputs must be finite, fully loaded, and flushed before this method is called
+     * - expectedPartitions is the total number of partitions to watch the lag on, including both input and internal.
+     *   It's somewhat ok to get this wrong, as the main failure case would be an immediate return due to the clients
+     *   not being initialized, which you can avoid with any non-zero value. But it's probably better to get it right ;)
+     */
+    public static void waitForCompletion(final KafkaStreams streams,
+                                         final int expectedPartitions,
+                                         final int timeoutMilliseconds) {
+        final long start = System.currentTimeMillis();
+        while (true) {
+            int lagMetrics = 0;
+            double totalLag = 0.0;
+            for (final Metric metric : streams.metrics().values()) {
+                if (metric.metricName().name().equals("records-lag")) {
+                    lagMetrics++;
+                    totalLag += ((Number) metric.metricValue()).doubleValue();
+                }
+            }
+            if (lagMetrics >= expectedPartitions && totalLag == 0.0) {
+                return;
+            }
+            if (System.currentTimeMillis() - start >= timeoutMilliseconds) {
+                throw new RuntimeException(String.format(
+                    "Timed out waiting for completion. lagMetrics=[%s/%s] totalLag=[%s]",
+                    lagMetrics, expectedPartitions, totalLag
+                ));
+            }
+        }
+    }
+
     public static <K, V> List<KeyValue<K, V>> waitUntilMinKeyValueRecordsReceived(final Properties consumerConfig,
                                                                                   final String topic,
                                                                                   final int expectedNumRecords) throws InterruptedException {
@@ -352,7 +390,7 @@ public boolean conditionMet() {
         while (totalPollTimeMs < waitTime &&
             continueConsuming(consumedValues.size(), maxMessages)) {
             totalPollTimeMs += pollIntervalMs;
-            final ConsumerRecords<K, V> records = consumer.poll(pollIntervalMs);
+            final ConsumerRecords<K, V> records = poll(consumer, pollIntervalMs);
 
             for (final ConsumerRecord<K, V> record : records) {
                 consumedValues.add(new KeyValue<>(record.key(), record.value()));
diff --git a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
index d956f278c50..9cd68572962 100644
--- a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
+++ b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
@@ -62,6 +62,8 @@
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.kafka.streams.processor.internals.ConsumerUtils.poll;
+
 /**
  * Class that provides support for a series of benchmarks. It is usually driven by
  * tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py.
@@ -334,7 +336,7 @@ private void consumeAndProduce(final String topic) {
             consumer.seekToBeginning(partitions);
 
             while (true) {
-                final ConsumerRecords<Integer, byte[]> records = consumer.poll(POLL_MS);
+                final ConsumerRecords<Integer, byte[]> records = poll(consumer, POLL_MS);
                 if (records.isEmpty()) {
                     if (processedRecords == numRecords) {
                         break;
@@ -372,7 +374,7 @@ private void consume(final String topic) {
             consumer.seekToBeginning(partitions);
 
             while (true) {
-                final ConsumerRecords<Integer, byte[]> records = consumer.poll(POLL_MS);
+                final ConsumerRecords<Integer, byte[]> records = poll(consumer, POLL_MS);
                 if (records.isEmpty()) {
                     if (processedRecords == numRecords) {
                         break;
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
index d19e63e0543..7935271cdf9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
@@ -79,6 +79,14 @@
     private final TopicPartition t2 = new TopicPartition("t2", 1);
     private final TopicPartition t3 = new TopicPartition("t3", 1);
     private final TopicPartition t4 = new TopicPartition("t4", 1);
+
+    private final GlobalStateManagerImpl.IsRunning alwaysRunning = new GlobalStateManagerImpl.IsRunning() {
+        @Override
+        public boolean check() {
+            return true;
+        }
+    };
+
     private GlobalStateManagerImpl stateManager;
     private StateDirectory stateDirectory;
     private StreamsConfig streamsConfig;
@@ -119,7 +127,8 @@ public void before() throws IOException {
             consumer,
             stateDirectory,
             stateRestoreListener,
-            streamsConfig);
+            streamsConfig,
+            alwaysRunning);
         processorContext = new InternalMockProcessorContext(stateDirectory.globalStateDir(), streamsConfig);
         stateManager.setGlobalProcessorContext(processorContext);
         checkpointFile = new File(stateManager.baseDir(), ProcessorStateManager.CHECKPOINT_FILE_NAME);
@@ -496,12 +505,20 @@ public void shouldCheckpointRestoredOffsetsToFile() throws IOException {
 
     @Test
     public void shouldThrowLockExceptionIfIOExceptionCaughtWhenTryingToLockStateDir() {
-        stateManager = new GlobalStateManagerImpl(new LogContext("mock"), topology, consumer, new StateDirectory(streamsConfig, time) {
-            @Override
-            public boolean lockGlobalState() throws IOException {
-                throw new IOException("KABOOM!");
-            }
-        }, stateRestoreListener, streamsConfig);
+        stateManager = new GlobalStateManagerImpl(
+            new LogContext("mock"),
+            topology,
+            consumer,
+            new StateDirectory(streamsConfig, time) {
+                @Override
+                public boolean lockGlobalState() throws IOException {
+                    throw new IOException("KABOOM!");
+                }
+            },
+            stateRestoreListener,
+            streamsConfig,
+            alwaysRunning
+        );
 
         try {
             stateManager.initialize();
@@ -538,7 +555,8 @@ public void shouldRetryWhenEndOffsetsThrowsTimeoutException() {
                 consumer,
                 stateDirectory,
                 stateRestoreListener,
-                streamsConfig);
+                streamsConfig,
+                alwaysRunning);
         } catch (final StreamsException expected) {
             assertEquals(numberOfCalls.get(), retries);
         }
@@ -571,7 +589,8 @@ public void shouldRetryWhenPartitionsForThrowsTimeoutException() {
                 consumer,
                 stateDirectory,
                 stateRestoreListener,
-                streamsConfig);
+                streamsConfig,
+                alwaysRunning);
         } catch (final StreamsException expected) {
             assertEquals(numberOfCalls.get(), retries);
         }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index 605ab337983..54ea1ce7329 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -63,6 +63,7 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static java.util.Collections.singleton;
+import static org.apache.kafka.streams.processor.internals.ConsumerUtils.poll;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
@@ -189,7 +190,7 @@ public void testUpdate() throws IOException {
         }
 
         restoreStateConsumer.seekToBeginning(partition);
-        task.update(partition2, restoreStateConsumer.poll(100).records(partition2));
+        task.update(partition2, poll(restoreStateConsumer, 100).records(partition2));
 
         StandbyContextImpl context = (StandbyContextImpl) task.context();
         MockStateStore store1 = (MockStateStore) context.getStateMgr().getStore(storeName1);
@@ -246,7 +247,7 @@ public void testUpdateKTable() throws IOException {
         }
 
         // The commit offset is at 0L. Records should not be processed
-        List<ConsumerRecord<byte[], byte[]>> remaining = task.update(globalTopicPartition, restoreStateConsumer.poll(100).records(globalTopicPartition));
+        List<ConsumerRecord<byte[], byte[]>> remaining = task.update(globalTopicPartition, poll(restoreStateConsumer, 100).records(globalTopicPartition));
         assertEquals(5, remaining.size());
 
         committedOffsets.put(new TopicPartition(globalTopicPartition.topic(), globalTopicPartition.partition()), new OffsetAndMetadata(10L));
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
index e897088beca..c420d9878a4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
@@ -42,6 +42,8 @@
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.kafka.streams.processor.internals.ConsumerUtils.poll;
+
 public class BrokerCompatibilityTest {
 
     private static final String SOURCE_TOPIC = "brokerCompatibilitySourceTopic";
@@ -153,7 +155,7 @@ private static void loopUntilRecordReceived(final String kafka, final boolean eo
             consumer.subscribe(Collections.singletonList(SINK_TOPIC));
 
             while (true) {
-                final ConsumerRecords<String, String> records = consumer.poll(100);
+                final ConsumerRecords<String, String> records = poll(consumer, 100);
                 for (final ConsumerRecord<String, String> record : records) {
                     if (record.key().equals("key") && record.value().equals("1")) {
                         return;
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java
index 752cdd696ed..513d592fa38 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java
@@ -52,6 +52,8 @@
 import java.util.Random;
 import java.util.Set;
 
+import static org.apache.kafka.streams.processor.internals.ConsumerUtils.poll;
+
 public class EosTestDriver extends SmokeTestUtil {
 
     private static final int MAX_NUMBER_OF_KEYS = 100;
@@ -254,7 +256,7 @@ private static void ensureStreamsApplicationDown(final String kafka) {
                 topics.add("repartition");
             }
             consumer.subscribe(topics);
-            consumer.poll(0);
+            poll(consumer, 0);
 
             final Set<TopicPartition> partitions = new HashSet<>();
             for (final String topic : topics) {
@@ -284,7 +286,7 @@ private static void ensureStreamsApplicationDown(final String kafka) {
         long maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
         boolean allRecordsReceived = false;
         while (!allRecordsReceived && System.currentTimeMillis() < maxWaitTime) {
-            final ConsumerRecords<byte[], byte[]> receivedRecords = consumer.poll(100);
+            final ConsumerRecords<byte[], byte[]> receivedRecords = poll(consumer, 100);
 
             for (final ConsumerRecord<byte[], byte[]> record : receivedRecords) {
                 maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
@@ -591,7 +593,7 @@ public void onCompletion(final RecordMetadata metadata, final Exception exceptio
 
         long maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
         while (!partitions.isEmpty() && System.currentTimeMillis() < maxWaitTime) {
-            final ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
+            final ConsumerRecords<byte[], byte[]> records = poll(consumer, 100);
             if (records.isEmpty()) {
                 System.out.println("No data received.");
                 for (final TopicPartition tp : partitions) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
index 50330a08e61..74eac3f156c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
@@ -47,6 +47,8 @@
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.kafka.streams.processor.internals.ConsumerUtils.poll;
+
 public class SmokeTestDriver extends SmokeTestUtil {
 
     public static final int MAX_RECORD_EMPTY_RETRIES = 60;
@@ -289,7 +291,7 @@ public static void verify(String kafka, Map<String, Set<Integer>> allData, int m
         int retry = 0;
         final long start = System.currentTimeMillis();
         while (System.currentTimeMillis() - start < TimeUnit.MINUTES.toMillis(6)) {
-            ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
+            ConsumerRecords<byte[], byte[]> records = poll(consumer, 500);
             if (records.isEmpty() && recordsProcessed >= recordsGenerated) {
                 if (verifyMin(min, allData, false)
                     && verifyMax(max, allData, false)
diff --git a/streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java b/streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java
index ad19f32fd1d..49f699ec658 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java
@@ -40,6 +40,7 @@
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
 
+import static org.apache.kafka.streams.processor.internals.ConsumerUtils.poll;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
@@ -74,7 +75,7 @@ public void testResetToSpecificOffsetWhenBetweenBeginningAndEndOffset() {
 
         streamsResetter.resetOffsetsTo(consumer, inputTopicPartitions, 2L);
 
-        final ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
+        final ConsumerRecords<byte[], byte[]> records = poll(consumer, 500);
         assertEquals(3, records.count());
     }
 
@@ -90,7 +91,7 @@ public void testResetToSpecificOffsetWhenBeforeBeginningOffset() {
 
         streamsResetter.resetOffsetsTo(consumer, inputTopicPartitions, 2L);
 
-        final ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
+        final ConsumerRecords<byte[], byte[]> records = poll(consumer, 500);
         assertEquals(2, records.count());
     }
 
@@ -106,7 +107,7 @@ public void testResetToSpecificOffsetWhenAfterEndOffset() {
 
         streamsResetter.resetOffsetsTo(consumer, inputTopicPartitions, 4L);
 
-        final ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
+        final ConsumerRecords<byte[], byte[]> records = poll(consumer, 500);
         assertEquals(2, records.count());
     }
 
@@ -122,7 +123,7 @@ public void testShiftOffsetByWhenBetweenBeginningAndEndOffset() {
 
         streamsResetter.shiftOffsetsBy(consumer, inputTopicPartitions, 3L);
 
-        final ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
+        final ConsumerRecords<byte[], byte[]> records = poll(consumer, 500);
         assertEquals(2, records.count());
     }
 
@@ -138,7 +139,7 @@ public void testShiftOffsetByWhenBeforeBeginningOffset() {
 
         streamsResetter.shiftOffsetsBy(consumer, inputTopicPartitions, -3L);
 
-        final ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
+        final ConsumerRecords<byte[], byte[]> records = poll(consumer, 500);
         assertEquals(5, records.count());
     }
 
@@ -154,7 +155,7 @@ public void testShiftOffsetByWhenAfterEndOffset() {
 
         streamsResetter.shiftOffsetsBy(consumer, inputTopicPartitions, 5L);
 
-        final ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
+        final ConsumerRecords<byte[], byte[]> records = poll(consumer, 500);
         assertEquals(2, records.count());
     }
 
@@ -172,7 +173,7 @@ public void testResetUsingPlanWhenBetweenBeginningAndEndOffset() {
         topicPartitionsAndOffset.put(topicPartition, 3L);
         streamsResetter.resetOffsetsFromResetPlan(consumer, inputTopicPartitions, topicPartitionsAndOffset);
 
-        final ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
+        final ConsumerRecords<byte[], byte[]> records = poll(consumer, 500);
         assertEquals(2, records.count());
     }
 
@@ -190,7 +191,7 @@ public void testResetUsingPlanWhenBeforeBeginningOffset() {
         topicPartitionsAndOffset.put(topicPartition, 1L);
         streamsResetter.resetOffsetsFromResetPlan(consumer, inputTopicPartitions, topicPartitionsAndOffset);
 
-        final ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
+        final ConsumerRecords<byte[], byte[]> records = poll(consumer, 500);
         assertEquals(2, records.count());
     }
 
@@ -208,7 +209,7 @@ public void testResetUsingPlanWhenAfterEndOffset() {
         topicPartitionsAndOffset.put(topicPartition, 5L);
         streamsResetter.resetOffsetsFromResetPlan(consumer, inputTopicPartitions, topicPartitionsAndOffset);
 
-        final ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
+        final ConsumerRecords<byte[], byte[]> records = poll(consumer, 500);
         assertEquals(2, records.count());
     }
 
@@ -226,7 +227,7 @@ public void shouldSeekToEndOffset() {
         intermediateTopicPartitions.add(topicPartition);
         streamsResetter.maybeSeekToEnd("g1", consumer, intermediateTopicPartitions);
 
-        final ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
+        final ConsumerRecords<byte[], byte[]> records = poll(consumer, 500);
         assertEquals(2, records.count());
     }
 
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index 7343d59e013..f6bbc4b5c27 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -309,7 +309,13 @@ public void onRestoreEnd(final TopicPartition topicPartition, final String store
                 consumer,
                 stateDirectory,
                 stateRestoreListener,
-                streamsConfig);
+                streamsConfig,
+                new GlobalStateManagerImpl.IsRunning() {
+                    @Override
+                    public boolean check() {
+                        return true;
+                    }
+                });
 
             final GlobalProcessorContextImpl globalProcessorContext
                 = new GlobalProcessorContextImpl(streamsConfig, globalStateManager, streamsMetrics, cache);


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> StreamThread.shutdown() need to interrupt the stream threads to break the loop
> ------------------------------------------------------------------------------
>
>                 Key: KAFKA-5697
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5697
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Guozhang Wang
>            Assignee: John Roesler
>            Priority: Major
>              Labels: newbie
>             Fix For: 2.0.0
>
>
> In {{StreamThread.shutdown()}} we currently do nothing but set the state, hoping the stream thread may eventually check it and shut itself down. However, under certain scenarios the thread may get blocked within a single loop and hence will never check this state enum. For example, its {{consumer.poll}} call triggers {{ensureCoordinatorReady()}}, which will block until the coordinator can be found. If the coordinator broker is never up and running, then the Streams instance will be blocked forever.
> A simple way to reproduce this issue is to start the word count demo without starting the ZK / Kafka broker; it will then get stuck in a single loop, and even `ctrl-C` will not stop it, since the state it sets will never be read by the thread:
> {code:java}
> [2017-08-03 15:17:39,981] WARN Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
> [2017-08-03 15:17:40,046] WARN Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
> [2017-08-03 15:17:40,101] WARN Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
> [2017-08-03 15:17:40,206] WARN Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
> [2017-08-03 15:17:40,261] WARN Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
> [2017-08-03 15:17:40,366] WARN Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
> [2017-08-03 15:17:40,472] WARN Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
> ^C[2017-08-03 15:17:40,580] WARN Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
> {code}
>  
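
The unblocking behavior the patch relies on can be seen with a bare consumer. A
minimal, hedged sketch (the broker address, topic, and group id are illustrative,
and no broker needs to be running; this is not test code from the patch):

{code:java}
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

import java.util.Collections;
import java.util.Properties;

public class WakeupDemo {
    public static void main(final String[] args) throws InterruptedException {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:1"); // nothing listening here
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "wakeup-demo");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

        final KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("anyTopic"));

        final Thread poller = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    consumer.poll(Long.MAX_VALUE); // blocks looking for the coordinator with no broker
                } catch (final WakeupException expected) {
                    System.out.println("poll() unblocked by wakeup()");
                } finally {
                    consumer.close();
                }
            }
        });
        poller.start();

        Thread.sleep(1000);
        consumer.wakeup(); // the one Consumer method that is safe to call from another thread
        poller.join();     // returns promptly, even though no broker ever came up
    }
}
{code}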



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)