You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/01/02 17:33:00 UTC

[jira] [Commented] (KAFKA-6383) StreamThread.shutdown doesn't clean up completely when called before StreamThread.start

    [ https://issues.apache.org/jira/browse/KAFKA-6383?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16308405#comment-16308405 ] 

ASF GitHub Bot commented on KAFKA-6383:
---------------------------------------

guozhangwang closed pull request #4343: KAFKA-6383: complete shutdown for CREATED StreamThreads
URL: https://github.com/apache/kafka/pull/4343
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 696081d36c7..ff440cc49a3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -173,23 +173,26 @@ public State state() {
     /**
      * Sets the state
      * @param newState New state
+     * @return The state prior to the call to setState, or null if the transition is invalid
      */
-    boolean setState(final State newState) {
-        final State oldState = state;
+    State setState(final State newState) {
+        final State oldState;
 
         synchronized (stateLock) {
+            oldState = state;
+
             if (state == State.PENDING_SHUTDOWN && newState != State.DEAD) {
                 // when the state is already in PENDING_SHUTDOWN, all other transitions will be
                 // refused but we do not throw exception here
-                return false;
+                return null;
             } else if (state == State.DEAD) {
                 // when the state is already in NOT_RUNNING, all its transitions
                 // will be refused but we do not throw exception here
-                return false;
+                return null;
             } else if (state == State.PARTITIONS_REVOKED && newState == State.PARTITIONS_REVOKED) {
                 // when the state is already in PARTITIONS_REVOKED, its transition to itself will be
                 // refused but we do not throw exception here
-                return false;
+                return null;
             } else if (!state.isValidTransition(newState)) {
                 log.error("Unexpected state transition from {} to {}", oldState, newState);
                 throw new StreamsException(logPrefix + "Unexpected state transition from " + oldState + " to " + newState);
@@ -209,7 +212,7 @@ boolean setState(final State newState) {
             stateListener.onChange(this, state, oldState);
         }
 
-        return true;
+        return oldState;
     }
 
     public boolean isRunningAndNotRebalancing() {
@@ -251,7 +254,7 @@ public void onPartitionsAssigned(final Collection<TopicPartition> assignment) {
 
             final long start = time.milliseconds();
             try {
-                if (!streamThread.setState(State.PARTITIONS_ASSIGNED)) {
+                if (streamThread.setState(State.PARTITIONS_ASSIGNED) == null) {
                     return;
                 }
                 taskManager.createTasks(assignment);
@@ -281,7 +284,7 @@ public void onPartitionsRevoked(final Collection<TopicPartition> assignment) {
                 taskManager.activeTaskIds(),
                 taskManager.standbyTaskIds());
 
-            if (streamThread.setState(State.PARTITIONS_REVOKED)) {
+            if (streamThread.setState(State.PARTITIONS_REVOKED) != null) {
                 final long start = time.milliseconds();
                 try {
                     // suspend active tasks
@@ -714,7 +717,11 @@ public StreamThread(final Time time,
     @Override
     public void run() {
         log.info("Starting");
-        setState(State.RUNNING);
+        if (setState(State.RUNNING) == null) {
+            log.info("StreamThread already shutdown. Not running");
+            completeShutdown(true);
+            return;
+        }
         boolean cleanRun = false;
         try {
             runLoop();
@@ -1088,7 +1095,11 @@ private long computeLatency() {
      */
     public void shutdown() {
         log.info("Informed to shut down");
-        setState(State.PENDING_SHUTDOWN);
+        State oldState = setState(State.PENDING_SHUTDOWN);
+        if (oldState == State.CREATED) {
+            // Start so that we shutdown on the thread
+            this.start();
+        }
     }
 
     private void completeShutdown(final boolean cleanRun) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index d70c8f393e4..bdc1c00153f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -274,7 +274,11 @@ void shutdown(final boolean clean) {
         standby.close(clean);
 
         // remove the changelog partitions from restore consumer
-        restoreConsumer.unsubscribe();
+        try {
+            restoreConsumer.unsubscribe();
+        } catch (final RuntimeException fatalException) {
+            firstException.compareAndSet(null, fatalException);
+        }
         taskCreator.close();
         standbyTaskCreator.close();
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index 8746c62a50f..a2084b02aea 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams;
 
+import org.apache.kafka.clients.producer.MockProducer;
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.config.ConfigException;
@@ -31,6 +32,7 @@
 import org.apache.kafka.streams.processor.internals.GlobalStreamThread;
 import org.apache.kafka.streams.processor.internals.StreamThread;
 import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.MockClientSupplier;
 import org.apache.kafka.test.MockMetricsReporter;
 import org.apache.kafka.test.MockStateRestoreListener;
 import org.apache.kafka.test.TestCondition;
@@ -113,6 +115,28 @@ public void testStateCloseAfterCreate() {
         Assert.assertEquals(KafkaStreams.State.NOT_RUNNING, streams.state());
     }
 
+    @Test
+    public void shouldCleanupResourcesOnCloseWithoutPreviousStart() throws Exception {
+        final StreamsBuilder builder = new StreamsBuilder();
+        builder.globalTable("anyTopic");
+        MockClientSupplier clientSupplier = new MockClientSupplier();
+        final KafkaStreams streams = new KafkaStreams(builder.build(), new StreamsConfig(props), clientSupplier);
+        streams.close();
+        TestUtils.waitForCondition(new TestCondition() {
+            @Override
+            public boolean conditionMet() {
+                return streams.state() == KafkaStreams.State.NOT_RUNNING;
+            }
+        }, 10 * 1000, "Streams never stopped.");
+
+        // Ensure that any created clients are closed
+        assertTrue(clientSupplier.consumer.closed());
+        assertTrue(clientSupplier.restoreConsumer.closed());
+        for (MockProducer p : clientSupplier.producers) {
+            assertTrue(p.closed());
+        }
+    }
+
     @Test
     public void testStateThreadClose() throws Exception {
         final StreamsBuilder builder = new StreamsBuilder();
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 8bcd6fb4ed4..fca380fac2e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -438,8 +438,13 @@ public void shouldShutdownTaskManagerOnClose() throws InterruptedException {
         EasyMock.expectLastCall();
         EasyMock.replay(taskManager, consumer);
 
-        StreamThread.StreamsMetricsThreadImpl streamsMetrics = new StreamThread.StreamsMetricsThreadImpl(metrics, "", "", Collections.<String, String>emptyMap());
-        final StreamThread thread = new StreamThread(mockTime,
+        final StreamThread.StreamsMetricsThreadImpl streamsMetrics = new StreamThread.StreamsMetricsThreadImpl(
+                metrics,
+                "",
+                "",
+                Collections.<String, String>emptyMap());
+        final StreamThread thread = new StreamThread(
+                mockTime,
                 config,
                 consumer,
                 consumer,
@@ -455,6 +460,40 @@ public void shouldShutdownTaskManagerOnClose() throws InterruptedException {
         EasyMock.verify(taskManager);
     }
 
+    @Test
+    public void shouldShutdownTaskManagerOnCloseWithoutStart() {
+        final Consumer<byte[], byte[]> consumer = EasyMock.createNiceMock(Consumer.class);
+        final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class);
+        taskManager.shutdown(true);
+        EasyMock.expectLastCall();
+        EasyMock.replay(taskManager, consumer);
+
+        final StreamThread.StreamsMetricsThreadImpl streamsMetrics = new StreamThread.StreamsMetricsThreadImpl(
+                metrics,
+                "",
+                "",
+                Collections.<String, String>emptyMap());
+        final StreamThread thread = new StreamThread(
+                mockTime,
+                config,
+                consumer,
+                consumer,
+                null,
+                taskManager,
+                streamsMetrics,
+                internalTopologyBuilder,
+                clientId,
+                new LogContext(""));
+        thread.shutdown();
+        try {
+            thread.join(1000);
+        } catch (final InterruptedException e) {
+            fail("Join interrupted");
+        }
+        assertFalse(thread.isAlive());
+        EasyMock.verify(taskManager);
+    }
+
     @Test
     public void shouldNotNullPointerWhenStandbyTasksAssignedAndNoStateStoresForTopology() throws InterruptedException {
         internalTopologyBuilder.addSource(null, "name", null, null, null, "topic");


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> StreamThread.shutdown doesn't clean up completely when called before StreamThread.start
> ---------------------------------------------------------------------------------------
>
>                 Key: KAFKA-6383
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6383
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.0.0
>            Reporter: Rohan Desai
>            Assignee: Rohan Desai
>
> The following code leaks a producer network thread:
> {code}
> ks = new KafkaStreams(...);
> ks.close();
> {code}
> The underlying issue is that KafkaStreams creates a number of StreamThreads via StreamThread.create, which in turn creates several resources (including a producer). These resources are cleaned up only when the thread exits, so if the thread was never started, they are never cleaned up. StreamThread.shutdown should clean up if it sees that the thread has never been started.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)