You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/12/20 18:30:36 UTC

kafka git commit: KAFKA-4540: Suspended tasks that are not assigned to the StreamThread need to be closed before new active and standby tasks are created

Repository: kafka
Updated Branches:
  refs/heads/trunk 0321bf5aa -> 56c61745d


KAFKA-4540: Suspended tasks that are not assigned to the StreamThread need to be closed before new active and standby tasks are created

During `onPartitionsAssigned`, first close and remove any suspended `StandbyTasks` that are no longer assigned to this consumer.

Author: Damian Guy <da...@gmail.com>

Reviewers: Guozhang Wang <wa...@gmail.com>

Closes #2266 from dguy/kafka-4540


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/56c61745
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/56c61745
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/56c61745

Branch: refs/heads/trunk
Commit: 56c61745deeadd299e5ef24577c1bdf5d9d231a0
Parents: 0321bf5
Author: Damian Guy <da...@gmail.com>
Authored: Tue Dec 20 10:30:33 2016 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue Dec 20 10:30:33 2016 -0800

----------------------------------------------------------------------
 .../processor/internals/StreamThread.java       |  98 +++++-----
 .../processor/internals/StreamThreadTest.java   | 191 +++++++++++++++++--
 2 files changed, 225 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/56c61745/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 8b7aaea..f09bf54 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -39,7 +39,6 @@ import org.apache.kafka.streams.KafkaClientSupplier;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.errors.LockException;
-import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.errors.TaskIdFormatException;
 import org.apache.kafka.streams.processor.PartitionGrouper;
@@ -233,6 +232,10 @@ public class StreamThread extends Thread {
                     StreamThread.this.getName(), assignment);
 
                 setStateWhenNotInPendingShutdown(State.ASSIGNING_PARTITIONS);
+                // do this first as we may have suspended standby tasks that
+                // will become active or vice versa
+                closeNonAssignedSuspendedStandbyTasks();
+                closeNonAssignedSuspendedTasks();
                 addStreamTasks(assignment);
                 addStandbyTasks();
                 lastCleanMs = time.milliseconds(); // start the cleaning cycle
@@ -810,6 +813,47 @@ public class StreamThread extends Thread {
         return null;
     }
 
+    private void closeNonAssignedSuspendedTasks() {
+        final Map<TaskId, Set<TopicPartition>> newTaskAssignment = partitionAssignor.activeTasks();
+        final Iterator<Map.Entry<TaskId, StreamTask>> suspendedTaskIterator = suspendedTasks.entrySet().iterator();
+        while (suspendedTaskIterator.hasNext()) {
+            final Map.Entry<TaskId, StreamTask> next = suspendedTaskIterator.next();
+            final StreamTask task = next.getValue();
+            final Set<TopicPartition> assignedPartitionsForTask = newTaskAssignment.get(next.getKey());
+            if (!task.partitions().equals(assignedPartitionsForTask)) {
+                log.debug("{} closing suspended non-assigned task", logPrefix);
+                try {
+                    task.close();
+                    task.closeStateManager();
+                } catch (Exception e) {
+                    log.error("{} Failed to remove suspended task {}", logPrefix, next.getKey(), e);
+                } finally {
+                    suspendedTaskIterator.remove();
+                }
+            }
+        }
+
+    }
+
+    private void closeNonAssignedSuspendedStandbyTasks() {
+        final Set<TaskId> currentSuspendedTaskIds = partitionAssignor.standbyTasks().keySet();
+        final Iterator<Map.Entry<TaskId, StandbyTask>> standByTaskIterator = suspendedStandbyTasks.entrySet().iterator();
+        while (standByTaskIterator.hasNext()) {
+            final Map.Entry<TaskId, StandbyTask> suspendedTask = standByTaskIterator.next();
+            if (!currentSuspendedTaskIds.contains(suspendedTask.getKey())) {
+                log.debug("{} Closing suspended non-assigned standby task {}", logPrefix, suspendedTask.getKey());
+                final StandbyTask task = suspendedTask.getValue();
+                try {
+                    task.close();
+                    task.closeStateManager();
+                } catch (Exception e) {
+                    log.error("{} Failed to remove suspended task standby {}", logPrefix, suspendedTask.getKey(), e);
+                } finally {
+                    standByTaskIterator.remove();
+                }
+            }
+        }
+    }
 
     private void addStreamTasks(Collection<TopicPartition> assignment) {
         if (partitionAssignor == null)
@@ -847,15 +891,12 @@ public class StreamThread extends Thread {
             }
         }
 
-        // destroy any remaining suspended tasks
-        removeSuspendedTasks();
-
         // create all newly assigned tasks (guard against race condition with other thread via backoff and retry)
         // -> other thread will call removeSuspendedTasks(); eventually
         taskCreator.retryWithBackoff(newTasks);
     }
 
-    private StandbyTask createStandbyTask(TaskId id, Collection<TopicPartition> partitions) {
+    StandbyTask createStandbyTask(TaskId id, Collection<TopicPartition> partitions) {
         log.info("{} Creating new standby task {} with assigned partitions [{}]", logPrefix, id, partitions);
 
         sensors.taskCreationSensor.record();
@@ -894,9 +935,6 @@ public class StreamThread extends Thread {
             updateStandByTaskMaps(checkpointedOffsets, taskId, partitions, task);
         }
 
-        // destroy any remaining suspended tasks
-        removeSuspendedStandbyTasks();
-
         // create all newly assigned standby tasks (guard against race condition with other thread via backoff and retry)
         // -> other thread will call removeSuspendedStandbyTasks(); eventually
         new StandbyTaskCreator(checkpointedOffsets).retryWithBackoff(newStandbyTasks);
@@ -959,40 +997,6 @@ public class StreamThread extends Thread {
         standbyRecords.clear();
     }
 
-    private void removeSuspendedTasks() {
-        log.info("{} Removing all suspended tasks [{}]", logPrefix, suspendedTasks.keySet());
-        try {
-            // Close task and state manager
-            for (final AbstractTask task : suspendedTasks.values()) {
-                task.close();
-                task.flushState();
-                task.closeStateManager();
-                // flush out any extra data sent during close
-                producer.flush();
-            }
-            suspendedTasks.clear();
-        } catch (Exception e) {
-            log.error("{} Failed to remove suspended tasks: ", logPrefix, e);
-        }
-    }
-
-    private void removeSuspendedStandbyTasks() {
-        log.info("{} Removing all suspended standby tasks [{}]", logPrefix, suspendedStandbyTasks.keySet());
-        try {
-            // Close task and state manager
-            for (final AbstractTask task : suspendedStandbyTasks.values()) {
-                task.close();
-                task.flushState();
-                task.closeStateManager();
-                // flush out any extra data sent during close
-                producer.flush();
-            }
-            suspendedStandbyTasks.clear();
-        } catch (Exception e) {
-            log.error("{} Failed to remove suspended tasks: ", logPrefix, e);
-        }
-    }
-
     private void closeAllTasks() {
         performOnAllTasks(new AbstractTaskAction() {
             @Override
@@ -1189,13 +1193,9 @@ public class StreamThread extends Thread {
                     try {
                         createTask(taskId, partitions);
                         it.remove();
-                    } catch (final ProcessorStateException e) {
-                        if (e.getCause() instanceof LockException) {
-                            // ignore and retry
-                            log.warn("Could not create task {}. Will retry.", taskId, e);
-                        } else {
-                            throw e;
-                        }
+                    } catch (final LockException e) {
+                        // ignore and retry
+                        log.warn("Could not create task {}. Will retry.", taskId, e);
                     }
                 }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/56c61745/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 1e4f883..e8c10e4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -55,8 +55,10 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 import java.util.UUID;
+import java.util.regex.Pattern;
 
 import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertSame;
@@ -131,6 +133,8 @@ public class StreamThreadTest {
 
     private static class TestStreamTask extends StreamTask {
         public boolean committed = false;
+        private boolean closed;
+        private boolean closedStateManager;
 
         public TestStreamTask(TaskId id,
                               String applicationId,
@@ -160,6 +164,18 @@ public class StreamThreadTest {
         protected void initializeOffsetLimits() {
             // do nothing
         }
+
+        @Override
+        public void close() {
+            this.closed = true;
+            super.close();
+        }
+
+        @Override
+        void closeStateManager() {
+            super.closeStateManager();
+            this.closedStateManager = true;
+        }
     }
 
 
@@ -298,8 +314,8 @@ public class StreamThreadTest {
     }
 
     final static String TOPIC = "topic";
-    final Set<TopicPartition> assignmentThread1 = Collections.singleton(new TopicPartition(TOPIC, 0));
-    final Set<TopicPartition> assignmentThread2 = Collections.singleton(new TopicPartition(TOPIC, 1));
+    final Set<TopicPartition> task0Assignment = Collections.singleton(new TopicPartition(TOPIC, 0));
+    final Set<TopicPartition> task1Assignment = Collections.singleton(new TopicPartition(TOPIC, 1));
 
     @Test
     public void testHandingOverTaskFromOneToAnotherThread() throws Exception {
@@ -318,16 +334,23 @@ public class StreamThreadTest {
 
         final StreamThread thread1 = new StreamThread(builder, config, mockClientSupplier, applicationId, clientId + 1, processId, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder));
         final StreamThread thread2 = new StreamThread(builder, config, mockClientSupplier, applicationId, clientId + 2, processId, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder));
-        thread1.partitionAssignor(new MockStreamsPartitionAssignor());
-        thread2.partitionAssignor(new MockStreamsPartitionAssignor());
+
+        final Map<TaskId, Set<TopicPartition>> task0 = Collections.singletonMap(new TaskId(0, 0), task0Assignment);
+        final Map<TaskId, Set<TopicPartition>> task1 = Collections.singletonMap(new TaskId(0, 1), task1Assignment);
+
+        final Map<TaskId, Set<TopicPartition>> thread1Assignment = new HashMap<>(task0);
+        final Map<TaskId, Set<TopicPartition>> thread2Assignment = new HashMap<>(task1);
+
+        thread1.partitionAssignor(new MockStreamsPartitionAssignor(thread1Assignment));
+        thread2.partitionAssignor(new MockStreamsPartitionAssignor(thread2Assignment));
 
         // revoke (to get threads in correct state)
         thread1.rebalanceListener.onPartitionsRevoked(Collections.EMPTY_SET);
         thread2.rebalanceListener.onPartitionsRevoked(Collections.EMPTY_SET);
 
         // assign
-        thread1.rebalanceListener.onPartitionsAssigned(assignmentThread1);
-        thread2.rebalanceListener.onPartitionsAssigned(assignmentThread2);
+        thread1.rebalanceListener.onPartitionsAssigned(task0Assignment);
+        thread2.rebalanceListener.onPartitionsAssigned(task1Assignment);
 
         final Set<TaskId> originalTaskAssignmentThread1 = new HashSet<>();
         for (TaskId tid : thread1.tasks().keySet()) {
@@ -339,19 +362,26 @@ public class StreamThreadTest {
         }
 
         // revoke (task will be suspended)
-        thread1.rebalanceListener.onPartitionsRevoked(assignmentThread1);
-        thread2.rebalanceListener.onPartitionsRevoked(assignmentThread2);
+        thread1.rebalanceListener.onPartitionsRevoked(task0Assignment);
+        thread2.rebalanceListener.onPartitionsRevoked(task1Assignment);
+
 
         // assign reverted
+        thread1Assignment.clear();
+        thread1Assignment.putAll(task1);
+
+        thread2Assignment.clear();
+        thread2Assignment.putAll(task0);
+
         Thread runIt = new Thread(new Runnable() {
             @Override
             public void run() {
-                thread1.rebalanceListener.onPartitionsAssigned(assignmentThread2);
+                thread1.rebalanceListener.onPartitionsAssigned(task1Assignment);
             }
         });
         runIt.start();
 
-        thread2.rebalanceListener.onPartitionsAssigned(assignmentThread1);
+        thread2.rebalanceListener.onPartitionsAssigned(task0Assignment);
 
         runIt.join();
 
@@ -362,12 +392,16 @@ public class StreamThreadTest {
     }
 
     private class MockStreamsPartitionAssignor extends StreamPartitionAssignor {
+
+        private final Map<TaskId, Set<TopicPartition>> activeTaskAssignment;
+
+        public MockStreamsPartitionAssignor(final Map<TaskId, Set<TopicPartition>> activeTaskAssignment) {
+            this.activeTaskAssignment = activeTaskAssignment;
+        }
+
         @Override
         Map<TaskId, Set<TopicPartition>> activeTasks() {
-            Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
-            activeTasks.put(new TaskId(0, 0), assignmentThread1);
-            activeTasks.put(new TaskId(0, 1), assignmentThread2);
-            return activeTasks;
+            return activeTaskAssignment;
         }
     }
 
@@ -411,6 +445,8 @@ public class StreamThreadTest {
                 }
             };
 
+
+
             initPartitionGrouper(config, thread);
 
             ConsumerRebalanceListener rebalanceListener = thread.rebalanceListener;
@@ -431,6 +467,11 @@ public class StreamThreadTest {
             //
             // Assign t1p1 and t1p2. This should create task1 & task2
             //
+            final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
+            activeTasks.put(task1, Collections.singleton(t1p1));
+            activeTasks.put(task2, Collections.singleton(t1p2));
+            thread.partitionAssignor(new MockStreamsPartitionAssignor(activeTasks));
+
             revokedPartitions = Collections.emptyList();
             assignedPartitions = Arrays.asList(t1p1, t1p2);
             prevTasks = new HashMap<>(thread.tasks());
@@ -463,6 +504,8 @@ public class StreamThreadTest {
             //
             // Revoke t1p1 and t1p2. This should remove task1 & task2
             //
+            activeTasks.clear();
+
             revokedPartitions = assignedPartitions;
             assignedPartitions = Collections.emptyList();
             prevTasks = new HashMap<>(thread.tasks());
@@ -605,7 +648,7 @@ public class StreamThreadTest {
 
         final StreamsConfig config = new StreamsConfig(configProps());
         final StreamThread thread = new StreamThread(builder, config, new MockClientSupplier(), applicationId,
-                                               clientId, processId, new Metrics(), new MockTime(), new StreamsMetadataState(builder));
+                                                     clientId, processId, new Metrics(), new MockTime(), new StreamsMetadataState(builder));
 
         thread.partitionAssignor(new StreamPartitionAssignor() {
             @Override
@@ -667,7 +710,125 @@ public class StreamThreadTest {
 
         assertThat(restoreConsumer.assignment(), equalTo(Utils.mkSet(new TopicPartition("stream-thread-test-count-one-changelog", 0),
                                                                      new TopicPartition("stream-thread-test-count-two-changelog", 0))));
+    }
+
+    @Test
+    public void shouldCloseSuspendedTasksThatAreNoLongerAssignedToThisStreamThreadBeforeCreatingNewTasks() throws Exception {
+        final KStreamBuilder builder = new KStreamBuilder();
+        builder.setApplicationId("appId");
+        builder.stream("t1").groupByKey().count("count-one");
+        builder.stream("t2").groupByKey().count("count-two");
+        final StreamsConfig config = new StreamsConfig(configProps());
+        final MockClientSupplier clientSupplier = new MockClientSupplier();
+
+        final StreamThread thread = new StreamThread(builder, config, clientSupplier, applicationId,
+                                                     clientId, processId, new Metrics(), new MockTime(), new StreamsMetadataState(builder));
+        final MockConsumer<byte[], byte[]> restoreConsumer = clientSupplier.restoreConsumer;
+        restoreConsumer.updatePartitions("stream-thread-test-count-one-changelog",
+                                         Collections.singletonList(new PartitionInfo("stream-thread-test-count-one-changelog",
+                                                                                     0,
+                                                                                     null,
+                                                                                     new Node[0],
+                                                                                     new Node[0])));
+        restoreConsumer.updatePartitions("stream-thread-test-count-two-changelog",
+                                         Collections.singletonList(new PartitionInfo("stream-thread-test-count-two-changelog",
+                                                                                     0,
+                                                                                     null,
+                                                                                     new Node[0],
+                                                                                     new Node[0])));
+
+
+        final HashMap<TopicPartition, Long> offsets = new HashMap<>();
+        offsets.put(new TopicPartition("stream-thread-test-count-one-changelog", 0), 0L);
+        offsets.put(new TopicPartition("stream-thread-test-count-two-changelog", 0), 0L);
+        restoreConsumer.updateEndOffsets(offsets);
+        restoreConsumer.updateBeginningOffsets(offsets);
+
+        final Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>();
+        final TopicPartition t1 = new TopicPartition("t1", 0);
+        standbyTasks.put(new TaskId(0, 0), Utils.mkSet(t1));
+
+        final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
+        final TopicPartition t2 = new TopicPartition("t2", 0);
+        activeTasks.put(new TaskId(1, 0), Utils.mkSet(t2));
+
+        thread.partitionAssignor(new StreamPartitionAssignor() {
+            @Override
+            Map<TaskId, Set<TopicPartition>> standbyTasks() {
+                return standbyTasks;
+            }
+
+            @Override
+            Map<TaskId, Set<TopicPartition>> activeTasks() {
+                return activeTasks;
+            }
+        });
+
+        thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList());
+        thread.rebalanceListener.onPartitionsAssigned(Utils.mkSet(t2));
+
+        // swap the assignment around and make sure we don't get any exceptions
+        standbyTasks.clear();
+        activeTasks.clear();
+        standbyTasks.put(new TaskId(1, 0), Utils.mkSet(t2));
+        activeTasks.put(new TaskId(0, 0), Utils.mkSet(t1));
+
+        thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList());
+        thread.rebalanceListener.onPartitionsAssigned(Utils.mkSet(t1));
+    }
+
+    @Test
+    public void shouldCloseActiveTasksThatAreAssignedToThisStreamThreadButAssignmentHasChangedBeforeCreatingNewTasks() throws Exception {
+        final KStreamBuilder builder = new KStreamBuilder();
+        builder.setApplicationId("appId");
+        builder.stream(Pattern.compile("t.*")).to("out");
+        final StreamsConfig config = new StreamsConfig(configProps());
+        final MockClientSupplier clientSupplier = new MockClientSupplier();
+
+        final Map<Collection<TopicPartition>, TestStreamTask> createdTasks = new HashMap<>();
+
+        final StreamThread thread = new StreamThread(builder, config, clientSupplier, applicationId,
+                                                     clientId, processId, new Metrics(), new MockTime(), new StreamsMetadataState(builder)) {
+            @Override
+            protected StreamTask createStreamTask(final TaskId id, final Collection<TopicPartition> partitions) {
+                final ProcessorTopology topology = builder.build(id.topicGroupId);
+                final TestStreamTask task = new TestStreamTask(id, "appId", partitions, topology, consumer, producer, restoreConsumer, config, stateDirectory);
+                createdTasks.put(partitions, task);
+                return task;
+            }
+        };
+
+        final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
+        final TopicPartition t1 = new TopicPartition("t1", 0);
+        final Set<TopicPartition> task00Partitions = new HashSet<>();
+        task00Partitions.add(t1);
+        final TaskId taskId = new TaskId(0, 0);
+        activeTasks.put(taskId, task00Partitions);
+
+        thread.partitionAssignor(new StreamPartitionAssignor() {
+            @Override
+            Map<TaskId, Set<TopicPartition>> activeTasks() {
+                return activeTasks;
+            }
+        });
+
+        // should create task for id 0_0 with a single partition
+        thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList());
+        thread.rebalanceListener.onPartitionsAssigned(task00Partitions);
+
+        final TestStreamTask firstTask = createdTasks.get(task00Partitions);
+        assertThat(firstTask.id(), is(taskId));
+
+        // update assignment for the task 0_0 so it now has 2 partitions
+        task00Partitions.add(new TopicPartition("t2", 0));
+        thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList());
+        thread.rebalanceListener.onPartitionsAssigned(task00Partitions);
 
+        // should close the first task as the assignment has changed
+        assertTrue("task should have been closed as assignment has changed", firstTask.closed);
+        assertTrue("tasks state manager should have been closed as assignment has changed", firstTask.closedStateManager);
+        // should have created a new task for 00
+        assertThat(createdTasks.get(task00Partitions).id(), is(taskId));
     }
 
     private void initPartitionGrouper(StreamsConfig config, StreamThread thread) {