You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/12/20 18:30:36 UTC
kafka git commit: KAFKA-4540: Suspended tasks that are not assigned
to the StreamThread need to be closed before new active and standby tasks are
created
Repository: kafka
Updated Branches:
refs/heads/trunk 0321bf5aa -> 56c61745d
KAFKA-4540: Suspended tasks that are not assigned to the StreamThread need to be closed before new active and standby tasks are created
During `onPartitionsAssigned`, first close and remove any suspended `StandbyTasks` that are no longer assigned to this consumer.
Author: Damian Guy <da...@gmail.com>
Reviewers: Guozhang Wang <wa...@gmail.com>
Closes #2266 from dguy/kafka-4540
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/56c61745
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/56c61745
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/56c61745
Branch: refs/heads/trunk
Commit: 56c61745deeadd299e5ef24577c1bdf5d9d231a0
Parents: 0321bf5
Author: Damian Guy <da...@gmail.com>
Authored: Tue Dec 20 10:30:33 2016 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue Dec 20 10:30:33 2016 -0800
----------------------------------------------------------------------
.../processor/internals/StreamThread.java | 98 +++++-----
.../processor/internals/StreamThreadTest.java | 191 +++++++++++++++++--
2 files changed, 225 insertions(+), 64 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/56c61745/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 8b7aaea..f09bf54 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -39,7 +39,6 @@ import org.apache.kafka.streams.KafkaClientSupplier;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.errors.LockException;
-import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskIdFormatException;
import org.apache.kafka.streams.processor.PartitionGrouper;
@@ -233,6 +232,10 @@ public class StreamThread extends Thread {
StreamThread.this.getName(), assignment);
setStateWhenNotInPendingShutdown(State.ASSIGNING_PARTITIONS);
+ // do this first as we may have suspended standby tasks that
+ // will become active or vice versa
+ closeNonAssignedSuspendedStandbyTasks();
+ closeNonAssignedSuspendedTasks();
addStreamTasks(assignment);
addStandbyTasks();
lastCleanMs = time.milliseconds(); // start the cleaning cycle
@@ -810,6 +813,47 @@ public class StreamThread extends Thread {
return null;
}
+ private void closeNonAssignedSuspendedTasks() {
+ final Map<TaskId, Set<TopicPartition>> newTaskAssignment = partitionAssignor.activeTasks();
+ final Iterator<Map.Entry<TaskId, StreamTask>> suspendedTaskIterator = suspendedTasks.entrySet().iterator();
+ while (suspendedTaskIterator.hasNext()) {
+ final Map.Entry<TaskId, StreamTask> next = suspendedTaskIterator.next();
+ final StreamTask task = next.getValue();
+ final Set<TopicPartition> assignedPartitionsForTask = newTaskAssignment.get(next.getKey());
+ if (!task.partitions().equals(assignedPartitionsForTask)) {
+ log.debug("{} closing suspended non-assigned task", logPrefix);
+ try {
+ task.close();
+ task.closeStateManager();
+ } catch (Exception e) {
+ log.error("{} Failed to remove suspended task {}", logPrefix, next.getKey(), e);
+ } finally {
+ suspendedTaskIterator.remove();
+ }
+ }
+ }
+
+ }
+
+ private void closeNonAssignedSuspendedStandbyTasks() {
+ final Set<TaskId> currentSuspendedTaskIds = partitionAssignor.standbyTasks().keySet();
+ final Iterator<Map.Entry<TaskId, StandbyTask>> standByTaskIterator = suspendedStandbyTasks.entrySet().iterator();
+ while (standByTaskIterator.hasNext()) {
+ final Map.Entry<TaskId, StandbyTask> suspendedTask = standByTaskIterator.next();
+ if (!currentSuspendedTaskIds.contains(suspendedTask.getKey())) {
+ log.debug("{} Closing suspended non-assigned standby task {}", logPrefix, suspendedTask.getKey());
+ final StandbyTask task = suspendedTask.getValue();
+ try {
+ task.close();
+ task.closeStateManager();
+ } catch (Exception e) {
+ log.error("{} Failed to remove suspended task standby {}", logPrefix, suspendedTask.getKey(), e);
+ } finally {
+ standByTaskIterator.remove();
+ }
+ }
+ }
+ }
private void addStreamTasks(Collection<TopicPartition> assignment) {
if (partitionAssignor == null)
@@ -847,15 +891,12 @@ public class StreamThread extends Thread {
}
}
- // destroy any remaining suspended tasks
- removeSuspendedTasks();
-
// create all newly assigned tasks (guard against race condition with other thread via backoff and retry)
// -> other thread will call removeSuspendedTasks(); eventually
taskCreator.retryWithBackoff(newTasks);
}
- private StandbyTask createStandbyTask(TaskId id, Collection<TopicPartition> partitions) {
+ StandbyTask createStandbyTask(TaskId id, Collection<TopicPartition> partitions) {
log.info("{} Creating new standby task {} with assigned partitions [{}]", logPrefix, id, partitions);
sensors.taskCreationSensor.record();
@@ -894,9 +935,6 @@ public class StreamThread extends Thread {
updateStandByTaskMaps(checkpointedOffsets, taskId, partitions, task);
}
- // destroy any remaining suspended tasks
- removeSuspendedStandbyTasks();
-
// create all newly assigned standby tasks (guard against race condition with other thread via backoff and retry)
// -> other thread will call removeSuspendedStandbyTasks(); eventually
new StandbyTaskCreator(checkpointedOffsets).retryWithBackoff(newStandbyTasks);
@@ -959,40 +997,6 @@ public class StreamThread extends Thread {
standbyRecords.clear();
}
- private void removeSuspendedTasks() {
- log.info("{} Removing all suspended tasks [{}]", logPrefix, suspendedTasks.keySet());
- try {
- // Close task and state manager
- for (final AbstractTask task : suspendedTasks.values()) {
- task.close();
- task.flushState();
- task.closeStateManager();
- // flush out any extra data sent during close
- producer.flush();
- }
- suspendedTasks.clear();
- } catch (Exception e) {
- log.error("{} Failed to remove suspended tasks: ", logPrefix, e);
- }
- }
-
- private void removeSuspendedStandbyTasks() {
- log.info("{} Removing all suspended standby tasks [{}]", logPrefix, suspendedStandbyTasks.keySet());
- try {
- // Close task and state manager
- for (final AbstractTask task : suspendedStandbyTasks.values()) {
- task.close();
- task.flushState();
- task.closeStateManager();
- // flush out any extra data sent during close
- producer.flush();
- }
- suspendedStandbyTasks.clear();
- } catch (Exception e) {
- log.error("{} Failed to remove suspended tasks: ", logPrefix, e);
- }
- }
-
private void closeAllTasks() {
performOnAllTasks(new AbstractTaskAction() {
@Override
@@ -1189,13 +1193,9 @@ public class StreamThread extends Thread {
try {
createTask(taskId, partitions);
it.remove();
- } catch (final ProcessorStateException e) {
- if (e.getCause() instanceof LockException) {
- // ignore and retry
- log.warn("Could not create task {}. Will retry.", taskId, e);
- } else {
- throw e;
- }
+ } catch (final LockException e) {
+ // ignore and retry
+ log.warn("Could not create task {}. Will retry.", taskId, e);
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/56c61745/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 1e4f883..e8c10e4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -55,8 +55,10 @@ import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
+import java.util.regex.Pattern;
import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertSame;
@@ -131,6 +133,8 @@ public class StreamThreadTest {
private static class TestStreamTask extends StreamTask {
public boolean committed = false;
+ private boolean closed;
+ private boolean closedStateManager;
public TestStreamTask(TaskId id,
String applicationId,
@@ -160,6 +164,18 @@ public class StreamThreadTest {
protected void initializeOffsetLimits() {
// do nothing
}
+
+ @Override
+ public void close() {
+ this.closed = true;
+ super.close();
+ }
+
+ @Override
+ void closeStateManager() {
+ super.closeStateManager();
+ this.closedStateManager = true;
+ }
}
@@ -298,8 +314,8 @@ public class StreamThreadTest {
}
final static String TOPIC = "topic";
- final Set<TopicPartition> assignmentThread1 = Collections.singleton(new TopicPartition(TOPIC, 0));
- final Set<TopicPartition> assignmentThread2 = Collections.singleton(new TopicPartition(TOPIC, 1));
+ final Set<TopicPartition> task0Assignment = Collections.singleton(new TopicPartition(TOPIC, 0));
+ final Set<TopicPartition> task1Assignment = Collections.singleton(new TopicPartition(TOPIC, 1));
@Test
public void testHandingOverTaskFromOneToAnotherThread() throws Exception {
@@ -318,16 +334,23 @@ public class StreamThreadTest {
final StreamThread thread1 = new StreamThread(builder, config, mockClientSupplier, applicationId, clientId + 1, processId, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder));
final StreamThread thread2 = new StreamThread(builder, config, mockClientSupplier, applicationId, clientId + 2, processId, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder));
- thread1.partitionAssignor(new MockStreamsPartitionAssignor());
- thread2.partitionAssignor(new MockStreamsPartitionAssignor());
+
+ final Map<TaskId, Set<TopicPartition>> task0 = Collections.singletonMap(new TaskId(0, 0), task0Assignment);
+ final Map<TaskId, Set<TopicPartition>> task1 = Collections.singletonMap(new TaskId(0, 1), task1Assignment);
+
+ final Map<TaskId, Set<TopicPartition>> thread1Assignment = new HashMap<>(task0);
+ final Map<TaskId, Set<TopicPartition>> thread2Assignment = new HashMap<>(task1);
+
+ thread1.partitionAssignor(new MockStreamsPartitionAssignor(thread1Assignment));
+ thread2.partitionAssignor(new MockStreamsPartitionAssignor(thread2Assignment));
// revoke (to get threads in correct state)
thread1.rebalanceListener.onPartitionsRevoked(Collections.EMPTY_SET);
thread2.rebalanceListener.onPartitionsRevoked(Collections.EMPTY_SET);
// assign
- thread1.rebalanceListener.onPartitionsAssigned(assignmentThread1);
- thread2.rebalanceListener.onPartitionsAssigned(assignmentThread2);
+ thread1.rebalanceListener.onPartitionsAssigned(task0Assignment);
+ thread2.rebalanceListener.onPartitionsAssigned(task1Assignment);
final Set<TaskId> originalTaskAssignmentThread1 = new HashSet<>();
for (TaskId tid : thread1.tasks().keySet()) {
@@ -339,19 +362,26 @@ public class StreamThreadTest {
}
// revoke (task will be suspended)
- thread1.rebalanceListener.onPartitionsRevoked(assignmentThread1);
- thread2.rebalanceListener.onPartitionsRevoked(assignmentThread2);
+ thread1.rebalanceListener.onPartitionsRevoked(task0Assignment);
+ thread2.rebalanceListener.onPartitionsRevoked(task1Assignment);
+
// assign reverted
+ thread1Assignment.clear();
+ thread1Assignment.putAll(task1);
+
+ thread2Assignment.clear();
+ thread2Assignment.putAll(task0);
+
Thread runIt = new Thread(new Runnable() {
@Override
public void run() {
- thread1.rebalanceListener.onPartitionsAssigned(assignmentThread2);
+ thread1.rebalanceListener.onPartitionsAssigned(task1Assignment);
}
});
runIt.start();
- thread2.rebalanceListener.onPartitionsAssigned(assignmentThread1);
+ thread2.rebalanceListener.onPartitionsAssigned(task0Assignment);
runIt.join();
@@ -362,12 +392,16 @@ public class StreamThreadTest {
}
private class MockStreamsPartitionAssignor extends StreamPartitionAssignor {
+
+ private final Map<TaskId, Set<TopicPartition>> activeTaskAssignment;
+
+ public MockStreamsPartitionAssignor(final Map<TaskId, Set<TopicPartition>> activeTaskAssignment) {
+ this.activeTaskAssignment = activeTaskAssignment;
+ }
+
@Override
Map<TaskId, Set<TopicPartition>> activeTasks() {
- Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
- activeTasks.put(new TaskId(0, 0), assignmentThread1);
- activeTasks.put(new TaskId(0, 1), assignmentThread2);
- return activeTasks;
+ return activeTaskAssignment;
}
}
@@ -411,6 +445,8 @@ public class StreamThreadTest {
}
};
+
+
initPartitionGrouper(config, thread);
ConsumerRebalanceListener rebalanceListener = thread.rebalanceListener;
@@ -431,6 +467,11 @@ public class StreamThreadTest {
//
// Assign t1p1 and t1p2. This should create task1 & task2
//
+ final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
+ activeTasks.put(task1, Collections.singleton(t1p1));
+ activeTasks.put(task2, Collections.singleton(t1p2));
+ thread.partitionAssignor(new MockStreamsPartitionAssignor(activeTasks));
+
revokedPartitions = Collections.emptyList();
assignedPartitions = Arrays.asList(t1p1, t1p2);
prevTasks = new HashMap<>(thread.tasks());
@@ -463,6 +504,8 @@ public class StreamThreadTest {
//
// Revoke t1p1 and t1p2. This should remove task1 & task2
//
+ activeTasks.clear();
+
revokedPartitions = assignedPartitions;
assignedPartitions = Collections.emptyList();
prevTasks = new HashMap<>(thread.tasks());
@@ -605,7 +648,7 @@ public class StreamThreadTest {
final StreamsConfig config = new StreamsConfig(configProps());
final StreamThread thread = new StreamThread(builder, config, new MockClientSupplier(), applicationId,
- clientId, processId, new Metrics(), new MockTime(), new StreamsMetadataState(builder));
+ clientId, processId, new Metrics(), new MockTime(), new StreamsMetadataState(builder));
thread.partitionAssignor(new StreamPartitionAssignor() {
@Override
@@ -667,7 +710,125 @@ public class StreamThreadTest {
assertThat(restoreConsumer.assignment(), equalTo(Utils.mkSet(new TopicPartition("stream-thread-test-count-one-changelog", 0),
new TopicPartition("stream-thread-test-count-two-changelog", 0))));
+ }
+
+ @Test
+ public void shouldCloseSuspendedTasksThatAreNoLongerAssignedToThisStreamThreadBeforeCreatingNewTasks() throws Exception {
+ final KStreamBuilder builder = new KStreamBuilder();
+ builder.setApplicationId("appId");
+ builder.stream("t1").groupByKey().count("count-one");
+ builder.stream("t2").groupByKey().count("count-two");
+ final StreamsConfig config = new StreamsConfig(configProps());
+ final MockClientSupplier clientSupplier = new MockClientSupplier();
+
+ final StreamThread thread = new StreamThread(builder, config, clientSupplier, applicationId,
+ clientId, processId, new Metrics(), new MockTime(), new StreamsMetadataState(builder));
+ final MockConsumer<byte[], byte[]> restoreConsumer = clientSupplier.restoreConsumer;
+ restoreConsumer.updatePartitions("stream-thread-test-count-one-changelog",
+ Collections.singletonList(new PartitionInfo("stream-thread-test-count-one-changelog",
+ 0,
+ null,
+ new Node[0],
+ new Node[0])));
+ restoreConsumer.updatePartitions("stream-thread-test-count-two-changelog",
+ Collections.singletonList(new PartitionInfo("stream-thread-test-count-two-changelog",
+ 0,
+ null,
+ new Node[0],
+ new Node[0])));
+
+
+ final HashMap<TopicPartition, Long> offsets = new HashMap<>();
+ offsets.put(new TopicPartition("stream-thread-test-count-one-changelog", 0), 0L);
+ offsets.put(new TopicPartition("stream-thread-test-count-two-changelog", 0), 0L);
+ restoreConsumer.updateEndOffsets(offsets);
+ restoreConsumer.updateBeginningOffsets(offsets);
+
+ final Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>();
+ final TopicPartition t1 = new TopicPartition("t1", 0);
+ standbyTasks.put(new TaskId(0, 0), Utils.mkSet(t1));
+
+ final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
+ final TopicPartition t2 = new TopicPartition("t2", 0);
+ activeTasks.put(new TaskId(1, 0), Utils.mkSet(t2));
+
+ thread.partitionAssignor(new StreamPartitionAssignor() {
+ @Override
+ Map<TaskId, Set<TopicPartition>> standbyTasks() {
+ return standbyTasks;
+ }
+
+ @Override
+ Map<TaskId, Set<TopicPartition>> activeTasks() {
+ return activeTasks;
+ }
+ });
+
+ thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList());
+ thread.rebalanceListener.onPartitionsAssigned(Utils.mkSet(t2));
+
+ // swap the assignment around and make sure we don't get any exceptions
+ standbyTasks.clear();
+ activeTasks.clear();
+ standbyTasks.put(new TaskId(1, 0), Utils.mkSet(t2));
+ activeTasks.put(new TaskId(0, 0), Utils.mkSet(t1));
+
+ thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList());
+ thread.rebalanceListener.onPartitionsAssigned(Utils.mkSet(t1));
+ }
+
+ @Test
+ public void shouldCloseActiveTasksThatAreAssignedToThisStreamThreadButAssignmentHasChangedBeforeCreatingNewTasks() throws Exception {
+ final KStreamBuilder builder = new KStreamBuilder();
+ builder.setApplicationId("appId");
+ builder.stream(Pattern.compile("t.*")).to("out");
+ final StreamsConfig config = new StreamsConfig(configProps());
+ final MockClientSupplier clientSupplier = new MockClientSupplier();
+
+ final Map<Collection<TopicPartition>, TestStreamTask> createdTasks = new HashMap<>();
+
+ final StreamThread thread = new StreamThread(builder, config, clientSupplier, applicationId,
+ clientId, processId, new Metrics(), new MockTime(), new StreamsMetadataState(builder)) {
+ @Override
+ protected StreamTask createStreamTask(final TaskId id, final Collection<TopicPartition> partitions) {
+ final ProcessorTopology topology = builder.build(id.topicGroupId);
+ final TestStreamTask task = new TestStreamTask(id, "appId", partitions, topology, consumer, producer, restoreConsumer, config, stateDirectory);
+ createdTasks.put(partitions, task);
+ return task;
+ }
+ };
+
+ final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
+ final TopicPartition t1 = new TopicPartition("t1", 0);
+ final Set<TopicPartition> task00Partitions = new HashSet<>();
+ task00Partitions.add(t1);
+ final TaskId taskId = new TaskId(0, 0);
+ activeTasks.put(taskId, task00Partitions);
+
+ thread.partitionAssignor(new StreamPartitionAssignor() {
+ @Override
+ Map<TaskId, Set<TopicPartition>> activeTasks() {
+ return activeTasks;
+ }
+ });
+
+ // should create task for id 0_0 with a single partition
+ thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList());
+ thread.rebalanceListener.onPartitionsAssigned(task00Partitions);
+
+ final TestStreamTask firstTask = createdTasks.get(task00Partitions);
+ assertThat(firstTask.id(), is(taskId));
+
+ // update assignment for the task 0_0 so it now has 2 partitions
+ task00Partitions.add(new TopicPartition("t2", 0));
+ thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList());
+ thread.rebalanceListener.onPartitionsAssigned(task00Partitions);
+ // should close the first task as the assignment has changed
+ assertTrue("task should have been closed as assignment has changed", firstTask.closed);
+ assertTrue("tasks state manager should have been closed as assignment has changed", firstTask.closedStateManager);
+ // should have created a new task for 00
+ assertThat(createdTasks.get(task00Partitions).id(), is(taskId));
}
private void initPartitionGrouper(StreamsConfig config, StreamThread thread) {