You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/12/16 01:22:44 UTC
kafka git commit: KAFKA-4539: StreamThread is not correctly creating
StandbyTasks
Repository: kafka
Updated Branches:
refs/heads/trunk 233cd4b18 -> 056ed8660
KAFKA-4539: StreamThread is not correctly creating StandbyTasks
Tasks that don't have any `StateStore`s won't have a `StandbyTask`, so `createStandbyTask` can return `null`. We need to check for this in `StandbyTaskCreator.createTask(...)`.
Also, the checkpointed offsets for `StandbyTask`s are never loaded.
Author: Damian Guy <da...@gmail.com>
Reviewers: Eno Thereska, Matthias J. Sax, Guozhang Wang
Closes #2255 from dguy/kafka-4539
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/056ed866
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/056ed866
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/056ed866
Branch: refs/heads/trunk
Commit: 056ed86600ebf884057ea24daafc9e3b1ab85b81
Parents: 233cd4b
Author: Damian Guy <da...@gmail.com>
Authored: Thu Dec 15 17:22:41 2016 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Thu Dec 15 17:22:41 2016 -0800
----------------------------------------------------------------------
.../kafka/clients/consumer/MockConsumer.java | 5 +-
.../processor/internals/StreamThread.java | 55 +++++++-------
.../processor/internals/StreamThreadTest.java | 77 ++++++++++++++++++++
3 files changed, 108 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/056ed866/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index fdce064..a2ba480 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -221,7 +221,10 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
@Override
public OffsetAndMetadata committed(TopicPartition partition) {
ensureNotClosed();
- return subscriptions.committed(partition);
+ if (subscriptions.isAssigned(partition)) {
+ return subscriptions.committed(partition);
+ }
+ return new OffsetAndMetadata(0);
}
@Override
http://git-wip-us.apache.org/repos/asf/kafka/blob/056ed866/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index cfbd3a0..8b7aaea 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -219,7 +219,6 @@ public class StreamThread extends Thread {
private ThreadCache cache;
private final TaskCreator taskCreator = new TaskCreator();
- private final StandbyTaskCreator standbyTaskCreator = new StandbyTaskCreator();
final ConsumerRebalanceListener rebalanceListener = new ConsumerRebalanceListener() {
@Override
@@ -892,18 +891,7 @@ public class StreamThread extends Thread {
newStandbyTasks.put(taskId, partitions);
}
- if (task != null) {
- standbyTasks.put(taskId, task);
- for (TopicPartition partition : partitions) {
- standbyTasksByPartition.put(partition, task);
- }
- // collect checked pointed offsets to position the restore consumer
- // this include all partitions from which we restore states
- for (TopicPartition partition : task.checkpointedOffsets().keySet()) {
- standbyTasksByPartition.put(partition, task);
- }
- checkpointedOffsets.putAll(task.checkpointedOffsets());
- }
+ updateStandByTaskMaps(checkpointedOffsets, taskId, partitions, task);
}
// destroy any remaining suspended tasks
@@ -911,7 +899,7 @@ public class StreamThread extends Thread {
// create all newly assigned standby tasks (guard against race condition with other thread via backoff and retry)
// -> other thread will call removeSuspendedStandbyTasks(); eventually
- standbyTaskCreator.retryWithBackoff(newStandbyTasks);
+ new StandbyTaskCreator(checkpointedOffsets).retryWithBackoff(newStandbyTasks);
restoreConsumer.assign(new ArrayList<>(checkpointedOffsets.keySet()));
@@ -926,6 +914,21 @@ public class StreamThread extends Thread {
}
}
+ private void updateStandByTaskMaps(final Map<TopicPartition, Long> checkpointedOffsets, final TaskId taskId, final Set<TopicPartition> partitions, final StandbyTask task) {
+ if (task != null) {
+ standbyTasks.put(taskId, task);
+ for (TopicPartition partition : partitions) {
+ standbyTasksByPartition.put(partition, task);
+ }
+ // collect checked pointed offsets to position the restore consumer
+ // this include all partitions from which we restore states
+ for (TopicPartition partition : task.checkpointedOffsets().keySet()) {
+ standbyTasksByPartition.put(partition, task);
+ }
+ checkpointedOffsets.putAll(task.checkpointedOffsets());
+ }
+ }
+
private void updateSuspendedTasks() {
log.info("{} Updating suspended tasks to contain active tasks [{}]", logPrefix, activeTasks.keySet());
suspendedTasks.clear();
@@ -1209,11 +1212,11 @@ public class StreamThread extends Thread {
}
}
- abstract void createTask(final TaskId id, final Collection<TopicPartition> partitions);
+ abstract void createTask(final TaskId id, final Set<TopicPartition> partitions);
}
class TaskCreator extends AbstractTaskCreator {
- void createTask(final TaskId taskId, final Collection<TopicPartition> partitions) {
+ void createTask(final TaskId taskId, final Set<TopicPartition> partitions) {
log.debug("{} creating new task {}", logPrefix, taskId);
final StreamTask task = createStreamTask(taskId, partitions);
@@ -1226,20 +1229,16 @@ public class StreamThread extends Thread {
}
class StandbyTaskCreator extends AbstractTaskCreator {
- void createTask(final TaskId taskId, final Collection<TopicPartition> partitions) {
- log.debug("{} creating new standby task {}", logPrefix, taskId);
- final StandbyTask task = createStandbyTask(taskId, partitions);
+ private final Map<TopicPartition, Long> checkpointedOffsets;
- standbyTasks.put(taskId, task);
+ StandbyTaskCreator(final Map<TopicPartition, Long> checkpointedOffsets) {
+ this.checkpointedOffsets = checkpointedOffsets;
+ }
- for (TopicPartition partition : partitions) {
- standbyTasksByPartition.put(partition, task);
- }
- // collect checked pointed offsets to position the restore consumer
- // this include all partitions from which we restore states
- for (TopicPartition partition : task.checkpointedOffsets().keySet()) {
- standbyTasksByPartition.put(partition, task);
- }
+ void createTask(final TaskId taskId, final Set<TopicPartition> partitions) {
+ log.debug("{} creating new standby task {}", logPrefix, taskId);
+ final StandbyTask task = createStandbyTask(taskId, partitions);
+ updateStandByTaskMaps(checkpointedOffsets, taskId, partitions, task);
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/056ed866/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 13678a2..1e4f883 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.Cluster;
@@ -30,6 +31,7 @@ import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.state.Stores;
@@ -593,6 +595,81 @@ public class StreamThreadTest {
assertSame(clientSupplier.restoreConsumer, thread.restoreConsumer);
}
+ @Test
+ public void shouldNotNullPointerWhenStandbyTasksAssignedAndNoStateStoresForTopology() throws Exception {
+ final TopologyBuilder builder = new TopologyBuilder();
+ builder.setApplicationId("appId")
+ .addSource("name", "topic")
+ .addSink("out", "output");
+
+
+ final StreamsConfig config = new StreamsConfig(configProps());
+ final StreamThread thread = new StreamThread(builder, config, new MockClientSupplier(), applicationId,
+ clientId, processId, new Metrics(), new MockTime(), new StreamsMetadataState(builder));
+
+ thread.partitionAssignor(new StreamPartitionAssignor() {
+ @Override
+ Map<TaskId, Set<TopicPartition>> standbyTasks() {
+ return Collections.singletonMap(new TaskId(0, 0), Utils.mkSet(new TopicPartition("topic", 0)));
+ }
+ });
+
+ thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList());
+ thread.rebalanceListener.onPartitionsAssigned(Collections.<TopicPartition>emptyList());
+ }
+
+ @Test
+ public void shouldInitializeRestoreConsumerWithOffsetsFromStandbyTasks() throws Exception {
+ final KStreamBuilder builder = new KStreamBuilder();
+ builder.setApplicationId("appId");
+ builder.stream("t1").groupByKey().count("count-one");
+ builder.stream("t2").groupByKey().count("count-two");
+ final StreamsConfig config = new StreamsConfig(configProps());
+ final MockClientSupplier clientSupplier = new MockClientSupplier();
+
+ final StreamThread thread = new StreamThread(builder, config, clientSupplier, applicationId,
+ clientId, processId, new Metrics(), new MockTime(), new StreamsMetadataState(builder));
+
+ final MockConsumer<byte[], byte[]> restoreConsumer = clientSupplier.restoreConsumer;
+ restoreConsumer.updatePartitions("stream-thread-test-count-one-changelog",
+ Collections.singletonList(new PartitionInfo("stream-thread-test-count-one-changelog",
+ 0,
+ null,
+ new Node[0],
+ new Node[0])));
+ restoreConsumer.updatePartitions("stream-thread-test-count-two-changelog",
+ Collections.singletonList(new PartitionInfo("stream-thread-test-count-two-changelog",
+ 0,
+ null,
+ new Node[0],
+ new Node[0])));
+
+ final Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>();
+ final TopicPartition t1 = new TopicPartition("t1", 0);
+ standbyTasks.put(new TaskId(0, 0), Utils.mkSet(t1));
+
+ thread.partitionAssignor(new StreamPartitionAssignor() {
+ @Override
+ Map<TaskId, Set<TopicPartition>> standbyTasks() {
+ return standbyTasks;
+ }
+ });
+
+ thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList());
+ thread.rebalanceListener.onPartitionsAssigned(Collections.<TopicPartition>emptyList());
+
+ assertThat(restoreConsumer.assignment(), equalTo(Utils.mkSet(new TopicPartition("stream-thread-test-count-one-changelog", 0))));
+
+ // assign an existing standby plus a new one
+ standbyTasks.put(new TaskId(1, 0), Utils.mkSet(new TopicPartition("t2", 0)));
+ thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList());
+ thread.rebalanceListener.onPartitionsAssigned(Collections.<TopicPartition>emptyList());
+
+ assertThat(restoreConsumer.assignment(), equalTo(Utils.mkSet(new TopicPartition("stream-thread-test-count-one-changelog", 0),
+ new TopicPartition("stream-thread-test-count-two-changelog", 0))));
+
+ }
+
private void initPartitionGrouper(StreamsConfig config, StreamThread thread) {
StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();