You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2017/06/30 07:56:46 UTC
kafka git commit: KAFKA-5485; Streams should not suspend tasks twice
Repository: kafka
Updated Branches:
refs/heads/trunk c4193cd1a -> 0dfeb31a1
KAFKA-5485; Streams should not suspend tasks twice
Author: Matthias J. Sax <ma...@confluent.io>
Reviewers: Eno Thereska <en...@gmail.com>, Guozhang Wang <wa...@gmail.com>, Damian Guy <da...@gmail.com>
Closes #3390 from mjsax/kafka-5485-dont-suspend-tasks-twice
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0dfeb31a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0dfeb31a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0dfeb31a
Branch: refs/heads/trunk
Commit: 0dfeb31a116d26023540f6c6074127f0aab00b6d
Parents: c4193cd
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Fri Jun 30 08:56:40 2017 +0100
Committer: Damian Guy <da...@gmail.com>
Committed: Fri Jun 30 08:56:40 2017 +0100
----------------------------------------------------------------------
.../streams/processor/internals/StreamTask.java | 68 ++---
.../processor/internals/StreamThread.java | 2 +-
.../processor/internals/StreamThreadTest.java | 266 +++++++++++++++----
3 files changed, 254 insertions(+), 82 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/0dfeb31a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index dfb28f6..3fd4596 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -258,12 +258,12 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
*/
@Override
public void commit() {
- commitImpl(true);
+ commit(true);
}
// visible for testing
- void commitImpl(final boolean startNewTransaction) {
+ void commit(final boolean startNewTransaction) {
log.debug("{} Committing", logPrefix);
metrics.metrics.measureLatencyNs(
time,
@@ -365,10 +365,11 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
* - commit offsets
* </pre>
*/
- private void suspend(final boolean clean) {
+ // visible for testing
+ void suspend(final boolean clean) {
closeTopology(); // should we call this only on clean suspend?
if (clean) {
- commitImpl(false);
+ commit(false);
}
}
@@ -396,33 +397,8 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
}
}
- /**
- * <pre>
- * - {@link #suspend(boolean) suspend(clean)}
- * - close topology
- * - if (clean) {@link #commit()}
- * - flush state and producer
- * - commit offsets
- * - close state
- * - if (clean) write checkpoint
- * - if (eos) close producer
- * </pre>
- * @param clean shut down cleanly (ie, incl. flush and commit) if {@code true} --
- * otherwise, just close open resources
- */
- @Override
- public void close(boolean clean) {
- log.debug("{} Closing", logPrefix);
-
- RuntimeException firstException = null;
- try {
- suspend(clean);
- } catch (final RuntimeException e) {
- clean = false;
- firstException = e;
- log.error("{} Could not close task: ", logPrefix, e);
- }
-
+ // helper to avoid calling suspend() twice if a suspended task is not reassigned and closed
+ void closeSuspended(boolean clean, RuntimeException firstException) {
try {
closeStateManager(clean);
} catch (final RuntimeException e) {
@@ -459,6 +435,36 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
}
}
+ /**
+ * <pre>
+ * - {@link #suspend(boolean) suspend(clean)}
+ * - close topology
+ * - if (clean) {@link #commit()}
+ * - flush state and producer
+ * - commit offsets
+ * - close state
+ * - if (clean) write checkpoint
+ * - if (eos) close producer
+ * </pre>
+ * @param clean shut down cleanly (ie, incl. flush and commit) if {@code true} --
+ * otherwise, just close open resources
+ */
+ @Override
+ public void close(boolean clean) {
+ log.debug("{} Closing", logPrefix);
+
+ RuntimeException firstException = null;
+ try {
+ suspend(clean);
+ } catch (final RuntimeException e) {
+ clean = false;
+ firstException = e;
+ log.error("{} Could not close task: ", logPrefix, e);
+ }
+
+ closeSuspended(clean, firstException);
+ }
+
/**
* Adds records to queues. If a record has an invalid (i.e., negative) timestamp, the record is skipped
* and not added to the queue for processing
http://git-wip-us.apache.org/repos/asf/kafka/blob/0dfeb31a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index ae344b0..f81773b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -1228,7 +1228,7 @@ public class StreamThread extends Thread {
if (!task.partitions().equals(assignedPartitionsForTask)) {
log.debug("{} Closing suspended non-assigned active task {}", logPrefix, task.id());
try {
- task.close(true);
+ task.closeSuspended(true, null);
} catch (final Exception e) {
log.error("{} Failed to remove suspended task {}: ", logPrefix, next.getKey(), e);
} finally {
http://git-wip-us.apache.org/repos/asf/kafka/blob/0dfeb31a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 8205c27..5a31ccd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -168,6 +168,7 @@ public class StreamThreadTest {
private static class TestStreamTask extends StreamTask {
boolean committed = false;
+ private boolean suspended;
private boolean closed;
private boolean closedStateManager;
@@ -196,8 +197,8 @@ public class StreamThreadTest {
}
@Override
- void commitImpl(final boolean startNewTransaction) {
- super.commitImpl(startNewTransaction);
+ void commit(final boolean startNewTransaction) {
+ super.commit(startNewTransaction);
committed = true;
}
@@ -205,60 +206,66 @@ public class StreamThreadTest {
protected void updateOffsetLimits() {}
@Override
+ public void resume() {
+ if (!suspended || closed) {
+ throw new IllegalStateException("Should not resume task that is not suspended or already closed.");
+ }
+ super.resume();
+ suspended = false;
+ }
+
+ @Override
+ void suspend(final boolean clean) {
+ if (suspended || closed) {
+ throw new IllegalStateException("Should not suspend task that is already suspended or closed.");
+ }
+ super.suspend(clean);
+ suspended = true;
+ }
+
+ @Override
public void close(final boolean clean) {
+ if (closed && clean) {
+ throw new IllegalStateException("Should not close task that is already closed.");
+ }
super.close(clean);
closed = true;
}
@Override
+ public void closeSuspended(final boolean clean, final RuntimeException firstException) {
+ if (closed && clean) {
+ throw new IllegalStateException("Should not close task that is not suspended or already closed.");
+ }
+ super.closeSuspended(clean, firstException);
+ closed = true;
+ }
+
+ @Override
void closeStateManager(final boolean writeCheckpoint) {
super.closeStateManager(writeCheckpoint);
closedStateManager = true;
}
}
-
@SuppressWarnings("unchecked")
@Test
- public void testPartitionAssignmentChange() throws Exception {
+ public void testPartitionAssignmentChangeForSingleGroup() throws Exception {
builder.addSource("source1", "topic1");
- builder.addSource("source2", "topic2");
- builder.addSource("source3", "topic3");
- builder.addProcessor("processor", new MockProcessorSupplier(), "source2", "source3");
- final StreamThread thread = new StreamThread(
- builder,
- config,
- clientSupplier,
- applicationId,
- clientId,
- processId,
- metrics,
- Time.SYSTEM,
- new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
- 0) {
+ final StreamThread thread = getStreamThread();
+ final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
+ thread.setPartitionAssignor(new StreamPartitionAssignor() {
@Override
- protected StreamTask createStreamTask(final TaskId id, final Collection<TopicPartition> partitionsForTask) {
- final ProcessorTopology topology = builder.build(id.topicGroupId);
- return new TestStreamTask(
- id,
- applicationId,
- partitionsForTask,
- topology,
- consumer,
- clientSupplier.getProducer(new HashMap()),
- restoreConsumer,
- config,
- new MockStreamsMetrics(new Metrics()),
- stateDirectory);
+ Map<TaskId, Set<TopicPartition>> activeTasks() {
+ return activeTasks;
}
- };
+ });
final StateListenerStub stateListener = new StateListenerStub();
thread.setStateListener(stateListener);
assertEquals(thread.state(), StreamThread.State.RUNNING);
- initPartitionGrouper(config, thread, clientSupplier);
final ConsumerRebalanceListener rebalanceListener = thread.rebalanceListener;
@@ -269,44 +276,56 @@ public class StreamThreadTest {
Set<TopicPartition> expectedGroup1;
Set<TopicPartition> expectedGroup2;
+ // revoke nothing
revokedPartitions = Collections.emptyList();
- assignedPartitions = Collections.singletonList(t1p1);
- expectedGroup1 = new HashSet<>(Collections.singleton(t1p1));
-
rebalanceListener.onPartitionsRevoked(revokedPartitions);
+
assertEquals(thread.state(), StreamThread.State.PARTITIONS_REVOKED);
Assert.assertEquals(stateListener.numChanges, 1);
Assert.assertEquals(stateListener.oldState, StreamThread.State.RUNNING);
Assert.assertEquals(stateListener.newState, StreamThread.State.PARTITIONS_REVOKED);
+
+ // assign single partition
+ assignedPartitions = Collections.singletonList(t1p1);
+ expectedGroup1 = new HashSet<>(Collections.singleton(t1p1));
+ activeTasks.put(new TaskId(0, 1), expectedGroup1);
rebalanceListener.onPartitionsAssigned(assignedPartitions);
+
assertEquals(thread.state(), StreamThread.State.RUNNING);
Assert.assertEquals(stateListener.numChanges, 3);
Assert.assertEquals(stateListener.oldState, StreamThread.State.ASSIGNING_PARTITIONS);
Assert.assertEquals(stateListener.newState, StreamThread.State.RUNNING);
-
assertTrue(thread.tasks().containsKey(task1));
assertEquals(expectedGroup1, thread.tasks().get(task1).partitions());
assertEquals(1, thread.tasks().size());
+ // revoke single partition
revokedPartitions = assignedPartitions;
- assignedPartitions = Collections.singletonList(t1p2);
- expectedGroup2 = new HashSet<>(Collections.singleton(t1p2));
-
+ activeTasks.clear();
rebalanceListener.onPartitionsRevoked(revokedPartitions);
+
assertFalse(thread.tasks().containsKey(task1));
assertEquals(0, thread.tasks().size());
+
+ // assign different single partition
+ assignedPartitions = Collections.singletonList(t1p2);
+ expectedGroup2 = new HashSet<>(Collections.singleton(t1p2));
+ activeTasks.put(new TaskId(0, 2), expectedGroup2);
rebalanceListener.onPartitionsAssigned(assignedPartitions);
assertTrue(thread.tasks().containsKey(task2));
assertEquals(expectedGroup2, thread.tasks().get(task2).partitions());
assertEquals(1, thread.tasks().size());
+ // revoke different single partition and assign both partitions
revokedPartitions = assignedPartitions;
+ activeTasks.clear();
+ rebalanceListener.onPartitionsRevoked(revokedPartitions);
assignedPartitions = Arrays.asList(t1p1, t1p2);
expectedGroup1 = new HashSet<>(Collections.singleton(t1p1));
expectedGroup2 = new HashSet<>(Collections.singleton(t1p2));
-
- rebalanceListener.onPartitionsRevoked(revokedPartitions);
+ activeTasks.put(new TaskId(0, 1), expectedGroup1);
+ activeTasks.put(new TaskId(0, 2), expectedGroup2);
rebalanceListener.onPartitionsAssigned(assignedPartitions);
assertTrue(thread.tasks().containsKey(task1));
@@ -315,12 +334,65 @@ public class StreamThreadTest {
assertEquals(expectedGroup2, thread.tasks().get(task2).partitions());
assertEquals(2, thread.tasks().size());
+ // revoke all partitions and assign nothing
revokedPartitions = assignedPartitions;
+ rebalanceListener.onPartitionsRevoked(revokedPartitions);
+ assignedPartitions = Collections.emptyList();
+ rebalanceListener.onPartitionsAssigned(assignedPartitions);
+
+ assertTrue(thread.tasks().isEmpty());
+
+ thread.close();
+ assertTrue((thread.state() == StreamThread.State.PENDING_SHUTDOWN) ||
+ (thread.state() == StreamThread.State.CREATED));
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testPartitionAssignmentChangeForMultipleGroups() throws Exception {
+ builder.addSource("source1", "topic1");
+ builder.addSource("source2", "topic2");
+ builder.addSource("source3", "topic3");
+ builder.addProcessor("processor", new MockProcessorSupplier(), "source2", "source3");
+
+ final StreamThread thread = getStreamThread();
+
+ final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
+ thread.setPartitionAssignor(new StreamPartitionAssignor() {
+ @Override
+ Map<TaskId, Set<TopicPartition>> activeTasks() {
+ return activeTasks;
+ }
+ });
+
+ final StateListenerStub stateListener = new StateListenerStub();
+ thread.setStateListener(stateListener);
+ assertEquals(thread.state(), StreamThread.State.RUNNING);
+
+ final ConsumerRebalanceListener rebalanceListener = thread.rebalanceListener;
+
+ assertTrue(thread.tasks().isEmpty());
+
+ List<TopicPartition> revokedPartitions;
+ List<TopicPartition> assignedPartitions;
+ Set<TopicPartition> expectedGroup1;
+ Set<TopicPartition> expectedGroup2;
+
+ // revoke nothing
+ revokedPartitions = Collections.emptyList();
+ rebalanceListener.onPartitionsRevoked(revokedPartitions);
+
+ assertEquals(thread.state(), StreamThread.State.PARTITIONS_REVOKED);
+ Assert.assertEquals(stateListener.numChanges, 1);
+ Assert.assertEquals(stateListener.oldState, StreamThread.State.RUNNING);
+ Assert.assertEquals(stateListener.newState, StreamThread.State.PARTITIONS_REVOKED);
+
+ // assign four new partitions of second subtopology
assignedPartitions = Arrays.asList(t2p1, t2p2, t3p1, t3p2);
expectedGroup1 = new HashSet<>(Arrays.asList(t2p1, t3p1));
expectedGroup2 = new HashSet<>(Arrays.asList(t2p2, t3p2));
-
- rebalanceListener.onPartitionsRevoked(revokedPartitions);
+ activeTasks.put(new TaskId(1, 1), expectedGroup1);
+ activeTasks.put(new TaskId(1, 2), expectedGroup2);
rebalanceListener.onPartitionsAssigned(assignedPartitions);
assertTrue(thread.tasks().containsKey(task4));
@@ -329,12 +401,15 @@ public class StreamThreadTest {
assertEquals(expectedGroup2, thread.tasks().get(task5).partitions());
assertEquals(2, thread.tasks().size());
+ // revoke four partitions and assign three partitions of both subtopologies
revokedPartitions = assignedPartitions;
+ rebalanceListener.onPartitionsRevoked(revokedPartitions);
+
assignedPartitions = Arrays.asList(t1p1, t2p1, t3p1);
expectedGroup1 = new HashSet<>(Collections.singleton(t1p1));
expectedGroup2 = new HashSet<>(Arrays.asList(t2p1, t3p1));
-
- rebalanceListener.onPartitionsRevoked(revokedPartitions);
+ activeTasks.put(new TaskId(0, 1), expectedGroup1);
+ activeTasks.put(new TaskId(1, 1), expectedGroup2);
rebalanceListener.onPartitionsAssigned(assignedPartitions);
assertTrue(thread.tasks().containsKey(task1));
@@ -343,12 +418,12 @@ public class StreamThreadTest {
assertEquals(expectedGroup2, thread.tasks().get(task4).partitions());
assertEquals(2, thread.tasks().size());
+ // revoke all three partitions and reassign the same three partitions (from different subtopologies)
revokedPartitions = assignedPartitions;
+ rebalanceListener.onPartitionsRevoked(revokedPartitions);
assignedPartitions = Arrays.asList(t1p1, t2p1, t3p1);
expectedGroup1 = new HashSet<>(Collections.singleton(t1p1));
expectedGroup2 = new HashSet<>(Arrays.asList(t2p1, t3p1));
-
- rebalanceListener.onPartitionsRevoked(revokedPartitions);
rebalanceListener.onPartitionsAssigned(assignedPartitions);
assertTrue(thread.tasks().containsKey(task1));
@@ -357,10 +432,10 @@ public class StreamThreadTest {
assertEquals(expectedGroup2, thread.tasks().get(task4).partitions());
assertEquals(2, thread.tasks().size());
+ // revoke all partitions and assign nothing
revokedPartitions = assignedPartitions;
- assignedPartitions = Collections.emptyList();
-
rebalanceListener.onPartitionsRevoked(revokedPartitions);
+ assignedPartitions = Collections.emptyList();
rebalanceListener.onPartitionsAssigned(assignedPartitions);
assertTrue(thread.tasks().isEmpty());
@@ -931,6 +1006,67 @@ public class StreamThreadTest {
}
@Test
+ public void shouldNotCloseSuspendedTasksTwice() throws Exception {
+ builder.addSource("name", "topic").addSink("out", "output");
+
+ final TestStreamTask testStreamTask = new TestStreamTask(
+ new TaskId(0, 0),
+ applicationId,
+ Utils.mkSet(new TopicPartition("topic", 0)),
+ builder.build(0),
+ clientSupplier.consumer,
+ clientSupplier.getProducer(new HashMap<String, Object>()),
+ clientSupplier.restoreConsumer,
+ config,
+ new MockStreamsMetrics(new Metrics()),
+ new StateDirectory(applicationId, config.getString(StreamsConfig.STATE_DIR_CONFIG), mockTime));
+
+ final StreamThread thread = new StreamThread(
+ builder,
+ config,
+ clientSupplier,
+ applicationId,
+ clientId,
+ processId,
+ metrics,
+ mockTime,
+ new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
+ 0) {
+
+ @Override
+ protected StreamTask createStreamTask(final TaskId id, final Collection<TopicPartition> partitionsForTask) {
+ return testStreamTask;
+ }
+ };
+
+ final Set<TopicPartition> activeTasks = new HashSet<>();
+ activeTasks.add(new TopicPartition("topic", 0));
+
+ thread.setPartitionAssignor(new StreamPartitionAssignor() {
+ @Override
+ Map<TaskId, Set<TopicPartition>> activeTasks() {
+ return new HashMap<TaskId, Set<TopicPartition>>() {
+ {
+ put(new TaskId(0, 0), activeTasks);
+ }
+ };
+ }
+ });
+
+ thread.rebalanceListener.onPartitionsAssigned(activeTasks);
+ thread.rebalanceListener.onPartitionsRevoked(activeTasks);
+
+ assertTrue(testStreamTask.suspended);
+ assertFalse(testStreamTask.closed);
+
+ activeTasks.clear();
+ // this should succeed without exception
+ thread.rebalanceListener.onPartitionsAssigned(Collections.<TopicPartition>emptyList());
+
+ assertTrue(testStreamTask.closed);
+ }
+ @Test
+
public void shouldInitializeRestoreConsumerWithOffsetsFromStandbyTasks() throws Exception {
final KStreamBuilder builder = new KStreamBuilder();
builder.setApplicationId(applicationId);
@@ -1624,4 +1760,34 @@ public class StreamThreadTest {
}
}
+ private StreamThread getStreamThread() {
+ return new StreamThread(
+ builder,
+ config,
+ clientSupplier,
+ applicationId,
+ clientId,
+ processId,
+ metrics,
+ Time.SYSTEM,
+ new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
+ 0) {
+
+ @Override
+ protected StreamTask createStreamTask(final TaskId id, final Collection<TopicPartition> partitionsForTask) {
+ final ProcessorTopology topology = builder.build(id.topicGroupId);
+ return new TestStreamTask(
+ id,
+ applicationId,
+ partitionsForTask,
+ topology,
+ consumer,
+ clientSupplier.getProducer(new HashMap()),
+ restoreConsumer,
+ config,
+ new MockStreamsMetrics(new Metrics()),
+ stateDirectory);
+ }
+ };
+ }
}