You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2018/11/23 20:32:46 UTC
[kafka] branch trunk updated: MINOR: Refactor code for restoring tasks (#5768)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new d0ed389 MINOR: Refactor code for restoring tasks (#5768)
d0ed389 is described below
commit d0ed3894d6c586a580504c5141b3ae7de18f3518
Author: Matthias J. Sax <mj...@apache.org>
AuthorDate: Fri Nov 23 12:32:37 2018 -0800
MINOR: Refactor code for restoring tasks (#5768)
Reviewers: Guozhang Wang <gu...@confluent.io>, Bill Bejeck <bi...@confluent.io>
---
.../processor/internals/AssignedStreamsTasks.java | 98 ++++++++++++++++++++++
.../streams/processor/internals/AssignedTasks.java | 77 ++---------------
.../processor/internals/RestoringTasks.java | 2 +
.../processor/internals/StandbyTaskTest.java | 43 +++++-----
4 files changed, 133 insertions(+), 87 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
index 1eb3ab9..697482c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
@@ -22,11 +22,21 @@ import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.processor.TaskId;
+import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements RestoringTasks {
+ private final Map<TaskId, StreamTask> restoring = new HashMap<>();
+ private final Set<TopicPartition> restoredPartitions = new HashSet<>();
+ private final Map<TopicPartition, StreamTask> restoringByPartition = new HashMap<>();
+
AssignedStreamsTasks(final LogContext logContext) {
super(logContext, "stream task");
}
@@ -36,6 +46,60 @@ class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements Restorin
return restoringByPartition.get(partition);
}
+ void updateRestored(final Collection<TopicPartition> restored) {
+ if (restored.isEmpty()) {
+ return;
+ }
+ log.trace("Stream task changelog partitions that have completed restoring so far: {}", restored);
+ restoredPartitions.addAll(restored);
+ for (final Iterator<Map.Entry<TaskId, StreamTask>> it = restoring.entrySet().iterator(); it.hasNext(); ) {
+ final Map.Entry<TaskId, StreamTask> entry = it.next();
+ final StreamTask task = entry.getValue();
+ if (restoredPartitions.containsAll(task.changelogPartitions())) {
+ transitionToRunning(task);
+ it.remove();
+ log.trace("Stream task {} completed restoration as all its changelog partitions {} have been applied to restore state",
+ task.id(),
+ task.changelogPartitions());
+ } else {
+ if (log.isTraceEnabled()) {
+ final HashSet<TopicPartition> outstandingPartitions = new HashSet<>(task.changelogPartitions());
+ outstandingPartitions.removeAll(restoredPartitions);
+ log.trace("Stream task {} cannot resume processing yet since some of its changelog partitions have not completed restoring: {}",
+ task.id(),
+ outstandingPartitions);
+ }
+ }
+ }
+ if (allTasksRunning()) {
+ restoredPartitions.clear();
+ }
+ }
+
+ void addToRestoring(final StreamTask task) {
+ restoring.put(task.id(), task);
+ for (final TopicPartition topicPartition : task.partitions()) {
+ restoringByPartition.put(topicPartition, task);
+ }
+ for (final TopicPartition topicPartition : task.changelogPartitions()) {
+ restoringByPartition.put(topicPartition, task);
+ }
+ }
+
+ @Override
+ boolean allTasksRunning() {
+ return super.allTasksRunning() && restoring.isEmpty();
+ }
+
+ RuntimeException suspend() {
+ final AtomicReference<RuntimeException> firstException = new AtomicReference<>(super.suspend());
+ log.trace("Close restoring stream task {}", restoring.keySet());
+ firstException.compareAndSet(null, closeNonRunningTasks(restoring.values()));
+ restoring.clear();
+ restoringByPartition.clear();
+ return firstException.get();
+ }
+
/**
* @throws TaskMigratedException if committing offsets failed (non-EOS)
* or if the task producer got fenced (EOS)
@@ -154,4 +218,38 @@ class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements Restorin
return punctuated;
}
+ public String toString(final String indent) {
+ final StringBuilder builder = new StringBuilder();
+ builder.append(super.toString(indent));
+ describe(builder, restoring.values(), indent, "Restoring:");
+ return builder.toString();
+ }
+
+ @Override
+ List<StreamTask> allTasks() {
+ final List<StreamTask> tasks = super.allTasks();
+ tasks.addAll(restoring.values());
+ return tasks;
+ }
+
+ @Override
+ Set<TaskId> allAssignedTaskIds() {
+ final Set<TaskId> taskIds = super.allAssignedTaskIds();
+ taskIds.addAll(restoring.keySet());
+ return taskIds;
+ }
+
+ void clear() {
+ super.clear();
+ restoring.clear();
+ restoringByPartition.clear();
+ restoredPartitions.clear();
+ }
+
+ // for testing only
+
+ Collection<StreamTask> restoringTasks() {
+ return Collections.unmodifiableCollection(restoring.values());
+ }
+
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
index 3cc396d..a33ecdc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
@@ -41,14 +41,11 @@ abstract class AssignedTasks<T extends Task> {
private final String taskTypeName;
private final Map<TaskId, T> created = new HashMap<>();
private final Map<TaskId, T> suspended = new HashMap<>();
- private final Map<TaskId, T> restoring = new HashMap<>();
- private final Set<TopicPartition> restoredPartitions = new HashSet<>();
private final Set<TaskId> previousActiveTasks = new HashSet<>();
// IQ may access this map.
final Map<TaskId, T> running = new ConcurrentHashMap<>();
private final Map<TopicPartition, T> runningByPartition = new HashMap<>();
- final Map<TopicPartition, T> restoringByPartition = new HashMap<>();
AssignedTasks(final LogContext logContext,
final String taskTypeName) {
@@ -74,7 +71,7 @@ abstract class AssignedTasks<T extends Task> {
try {
if (!entry.getValue().initializeStateStores()) {
log.debug("Transitioning {} {} to restoring", taskTypeName, entry.getKey());
- addToRestoring(entry.getValue());
+ ((AssignedStreamsTasks) this).addToRestoring((StreamTask) entry.getValue());
} else {
transitionToRunning(entry.getValue());
}
@@ -86,42 +83,9 @@ abstract class AssignedTasks<T extends Task> {
}
}
- void updateRestored(final Collection<TopicPartition> restored) {
- if (restored.isEmpty()) {
- return;
- }
- log.trace("{} changelog partitions that have completed restoring so far: {}", taskTypeName, restored);
- restoredPartitions.addAll(restored);
- for (final Iterator<Map.Entry<TaskId, T>> it = restoring.entrySet().iterator(); it.hasNext(); ) {
- final Map.Entry<TaskId, T> entry = it.next();
- final T task = entry.getValue();
- if (restoredPartitions.containsAll(task.changelogPartitions())) {
- transitionToRunning(task);
- it.remove();
- log.trace("{} {} completed restoration as all its changelog partitions {} have been applied to restore state",
- taskTypeName,
- task.id(),
- task.changelogPartitions());
- } else {
- if (log.isTraceEnabled()) {
- final HashSet<TopicPartition> outstandingPartitions = new HashSet<>(task.changelogPartitions());
- outstandingPartitions.removeAll(restoredPartitions);
- log.trace("{} {} cannot resume processing yet since some of its changelog partitions have not completed restoring: {}",
- taskTypeName,
- task.id(),
- outstandingPartitions);
- }
- }
- }
- if (allTasksRunning()) {
- restoredPartitions.clear();
- }
- }
-
boolean allTasksRunning() {
return created.isEmpty()
- && suspended.isEmpty()
- && restoring.isEmpty();
+ && suspended.isEmpty();
}
Collection<T> running() {
@@ -132,21 +96,17 @@ abstract class AssignedTasks<T extends Task> {
final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
log.trace("Suspending running {} {}", taskTypeName, runningTaskIds());
firstException.compareAndSet(null, suspendTasks(running.values()));
- log.trace("Close restoring {} {}", taskTypeName, restoring.keySet());
- firstException.compareAndSet(null, closeNonRunningTasks(restoring.values()));
log.trace("Close created {} {}", taskTypeName, created.keySet());
firstException.compareAndSet(null, closeNonRunningTasks(created.values()));
previousActiveTasks.clear();
previousActiveTasks.addAll(running.keySet());
running.clear();
- restoring.clear();
created.clear();
runningByPartition.clear();
- restoringByPartition.clear();
return firstException.get();
}
- private RuntimeException closeNonRunningTasks(final Collection<T> tasks) {
+ RuntimeException closeNonRunningTasks(final Collection<T> tasks) {
RuntimeException exception = null;
for (final T task : tasks) {
try {
@@ -234,20 +194,10 @@ abstract class AssignedTasks<T extends Task> {
return false;
}
- private void addToRestoring(final T task) {
- restoring.put(task.id(), task);
- for (final TopicPartition topicPartition : task.partitions()) {
- restoringByPartition.put(topicPartition, task);
- }
- for (final TopicPartition topicPartition : task.changelogPartitions()) {
- restoringByPartition.put(topicPartition, task);
- }
- }
-
/**
* @throws TaskMigratedException if the task producer got fenced (EOS only)
*/
- private void transitionToRunning(final T task) {
+ void transitionToRunning(final T task) {
log.debug("transitioning {} {} to running", taskTypeName, task.id());
running.put(task.id(), task);
task.initializeTopology();
@@ -280,15 +230,14 @@ abstract class AssignedTasks<T extends Task> {
final StringBuilder builder = new StringBuilder();
describe(builder, running.values(), indent, "Running:");
describe(builder, suspended.values(), indent, "Suspended:");
- describe(builder, restoring.values(), indent, "Restoring:");
describe(builder, created.values(), indent, "New:");
return builder.toString();
}
- private void describe(final StringBuilder builder,
- final Collection<T> tasks,
- final String indent,
- final String name) {
+ void describe(final StringBuilder builder,
+ final Collection<T> tasks,
+ final String indent,
+ final String name) {
builder.append(indent).append(name);
for (final T t : tasks) {
builder.append(indent).append(t.toString(indent + "\t\t"));
@@ -296,35 +245,27 @@ abstract class AssignedTasks<T extends Task> {
builder.append("\n");
}
- private List<T> allTasks() {
+ List<T> allTasks() {
final List<T> tasks = new ArrayList<>();
tasks.addAll(running.values());
tasks.addAll(suspended.values());
- tasks.addAll(restoring.values());
tasks.addAll(created.values());
return tasks;
}
- Collection<T> restoringTasks() {
- return Collections.unmodifiableCollection(restoring.values());
- }
-
Set<TaskId> allAssignedTaskIds() {
final Set<TaskId> taskIds = new HashSet<>();
taskIds.addAll(running.keySet());
taskIds.addAll(suspended.keySet());
- taskIds.addAll(restoring.keySet());
taskIds.addAll(created.keySet());
return taskIds;
}
void clear() {
runningByPartition.clear();
- restoringByPartition.clear();
running.clear();
created.clear();
suspended.clear();
- restoredPartitions.clear();
}
Set<TaskId> previousTaskIds() {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RestoringTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RestoringTasks.java
index 3671b49..36bddcc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RestoringTasks.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RestoringTasks.java
@@ -19,5 +19,7 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.common.TopicPartition;
public interface RestoringTasks {
+
StreamTask restoringTaskFor(final TopicPartition partition);
+
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index 0c24e2d..db48cb9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -89,7 +89,7 @@ import static org.junit.Assert.fail;
public class StandbyTaskTest {
private final TaskId taskId = new TaskId(0, 1);
-
+ private StandbyTask task;
private final Serializer<Integer> intSerializer = new IntegerSerializer();
private final String applicationId = "test-application";
@@ -165,13 +165,17 @@ public class StandbyTaskTest {
@After
public void cleanup() throws IOException {
+ if (task != null) {
+ task.close(true, false);
+ task = null;
+ }
Utils.delete(baseDir);
}
@Test
public void testStorePartitions() throws IOException {
final StreamsConfig config = createConfig(baseDir);
- final StandbyTask task = new StandbyTask(taskId, topicPartitions, topology, consumer, changelogReader, config, null, stateDirectory);
+ task = new StandbyTask(taskId, topicPartitions, topology, consumer, changelogReader, config, null, stateDirectory);
task.initializeStateStores();
assertEquals(Utils.mkSet(partition2, partition1), new HashSet<>(task.checkpointedOffsets().keySet()));
}
@@ -180,7 +184,7 @@ public class StandbyTaskTest {
@Test
public void testUpdateNonInitializedStore() throws IOException {
final StreamsConfig config = createConfig(baseDir);
- final StandbyTask task = new StandbyTask(taskId, topicPartitions, topology, consumer, changelogReader, config, null, stateDirectory);
+ task = new StandbyTask(taskId, topicPartitions, topology, consumer, changelogReader, config, null, stateDirectory);
restoreStateConsumer.assign(new ArrayList<>(task.checkpointedOffsets().keySet()));
@@ -198,7 +202,7 @@ public class StandbyTaskTest {
@Test
public void testUpdate() throws IOException {
final StreamsConfig config = createConfig(baseDir);
- final StandbyTask task = new StandbyTask(taskId, topicPartitions, topology, consumer, changelogReader, config, null, stateDirectory);
+ task = new StandbyTask(taskId, topicPartitions, topology, consumer, changelogReader, config, null, stateDirectory);
task.initializeStateStores();
final Set<TopicPartition> partition = Collections.singleton(partition2);
restoreStateConsumer.assign(partition);
@@ -228,7 +232,7 @@ public class StandbyTaskTest {
final TopicPartition topicPartition = new TopicPartition(changelogName, 1);
- final List<TopicPartition> partitions = asList(topicPartition);
+ final List<TopicPartition> partitions = Collections.singletonList(topicPartition);
consumer.assign(partitions);
@@ -244,7 +248,7 @@ public class StandbyTaskTest {
builder.buildAndOptimizeTopology();
- final StandbyTask task = new StandbyTask(
+ task = new StandbyTask(
taskId,
partitions,
internalTopologyBuilder.build(0),
@@ -329,7 +333,7 @@ public class StandbyTaskTest {
final String changelogName = applicationId + "-" + storeName + "-changelog";
final TopicPartition topicPartition = new TopicPartition(changelogName, 1);
- final List<TopicPartition> partitions = asList(topicPartition);
+ final List<TopicPartition> partitions = Collections.singletonList(topicPartition);
final InternalTopologyBuilder internalTopologyBuilder = new InternalTopologyBuilder().setApplicationId(applicationId);
@@ -342,7 +346,7 @@ public class StandbyTaskTest {
consumer.assign(partitions);
- final StandbyTask task = new StandbyTask(
+ task = new StandbyTask(
taskId,
partitions,
internalTopologyBuilder.build(0),
@@ -393,10 +397,10 @@ public class StandbyTaskTest {
@Test
public void shouldRestoreToKTable() throws IOException {
- consumer.assign(asList(globalTopicPartition));
+ consumer.assign(Collections.singletonList(globalTopicPartition));
consumer.commitSync(mkMap(mkEntry(globalTopicPartition, new OffsetAndMetadata(0L))));
- final StandbyTask task = new StandbyTask(
+ task = new StandbyTask(
taskId,
ktablePartitions,
ktableTopology,
@@ -495,7 +499,7 @@ public class StandbyTaskTest {
final InternalTopologyBuilder internalTopologyBuilder = InternalStreamsBuilderTest.internalTopologyBuilder(builder);
final ProcessorTopology topology = internalTopologyBuilder.setApplicationId(applicationId).build(0);
- final StandbyTask standbyTask = new StandbyTask(
+ task = new StandbyTask(
taskId,
emptySet(),
topology,
@@ -506,27 +510,27 @@ public class StandbyTaskTest {
stateDirectory
);
- standbyTask.initializeStateStores();
+ task.initializeStateStores();
- assertTrue(standbyTask.hasStateStores());
+ assertTrue(task.hasStateStores());
}
@Test
public void shouldCheckpointStoreOffsetsOnCommit() throws IOException {
- consumer.assign(asList(globalTopicPartition));
+ consumer.assign(Collections.singletonList(globalTopicPartition));
final Map<TopicPartition, OffsetAndMetadata> committedOffsets = new HashMap<>();
committedOffsets.put(new TopicPartition(globalTopicPartition.topic(), globalTopicPartition.partition()), new OffsetAndMetadata(100L));
consumer.commitSync(committedOffsets);
restoreStateConsumer.updatePartitions(
globalStoreName,
- asList(new PartitionInfo(globalStoreName, 0, Node.noNode(), new Node[0], new Node[0]))
+ Collections.singletonList(new PartitionInfo(globalStoreName, 0, Node.noNode(), new Node[0], new Node[0]))
);
final TaskId taskId = new TaskId(0, 0);
final MockTime time = new MockTime();
final StreamsConfig config = createConfig(baseDir);
- final StandbyTask task = new StandbyTask(
+ task = new StandbyTask(
taskId,
ktablePartitions,
ktableTopology,
@@ -560,19 +564,19 @@ public class StandbyTaskTest {
@Test
public void shouldCloseStateMangerOnTaskCloseWhenCommitFailed() throws Exception {
- consumer.assign(asList(globalTopicPartition));
+ consumer.assign(Collections.singletonList(globalTopicPartition));
final Map<TopicPartition, OffsetAndMetadata> committedOffsets = new HashMap<>();
committedOffsets.put(new TopicPartition(globalTopicPartition.topic(), globalTopicPartition.partition()), new OffsetAndMetadata(100L));
consumer.commitSync(committedOffsets);
restoreStateConsumer.updatePartitions(
globalStoreName,
- asList(new PartitionInfo(globalStoreName, 0, Node.noNode(), new Node[0], new Node[0]))
+ Collections.singletonList(new PartitionInfo(globalStoreName, 0, Node.noNode(), new Node[0], new Node[0]))
);
final StreamsConfig config = createConfig(baseDir);
final AtomicBoolean closedStateManager = new AtomicBoolean(false);
- final StandbyTask task = new StandbyTask(
+ task = new StandbyTask(
taskId,
ktablePartitions,
ktableTopology,
@@ -598,6 +602,7 @@ public class StandbyTaskTest {
fail("should have thrown exception");
} catch (final Exception e) {
// expected
+ task = null;
}
assertTrue(closedStateManager.get());
}