Posted to commits@kafka.apache.org by da...@apache.org on 2017/06/30 07:56:46 UTC

kafka git commit: KAFKA-5485; Streams should not suspend tasks twice

Repository: kafka
Updated Branches:
  refs/heads/trunk c4193cd1a -> 0dfeb31a1


KAFKA-5485; Streams should not suspend tasks twice

Author: Matthias J. Sax <ma...@confluent.io>

Reviewers: Eno Thereska <en...@gmail.com>, Guozhang Wang <wa...@gmail.com>, Damian Guy <da...@gmail.com>

Closes #3390 from mjsax/kafka-5485-dont-suspend-tasks-twice
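
In short: StreamTask.close() is split into suspend(clean) plus a new
closeSuspended(clean, firstException) helper, and commitImpl() is renamed to
commit(startNewTransaction). StreamThread can then close a task that was
already suspended during a rebalance without suspending it a second time.
A rough sketch of the resulting close path, paraphrased from the diff below
(logging and exception details omitted):

    // StreamTask -- sketch only, see the diff for the actual code
    @Override
    public void close(boolean clean) {
        RuntimeException firstException = null;
        try {
            suspend(clean);                     // close topology; commit if clean
        } catch (final RuntimeException e) {
            clean = false;
            firstException = e;
        }
        // close state manager (write checkpoint if clean); close producer under EOS
        closeSuspended(clean, firstException);
    }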


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0dfeb31a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0dfeb31a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0dfeb31a

Branch: refs/heads/trunk
Commit: 0dfeb31a116d26023540f6c6074127f0aab00b6d
Parents: c4193cd
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Fri Jun 30 08:56:40 2017 +0100
Committer: Damian Guy <da...@gmail.com>
Committed: Fri Jun 30 08:56:40 2017 +0100

----------------------------------------------------------------------
 .../streams/processor/internals/StreamTask.java |  68 ++---
 .../processor/internals/StreamThread.java       |   2 +-
 .../processor/internals/StreamThreadTest.java   | 266 +++++++++++++++----
 3 files changed, 254 insertions(+), 82 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/0dfeb31a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index dfb28f6..3fd4596 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -258,12 +258,12 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
      */
     @Override
     public void commit() {
-        commitImpl(true);
+        commit(true);
 
     }
 
     // visible for testing
-    void commitImpl(final boolean startNewTransaction) {
+    void commit(final boolean startNewTransaction) {
         log.debug("{} Committing", logPrefix);
         metrics.metrics.measureLatencyNs(
             time,
@@ -365,10 +365,11 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
      *   - commit offsets
      * </pre>
      */
-    private void suspend(final boolean clean) {
+    // visible for testing
+    void suspend(final boolean clean) {
         closeTopology(); // should we call this only on clean suspend?
         if (clean) {
-            commitImpl(false);
+            commit(false);
         }
     }
 
@@ -396,33 +397,8 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
         }
     }
 
-    /**
-     * <pre>
-     * - {@link #suspend(boolean) suspend(clean)}
-     *   - close topology
-     *   - if (clean) {@link #commit()}
-     *     - flush state and producer
-     *     - commit offsets
-     * - close state
-     *   - if (clean) write checkpoint
-     * - if (eos) close producer
-     * </pre>
-     * @param clean shut down cleanly (ie, incl. flush and commit) if {@code true} --
-     *              otherwise, just close open resources
-     */
-    @Override
-    public void close(boolean clean) {
-        log.debug("{} Closing", logPrefix);
-
-        RuntimeException firstException = null;
-        try {
-            suspend(clean);
-        } catch (final RuntimeException e) {
-            clean = false;
-            firstException = e;
-            log.error("{} Could not close task: ", logPrefix, e);
-        }
-
+    // helper to avoid calling suspend() twice if a suspended task is not re-assigned and needs to be closed
+    void closeSuspended(boolean clean, RuntimeException firstException) {
         try {
             closeStateManager(clean);
         } catch (final RuntimeException e) {
@@ -459,6 +435,36 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
         }
     }
 
+    /**
+     * <pre>
+     * - {@link #suspend(boolean) suspend(clean)}
+     *   - close topology
+     *   - if (clean) {@link #commit()}
+     *     - flush state and producer
+     *     - commit offsets
+     * - close state
+     *   - if (clean) write checkpoint
+     * - if (eos) close producer
+     * </pre>
+     * @param clean shut down cleanly (ie, incl. flush and commit) if {@code true} --
+     *              otherwise, just close open resources
+     */
+    @Override
+    public void close(boolean clean) {
+        log.debug("{} Closing", logPrefix);
+
+        RuntimeException firstException = null;
+        try {
+            suspend(clean);
+        } catch (final RuntimeException e) {
+            clean = false;
+            firstException = e;
+            log.error("{} Could not close task: ", logPrefix, e);
+        }
+
+        closeSuspended(clean, firstException);
+    }
+
     /**
      * Adds records to queues. If a record has an invalid (i.e., negative) timestamp, the record is skipped
      * and not added to the queue for processing

http://git-wip-us.apache.org/repos/asf/kafka/blob/0dfeb31a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index ae344b0..f81773b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -1228,7 +1228,7 @@ public class StreamThread extends Thread {
             if (!task.partitions().equals(assignedPartitionsForTask)) {
                 log.debug("{} Closing suspended non-assigned active task {}", logPrefix, task.id());
                 try {
-                    task.close(true);
+                    task.closeSuspended(true, null);
                 } catch (final Exception e) {
                     log.error("{} Failed to remove suspended task {}: ", logPrefix, next.getKey(), e);
                 } finally {
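
Caller-side, this is the only change: when a suspended active task is not
re-assigned after a rebalance, the thread now removes it via

    task.closeSuspended(true, null);

instead of task.close(true), which would have called suspend() a second time
on an already-suspended task.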

http://git-wip-us.apache.org/repos/asf/kafka/blob/0dfeb31a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 8205c27..5a31ccd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -168,6 +168,7 @@ public class StreamThreadTest {
 
     private static class TestStreamTask extends StreamTask {
         boolean committed = false;
+        private boolean suspended;
         private boolean closed;
         private boolean closedStateManager;
 
@@ -196,8 +197,8 @@ public class StreamThreadTest {
         }
 
         @Override
-        void commitImpl(final boolean startNewTransaction) {
-            super.commitImpl(startNewTransaction);
+        void commit(final boolean startNewTransaction) {
+            super.commit(startNewTransaction);
             committed = true;
         }
 
@@ -205,60 +206,66 @@ public class StreamThreadTest {
         protected void updateOffsetLimits() {}
 
         @Override
+        public void resume() {
+            if (!suspended || closed) {
+                throw new IllegalStateException("Should not resume task that is not suspended or already closed.");
+            }
+            super.resume();
+            suspended = false;
+        }
+
+        @Override
+        void suspend(final boolean clean) {
+            if (suspended || closed) {
+                throw new IllegalStateException("Should not suspend task that is already suspended or closed.");
+            }
+            super.suspend(clean);
+            suspended = true;
+        }
+
+        @Override
         public void close(final boolean clean) {
+            if (closed && clean) {
+                throw new IllegalStateException("Should not close task that is already closed.");
+            }
             super.close(clean);
             closed = true;
         }
 
         @Override
+        public void closeSuspended(final boolean clean, final RuntimeException firstException) {
+            if (closed && clean) {
+                throw new IllegalStateException("Should not close task that is not suspended or already closed.");
+            }
+            super.closeSuspended(clean, firstException);
+            closed = true;
+        }
+
+        @Override
         void closeStateManager(final boolean writeCheckpoint) {
             super.closeStateManager(writeCheckpoint);
             closedStateManager = true;
         }
     }
 
-
     @SuppressWarnings("unchecked")
     @Test
-    public void testPartitionAssignmentChange() throws Exception {
+    public void testPartitionAssignmentChangeForSingleGroup() throws Exception {
         builder.addSource("source1", "topic1");
-        builder.addSource("source2", "topic2");
-        builder.addSource("source3", "topic3");
-        builder.addProcessor("processor", new MockProcessorSupplier(), "source2", "source3");
 
-        final StreamThread thread = new StreamThread(
-            builder,
-            config,
-            clientSupplier,
-            applicationId,
-            clientId,
-            processId,
-            metrics,
-            Time.SYSTEM,
-            new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
-            0) {
+        final StreamThread thread = getStreamThread();
 
+        final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
+        thread.setPartitionAssignor(new StreamPartitionAssignor() {
             @Override
-            protected StreamTask createStreamTask(final TaskId id, final Collection<TopicPartition> partitionsForTask) {
-                final ProcessorTopology topology = builder.build(id.topicGroupId);
-                return new TestStreamTask(
-                    id,
-                    applicationId,
-                    partitionsForTask,
-                    topology,
-                    consumer,
-                    clientSupplier.getProducer(new HashMap()),
-                    restoreConsumer,
-                    config,
-                    new MockStreamsMetrics(new Metrics()),
-                    stateDirectory);
+            Map<TaskId, Set<TopicPartition>> activeTasks() {
+                return activeTasks;
             }
-        };
+        });
 
         final StateListenerStub stateListener = new StateListenerStub();
         thread.setStateListener(stateListener);
         assertEquals(thread.state(), StreamThread.State.RUNNING);
-        initPartitionGrouper(config, thread, clientSupplier);
 
         final ConsumerRebalanceListener rebalanceListener = thread.rebalanceListener;
 
@@ -269,44 +276,56 @@ public class StreamThreadTest {
         Set<TopicPartition> expectedGroup1;
         Set<TopicPartition> expectedGroup2;
 
+        // revoke nothing
         revokedPartitions = Collections.emptyList();
-        assignedPartitions = Collections.singletonList(t1p1);
-        expectedGroup1 = new HashSet<>(Collections.singleton(t1p1));
-
         rebalanceListener.onPartitionsRevoked(revokedPartitions);
+
         assertEquals(thread.state(), StreamThread.State.PARTITIONS_REVOKED);
         Assert.assertEquals(stateListener.numChanges, 1);
         Assert.assertEquals(stateListener.oldState, StreamThread.State.RUNNING);
         Assert.assertEquals(stateListener.newState, StreamThread.State.PARTITIONS_REVOKED);
+
+        // assign single partition
+        assignedPartitions = Collections.singletonList(t1p1);
+        expectedGroup1 = new HashSet<>(Collections.singleton(t1p1));
+        activeTasks.put(new TaskId(0, 1), expectedGroup1);
         rebalanceListener.onPartitionsAssigned(assignedPartitions);
+
         assertEquals(thread.state(), StreamThread.State.RUNNING);
         Assert.assertEquals(stateListener.numChanges, 3);
         Assert.assertEquals(stateListener.oldState, StreamThread.State.ASSIGNING_PARTITIONS);
         Assert.assertEquals(stateListener.newState, StreamThread.State.RUNNING);
-
         assertTrue(thread.tasks().containsKey(task1));
         assertEquals(expectedGroup1, thread.tasks().get(task1).partitions());
         assertEquals(1, thread.tasks().size());
 
+        // revoke single partition
         revokedPartitions = assignedPartitions;
-        assignedPartitions = Collections.singletonList(t1p2);
-        expectedGroup2 = new HashSet<>(Collections.singleton(t1p2));
-
+        activeTasks.clear();
         rebalanceListener.onPartitionsRevoked(revokedPartitions);
+
         assertFalse(thread.tasks().containsKey(task1));
         assertEquals(0, thread.tasks().size());
+
+        // assign different single partition
+        assignedPartitions = Collections.singletonList(t1p2);
+        expectedGroup2 = new HashSet<>(Collections.singleton(t1p2));
+        activeTasks.put(new TaskId(0, 2), expectedGroup2);
         rebalanceListener.onPartitionsAssigned(assignedPartitions);
 
         assertTrue(thread.tasks().containsKey(task2));
         assertEquals(expectedGroup2, thread.tasks().get(task2).partitions());
         assertEquals(1, thread.tasks().size());
 
+        // revoke different single partition and assign both partitions
         revokedPartitions = assignedPartitions;
+        activeTasks.clear();
+        rebalanceListener.onPartitionsRevoked(revokedPartitions);
         assignedPartitions = Arrays.asList(t1p1, t1p2);
         expectedGroup1 = new HashSet<>(Collections.singleton(t1p1));
         expectedGroup2 = new HashSet<>(Collections.singleton(t1p2));
-
-        rebalanceListener.onPartitionsRevoked(revokedPartitions);
+        activeTasks.put(new TaskId(0, 1), expectedGroup1);
+        activeTasks.put(new TaskId(0, 2), expectedGroup2);
         rebalanceListener.onPartitionsAssigned(assignedPartitions);
 
         assertTrue(thread.tasks().containsKey(task1));
@@ -315,12 +334,65 @@ public class StreamThreadTest {
         assertEquals(expectedGroup2, thread.tasks().get(task2).partitions());
         assertEquals(2, thread.tasks().size());
 
+        // revoke all partitions and assign nothing
         revokedPartitions = assignedPartitions;
+        rebalanceListener.onPartitionsRevoked(revokedPartitions);
+        assignedPartitions = Collections.emptyList();
+        rebalanceListener.onPartitionsAssigned(assignedPartitions);
+
+        assertTrue(thread.tasks().isEmpty());
+
+        thread.close();
+        assertTrue((thread.state() == StreamThread.State.PENDING_SHUTDOWN) ||
+            (thread.state() == StreamThread.State.CREATED));
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testPartitionAssignmentChangeForMultipleGroups() throws Exception {
+        builder.addSource("source1", "topic1");
+        builder.addSource("source2", "topic2");
+        builder.addSource("source3", "topic3");
+        builder.addProcessor("processor", new MockProcessorSupplier(), "source2", "source3");
+
+        final StreamThread thread = getStreamThread();
+
+        final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
+        thread.setPartitionAssignor(new StreamPartitionAssignor() {
+            @Override
+            Map<TaskId, Set<TopicPartition>> activeTasks() {
+                return activeTasks;
+            }
+        });
+
+        final StateListenerStub stateListener = new StateListenerStub();
+        thread.setStateListener(stateListener);
+        assertEquals(thread.state(), StreamThread.State.RUNNING);
+
+        final ConsumerRebalanceListener rebalanceListener = thread.rebalanceListener;
+
+        assertTrue(thread.tasks().isEmpty());
+
+        List<TopicPartition> revokedPartitions;
+        List<TopicPartition> assignedPartitions;
+        Set<TopicPartition> expectedGroup1;
+        Set<TopicPartition> expectedGroup2;
+
+        // revoke nothing
+        revokedPartitions = Collections.emptyList();
+        rebalanceListener.onPartitionsRevoked(revokedPartitions);
+
+        assertEquals(thread.state(), StreamThread.State.PARTITIONS_REVOKED);
+        Assert.assertEquals(stateListener.numChanges, 1);
+        Assert.assertEquals(stateListener.oldState, StreamThread.State.RUNNING);
+        Assert.assertEquals(stateListener.newState, StreamThread.State.PARTITIONS_REVOKED);
+
+        // assign four new partitions of second subtopology
         assignedPartitions = Arrays.asList(t2p1, t2p2, t3p1, t3p2);
         expectedGroup1 = new HashSet<>(Arrays.asList(t2p1, t3p1));
         expectedGroup2 = new HashSet<>(Arrays.asList(t2p2, t3p2));
-
-        rebalanceListener.onPartitionsRevoked(revokedPartitions);
+        activeTasks.put(new TaskId(1, 1), expectedGroup1);
+        activeTasks.put(new TaskId(1, 2), expectedGroup2);
         rebalanceListener.onPartitionsAssigned(assignedPartitions);
 
         assertTrue(thread.tasks().containsKey(task4));
@@ -329,12 +401,15 @@ public class StreamThreadTest {
         assertEquals(expectedGroup2, thread.tasks().get(task5).partitions());
         assertEquals(2, thread.tasks().size());
 
+        // revoke four partitions and assign three partitions of both subtopologies
         revokedPartitions = assignedPartitions;
+        rebalanceListener.onPartitionsRevoked(revokedPartitions);
+
         assignedPartitions = Arrays.asList(t1p1, t2p1, t3p1);
         expectedGroup1 = new HashSet<>(Collections.singleton(t1p1));
         expectedGroup2 = new HashSet<>(Arrays.asList(t2p1, t3p1));
-
-        rebalanceListener.onPartitionsRevoked(revokedPartitions);
+        activeTasks.put(new TaskId(0, 1), expectedGroup1);
+        activeTasks.put(new TaskId(1, 1), expectedGroup2);
         rebalanceListener.onPartitionsAssigned(assignedPartitions);
 
         assertTrue(thread.tasks().containsKey(task1));
@@ -343,12 +418,12 @@ public class StreamThreadTest {
         assertEquals(expectedGroup2, thread.tasks().get(task4).partitions());
         assertEquals(2, thread.tasks().size());
 
+        // revoke all three partitions and reassign the same three partitions (from different subtopologies)
         revokedPartitions = assignedPartitions;
+        rebalanceListener.onPartitionsRevoked(revokedPartitions);
         assignedPartitions = Arrays.asList(t1p1, t2p1, t3p1);
         expectedGroup1 = new HashSet<>(Collections.singleton(t1p1));
         expectedGroup2 = new HashSet<>(Arrays.asList(t2p1, t3p1));
-
-        rebalanceListener.onPartitionsRevoked(revokedPartitions);
         rebalanceListener.onPartitionsAssigned(assignedPartitions);
 
         assertTrue(thread.tasks().containsKey(task1));
@@ -357,10 +432,10 @@ public class StreamThreadTest {
         assertEquals(expectedGroup2, thread.tasks().get(task4).partitions());
         assertEquals(2, thread.tasks().size());
 
+        // revoke all partitions and assign nothing
         revokedPartitions = assignedPartitions;
-        assignedPartitions = Collections.emptyList();
-
         rebalanceListener.onPartitionsRevoked(revokedPartitions);
+        assignedPartitions = Collections.emptyList();
         rebalanceListener.onPartitionsAssigned(assignedPartitions);
 
         assertTrue(thread.tasks().isEmpty());
@@ -931,6 +1006,67 @@ public class StreamThreadTest {
     }
 
     @Test
+    public void shouldNotCloseSuspendedTasksTwice() throws Exception {
+        builder.addSource("name", "topic").addSink("out", "output");
+
+        final TestStreamTask testStreamTask = new TestStreamTask(
+            new TaskId(0, 0),
+            applicationId,
+            Utils.mkSet(new TopicPartition("topic", 0)),
+            builder.build(0),
+            clientSupplier.consumer,
+            clientSupplier.getProducer(new HashMap<String, Object>()),
+            clientSupplier.restoreConsumer,
+            config,
+            new MockStreamsMetrics(new Metrics()),
+            new StateDirectory(applicationId, config.getString(StreamsConfig.STATE_DIR_CONFIG), mockTime));
+
+        final StreamThread thread = new StreamThread(
+            builder,
+            config,
+            clientSupplier,
+            applicationId,
+            clientId,
+            processId,
+            metrics,
+            mockTime,
+            new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
+            0) {
+
+            @Override
+            protected StreamTask createStreamTask(final TaskId id, final Collection<TopicPartition> partitionsForTask) {
+                return testStreamTask;
+            }
+        };
+
+        final Set<TopicPartition> activeTasks = new HashSet<>();
+        activeTasks.add(new TopicPartition("topic", 0));
+
+        thread.setPartitionAssignor(new StreamPartitionAssignor() {
+            @Override
+            Map<TaskId, Set<TopicPartition>> activeTasks() {
+                return new HashMap<TaskId, Set<TopicPartition>>() {
+                    {
+                        put(new TaskId(0, 0), activeTasks);
+                    }
+                };
+            }
+        });
+
+        thread.rebalanceListener.onPartitionsAssigned(activeTasks);
+        thread.rebalanceListener.onPartitionsRevoked(activeTasks);
+
+        assertTrue(testStreamTask.suspended);
+        assertFalse(testStreamTask.closed);
+
+        activeTasks.clear();
+        // this should succeed without exception
+        thread.rebalanceListener.onPartitionsAssigned(Collections.<TopicPartition>emptyList());
+
+        assertTrue(testStreamTask.closed);
+    }
+
+    @Test
     public void shouldInitializeRestoreConsumerWithOffsetsFromStandbyTasks() throws Exception {
         final KStreamBuilder builder = new KStreamBuilder();
         builder.setApplicationId(applicationId);
@@ -1624,4 +1760,34 @@ public class StreamThreadTest {
         }
     }
 
+    private StreamThread getStreamThread() {
+        return new StreamThread(
+            builder,
+            config,
+            clientSupplier,
+            applicationId,
+            clientId,
+            processId,
+            metrics,
+            Time.SYSTEM,
+            new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
+            0) {
+
+            @Override
+            protected StreamTask createStreamTask(final TaskId id, final Collection<TopicPartition> partitionsForTask) {
+                final ProcessorTopology topology = builder.build(id.topicGroupId);
+                return new TestStreamTask(
+                    id,
+                    applicationId,
+                    partitionsForTask,
+                    topology,
+                    consumer,
+                    clientSupplier.getProducer(new HashMap()),
+                    restoreConsumer,
+                    config,
+                    new MockStreamsMetrics(new Metrics()),
+                    stateDirectory);
+            }
+        };
+    }
 }