You are viewing a plain-text version of this content. The link to its canonical version (rendered as "here" in the original page) was not preserved in this plain-text copy.
Posted to commits@kafka.apache.org by mj...@apache.org on 2018/11/23 20:32:46 UTC

[kafka] branch trunk updated: MINOR: Refactor code for restoring tasks (#5768)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d0ed389  MINOR: Refactor code for restoring tasks (#5768)
d0ed389 is described below

commit d0ed3894d6c586a580504c5141b3ae7de18f3518
Author: Matthias J. Sax <mj...@apache.org>
AuthorDate: Fri Nov 23 12:32:37 2018 -0800

    MINOR: Refactor code for restoring tasks (#5768)
    
    Reviewers: Guozhang Wang <gu...@confluent.io>, Bill Bejeck <bi...@confluent.io>
---
 .../processor/internals/AssignedStreamsTasks.java  | 98 ++++++++++++++++++++++
 .../streams/processor/internals/AssignedTasks.java | 77 ++---------------
 .../processor/internals/RestoringTasks.java        |  2 +
 .../processor/internals/StandbyTaskTest.java       | 43 +++++-----
 4 files changed, 133 insertions(+), 87 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
index 1eb3ab9..697482c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
@@ -22,11 +22,21 @@ import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.errors.TaskMigratedException;
 import org.apache.kafka.streams.processor.TaskId;
 
+import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
 
 class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements RestoringTasks {
+    private final Map<TaskId, StreamTask> restoring = new HashMap<>();
+    private final Set<TopicPartition> restoredPartitions = new HashSet<>();
+    private final Map<TopicPartition, StreamTask> restoringByPartition = new HashMap<>();
+
     AssignedStreamsTasks(final LogContext logContext) {
         super(logContext, "stream task");
     }
@@ -36,6 +46,60 @@ class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements Restorin
         return restoringByPartition.get(partition);
     }
 
+    void updateRestored(final Collection<TopicPartition> restored) {
+        if (restored.isEmpty()) {
+            return;
+        }
+        log.trace("Stream task changelog partitions that have completed restoring so far: {}", restored);
+        restoredPartitions.addAll(restored);
+        for (final Iterator<Map.Entry<TaskId, StreamTask>> it = restoring.entrySet().iterator(); it.hasNext(); ) {
+            final Map.Entry<TaskId, StreamTask> entry = it.next();
+            final StreamTask task = entry.getValue();
+            if (restoredPartitions.containsAll(task.changelogPartitions())) {
+                transitionToRunning(task);
+                it.remove();
+                log.trace("Stream task {} completed restoration as all its changelog partitions {} have been applied to restore state",
+                    task.id(),
+                    task.changelogPartitions());
+            } else {
+                if (log.isTraceEnabled()) {
+                    final HashSet<TopicPartition> outstandingPartitions = new HashSet<>(task.changelogPartitions());
+                    outstandingPartitions.removeAll(restoredPartitions);
+                    log.trace("Stream task {} cannot resume processing yet since some of its changelog partitions have not completed restoring: {}",
+                        task.id(),
+                        outstandingPartitions);
+                }
+            }
+        }
+        if (allTasksRunning()) {
+            restoredPartitions.clear();
+        }
+    }
+
+    void addToRestoring(final StreamTask task) {
+        restoring.put(task.id(), task);
+        for (final TopicPartition topicPartition : task.partitions()) {
+            restoringByPartition.put(topicPartition, task);
+        }
+        for (final TopicPartition topicPartition : task.changelogPartitions()) {
+            restoringByPartition.put(topicPartition, task);
+        }
+    }
+
+    @Override
+    boolean allTasksRunning() {
+        return super.allTasksRunning() && restoring.isEmpty();
+    }
+
+    RuntimeException suspend() {
+        final AtomicReference<RuntimeException> firstException = new AtomicReference<>(super.suspend());
+        log.trace("Close restoring stream task {}", restoring.keySet());
+        firstException.compareAndSet(null, closeNonRunningTasks(restoring.values()));
+        restoring.clear();
+        restoringByPartition.clear();
+        return firstException.get();
+    }
+
     /**
      * @throws TaskMigratedException if committing offsets failed (non-EOS)
      *                               or if the task producer got fenced (EOS)
@@ -154,4 +218,38 @@ class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements Restorin
         return punctuated;
     }
 
+    public String toString(final String indent) {
+        final StringBuilder builder = new StringBuilder();
+        builder.append(super.toString(indent));
+        describe(builder, restoring.values(), indent, "Restoring:");
+        return builder.toString();
+    }
+
+    @Override
+    List<StreamTask> allTasks() {
+        final List<StreamTask> tasks = super.allTasks();
+        tasks.addAll(restoring.values());
+        return tasks;
+    }
+
+    @Override
+    Set<TaskId> allAssignedTaskIds() {
+        final Set<TaskId> taskIds = super.allAssignedTaskIds();
+        taskIds.addAll(restoring.keySet());
+        return taskIds;
+    }
+
+    void clear() {
+        super.clear();
+        restoring.clear();
+        restoringByPartition.clear();
+        restoredPartitions.clear();
+    }
+
+    // for testing only
+
+    Collection<StreamTask> restoringTasks() {
+        return Collections.unmodifiableCollection(restoring.values());
+    }
+
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
index 3cc396d..a33ecdc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
@@ -41,14 +41,11 @@ abstract class AssignedTasks<T extends Task> {
     private final String taskTypeName;
     private final Map<TaskId, T> created = new HashMap<>();
     private final Map<TaskId, T> suspended = new HashMap<>();
-    private final Map<TaskId, T> restoring = new HashMap<>();
-    private final Set<TopicPartition> restoredPartitions = new HashSet<>();
     private final Set<TaskId> previousActiveTasks = new HashSet<>();
 
     // IQ may access this map.
     final Map<TaskId, T> running = new ConcurrentHashMap<>();
     private final Map<TopicPartition, T> runningByPartition = new HashMap<>();
-    final Map<TopicPartition, T> restoringByPartition = new HashMap<>();
 
     AssignedTasks(final LogContext logContext,
                   final String taskTypeName) {
@@ -74,7 +71,7 @@ abstract class AssignedTasks<T extends Task> {
             try {
                 if (!entry.getValue().initializeStateStores()) {
                     log.debug("Transitioning {} {} to restoring", taskTypeName, entry.getKey());
-                    addToRestoring(entry.getValue());
+                    ((AssignedStreamsTasks) this).addToRestoring((StreamTask) entry.getValue());
                 } else {
                     transitionToRunning(entry.getValue());
                 }
@@ -86,42 +83,9 @@ abstract class AssignedTasks<T extends Task> {
         }
     }
 
-    void updateRestored(final Collection<TopicPartition> restored) {
-        if (restored.isEmpty()) {
-            return;
-        }
-        log.trace("{} changelog partitions that have completed restoring so far: {}", taskTypeName, restored);
-        restoredPartitions.addAll(restored);
-        for (final Iterator<Map.Entry<TaskId, T>> it = restoring.entrySet().iterator(); it.hasNext(); ) {
-            final Map.Entry<TaskId, T> entry = it.next();
-            final T task = entry.getValue();
-            if (restoredPartitions.containsAll(task.changelogPartitions())) {
-                transitionToRunning(task);
-                it.remove();
-                log.trace("{} {} completed restoration as all its changelog partitions {} have been applied to restore state",
-                        taskTypeName,
-                        task.id(),
-                        task.changelogPartitions());
-            } else {
-                if (log.isTraceEnabled()) {
-                    final HashSet<TopicPartition> outstandingPartitions = new HashSet<>(task.changelogPartitions());
-                    outstandingPartitions.removeAll(restoredPartitions);
-                    log.trace("{} {} cannot resume processing yet since some of its changelog partitions have not completed restoring: {}",
-                              taskTypeName,
-                              task.id(),
-                              outstandingPartitions);
-                }
-            }
-        }
-        if (allTasksRunning()) {
-            restoredPartitions.clear();
-        }
-    }
-
     boolean allTasksRunning() {
         return created.isEmpty()
-                && suspended.isEmpty()
-                && restoring.isEmpty();
+                && suspended.isEmpty();
     }
 
     Collection<T> running() {
@@ -132,21 +96,17 @@ abstract class AssignedTasks<T extends Task> {
         final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
         log.trace("Suspending running {} {}", taskTypeName, runningTaskIds());
         firstException.compareAndSet(null, suspendTasks(running.values()));
-        log.trace("Close restoring {} {}", taskTypeName, restoring.keySet());
-        firstException.compareAndSet(null, closeNonRunningTasks(restoring.values()));
         log.trace("Close created {} {}", taskTypeName, created.keySet());
         firstException.compareAndSet(null, closeNonRunningTasks(created.values()));
         previousActiveTasks.clear();
         previousActiveTasks.addAll(running.keySet());
         running.clear();
-        restoring.clear();
         created.clear();
         runningByPartition.clear();
-        restoringByPartition.clear();
         return firstException.get();
     }
 
-    private RuntimeException closeNonRunningTasks(final Collection<T> tasks) {
+    RuntimeException closeNonRunningTasks(final Collection<T> tasks) {
         RuntimeException exception = null;
         for (final T task : tasks) {
             try {
@@ -234,20 +194,10 @@ abstract class AssignedTasks<T extends Task> {
         return false;
     }
 
-    private void addToRestoring(final T task) {
-        restoring.put(task.id(), task);
-        for (final TopicPartition topicPartition : task.partitions()) {
-            restoringByPartition.put(topicPartition, task);
-        }
-        for (final TopicPartition topicPartition : task.changelogPartitions()) {
-            restoringByPartition.put(topicPartition, task);
-        }
-    }
-
     /**
      * @throws TaskMigratedException if the task producer got fenced (EOS only)
      */
-    private void transitionToRunning(final T task) {
+    void transitionToRunning(final T task) {
         log.debug("transitioning {} {} to running", taskTypeName, task.id());
         running.put(task.id(), task);
         task.initializeTopology();
@@ -280,15 +230,14 @@ abstract class AssignedTasks<T extends Task> {
         final StringBuilder builder = new StringBuilder();
         describe(builder, running.values(), indent, "Running:");
         describe(builder, suspended.values(), indent, "Suspended:");
-        describe(builder, restoring.values(), indent, "Restoring:");
         describe(builder, created.values(), indent, "New:");
         return builder.toString();
     }
 
-    private void describe(final StringBuilder builder,
-                          final Collection<T> tasks,
-                          final String indent,
-                          final String name) {
+    void describe(final StringBuilder builder,
+                  final Collection<T> tasks,
+                  final String indent,
+                  final String name) {
         builder.append(indent).append(name);
         for (final T t : tasks) {
             builder.append(indent).append(t.toString(indent + "\t\t"));
@@ -296,35 +245,27 @@ abstract class AssignedTasks<T extends Task> {
         builder.append("\n");
     }
 
-    private List<T> allTasks() {
+    List<T> allTasks() {
         final List<T> tasks = new ArrayList<>();
         tasks.addAll(running.values());
         tasks.addAll(suspended.values());
-        tasks.addAll(restoring.values());
         tasks.addAll(created.values());
         return tasks;
     }
 
-    Collection<T> restoringTasks() {
-        return Collections.unmodifiableCollection(restoring.values());
-    }
-
     Set<TaskId> allAssignedTaskIds() {
         final Set<TaskId> taskIds = new HashSet<>();
         taskIds.addAll(running.keySet());
         taskIds.addAll(suspended.keySet());
-        taskIds.addAll(restoring.keySet());
         taskIds.addAll(created.keySet());
         return taskIds;
     }
 
     void clear() {
         runningByPartition.clear();
-        restoringByPartition.clear();
         running.clear();
         created.clear();
         suspended.clear();
-        restoredPartitions.clear();
     }
 
     Set<TaskId> previousTaskIds() {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RestoringTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RestoringTasks.java
index 3671b49..36bddcc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RestoringTasks.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RestoringTasks.java
@@ -19,5 +19,7 @@ package org.apache.kafka.streams.processor.internals;
 import org.apache.kafka.common.TopicPartition;
 
 public interface RestoringTasks {
+
     StreamTask restoringTaskFor(final TopicPartition partition);
+
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index 0c24e2d..db48cb9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -89,7 +89,7 @@ import static org.junit.Assert.fail;
 public class StandbyTaskTest {
 
     private final TaskId taskId = new TaskId(0, 1);
-
+    private StandbyTask task;
     private final Serializer<Integer> intSerializer = new IntegerSerializer();
 
     private final String applicationId = "test-application";
@@ -165,13 +165,17 @@ public class StandbyTaskTest {
 
     @After
     public void cleanup() throws IOException {
+        if (task != null) {
+            task.close(true, false);
+            task = null;
+        }
         Utils.delete(baseDir);
     }
 
     @Test
     public void testStorePartitions() throws IOException {
         final StreamsConfig config = createConfig(baseDir);
-        final StandbyTask task = new StandbyTask(taskId, topicPartitions, topology, consumer, changelogReader, config, null, stateDirectory);
+        task = new StandbyTask(taskId, topicPartitions, topology, consumer, changelogReader, config, null, stateDirectory);
         task.initializeStateStores();
         assertEquals(Utils.mkSet(partition2, partition1), new HashSet<>(task.checkpointedOffsets().keySet()));
     }
@@ -180,7 +184,7 @@ public class StandbyTaskTest {
     @Test
     public void testUpdateNonInitializedStore() throws IOException {
         final StreamsConfig config = createConfig(baseDir);
-        final StandbyTask task = new StandbyTask(taskId, topicPartitions, topology, consumer, changelogReader, config, null, stateDirectory);
+        task = new StandbyTask(taskId, topicPartitions, topology, consumer, changelogReader, config, null, stateDirectory);
 
         restoreStateConsumer.assign(new ArrayList<>(task.checkpointedOffsets().keySet()));
 
@@ -198,7 +202,7 @@ public class StandbyTaskTest {
     @Test
     public void testUpdate() throws IOException {
         final StreamsConfig config = createConfig(baseDir);
-        final StandbyTask task = new StandbyTask(taskId, topicPartitions, topology, consumer, changelogReader, config, null, stateDirectory);
+        task = new StandbyTask(taskId, topicPartitions, topology, consumer, changelogReader, config, null, stateDirectory);
         task.initializeStateStores();
         final Set<TopicPartition> partition = Collections.singleton(partition2);
         restoreStateConsumer.assign(partition);
@@ -228,7 +232,7 @@ public class StandbyTaskTest {
 
         final TopicPartition topicPartition = new TopicPartition(changelogName, 1);
 
-        final List<TopicPartition> partitions = asList(topicPartition);
+        final List<TopicPartition> partitions = Collections.singletonList(topicPartition);
 
         consumer.assign(partitions);
 
@@ -244,7 +248,7 @@ public class StandbyTaskTest {
 
         builder.buildAndOptimizeTopology();
 
-        final StandbyTask task = new StandbyTask(
+        task = new StandbyTask(
             taskId,
             partitions,
             internalTopologyBuilder.build(0),
@@ -329,7 +333,7 @@ public class StandbyTaskTest {
         final String changelogName = applicationId + "-" + storeName + "-changelog";
 
         final TopicPartition topicPartition = new TopicPartition(changelogName, 1);
-        final List<TopicPartition> partitions = asList(topicPartition);
+        final List<TopicPartition> partitions = Collections.singletonList(topicPartition);
 
         final InternalTopologyBuilder internalTopologyBuilder = new InternalTopologyBuilder().setApplicationId(applicationId);
 
@@ -342,7 +346,7 @@ public class StandbyTaskTest {
 
         consumer.assign(partitions);
 
-        final StandbyTask task = new StandbyTask(
+        task = new StandbyTask(
             taskId,
             partitions,
             internalTopologyBuilder.build(0),
@@ -393,10 +397,10 @@ public class StandbyTaskTest {
 
     @Test
     public void shouldRestoreToKTable() throws IOException {
-        consumer.assign(asList(globalTopicPartition));
+        consumer.assign(Collections.singletonList(globalTopicPartition));
         consumer.commitSync(mkMap(mkEntry(globalTopicPartition, new OffsetAndMetadata(0L))));
 
-        final StandbyTask task = new StandbyTask(
+        task = new StandbyTask(
             taskId,
             ktablePartitions,
             ktableTopology,
@@ -495,7 +499,7 @@ public class StandbyTaskTest {
         final InternalTopologyBuilder internalTopologyBuilder = InternalStreamsBuilderTest.internalTopologyBuilder(builder);
         final ProcessorTopology topology = internalTopologyBuilder.setApplicationId(applicationId).build(0);
 
-        final StandbyTask standbyTask = new StandbyTask(
+        task = new StandbyTask(
             taskId,
             emptySet(),
             topology,
@@ -506,27 +510,27 @@ public class StandbyTaskTest {
             stateDirectory
         );
 
-        standbyTask.initializeStateStores();
+        task.initializeStateStores();
 
-        assertTrue(standbyTask.hasStateStores());
+        assertTrue(task.hasStateStores());
     }
 
     @Test
     public void shouldCheckpointStoreOffsetsOnCommit() throws IOException {
-        consumer.assign(asList(globalTopicPartition));
+        consumer.assign(Collections.singletonList(globalTopicPartition));
         final Map<TopicPartition, OffsetAndMetadata> committedOffsets = new HashMap<>();
         committedOffsets.put(new TopicPartition(globalTopicPartition.topic(), globalTopicPartition.partition()), new OffsetAndMetadata(100L));
         consumer.commitSync(committedOffsets);
 
         restoreStateConsumer.updatePartitions(
             globalStoreName,
-            asList(new PartitionInfo(globalStoreName, 0, Node.noNode(), new Node[0], new Node[0]))
+            Collections.singletonList(new PartitionInfo(globalStoreName, 0, Node.noNode(), new Node[0], new Node[0]))
         );
 
         final TaskId taskId = new TaskId(0, 0);
         final MockTime time = new MockTime();
         final StreamsConfig config = createConfig(baseDir);
-        final StandbyTask task = new StandbyTask(
+        task = new StandbyTask(
             taskId,
             ktablePartitions,
             ktableTopology,
@@ -560,19 +564,19 @@ public class StandbyTaskTest {
 
     @Test
     public void shouldCloseStateMangerOnTaskCloseWhenCommitFailed() throws Exception {
-        consumer.assign(asList(globalTopicPartition));
+        consumer.assign(Collections.singletonList(globalTopicPartition));
         final Map<TopicPartition, OffsetAndMetadata> committedOffsets = new HashMap<>();
         committedOffsets.put(new TopicPartition(globalTopicPartition.topic(), globalTopicPartition.partition()), new OffsetAndMetadata(100L));
         consumer.commitSync(committedOffsets);
 
         restoreStateConsumer.updatePartitions(
             globalStoreName,
-            asList(new PartitionInfo(globalStoreName, 0, Node.noNode(), new Node[0], new Node[0]))
+            Collections.singletonList(new PartitionInfo(globalStoreName, 0, Node.noNode(), new Node[0], new Node[0]))
         );
 
         final StreamsConfig config = createConfig(baseDir);
         final AtomicBoolean closedStateManager = new AtomicBoolean(false);
-        final StandbyTask task = new StandbyTask(
+        task = new StandbyTask(
             taskId,
             ktablePartitions,
             ktableTopology,
@@ -598,6 +602,7 @@ public class StandbyTaskTest {
             fail("should have thrown exception");
         } catch (final Exception e) {
             // expected
+            task = null;
         }
         assertTrue(closedStateManager.get());
     }