You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/11/06 18:26:55 UTC

[1/2] kafka git commit: KAFKA-6115: TaskManager should be type aware

Repository: kafka
Updated Branches:
  refs/heads/trunk 0c895706e -> d637ad0da


http://git-wip-us.apache.org/repos/asf/kafka/blob/d637ad0d/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
index 1368051..9271ca6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
@@ -33,7 +33,6 @@ import org.apache.kafka.streams.processor.internals.StateDirectory;
 import org.apache.kafka.streams.processor.internals.StoreChangelogReader;
 import org.apache.kafka.streams.processor.internals.StreamTask;
 import org.apache.kafka.streams.processor.internals.StreamThread;
-import org.apache.kafka.streams.processor.internals.Task;
 import org.apache.kafka.streams.state.QueryableStoreTypes;
 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
 import org.apache.kafka.streams.state.ReadOnlyWindowStore;
@@ -68,7 +67,7 @@ public class StreamThreadStateStoreProviderTest {
     private File stateDir;
     private final String topicName = "topic";
     private StreamThread threadMock;
-    private Map<TaskId, Task> tasks;
+    private Map<TaskId, StreamTask> tasks;
 
     @SuppressWarnings("deprecation")
     @Before


[2/2] kafka git commit: KAFKA-6115: TaskManager should be type aware

Posted by gu...@apache.org.
KAFKA-6115: TaskManager should be type aware

 - remove type specific methods from Task interface
 - add generics to preserve task type
 - add subclasses for different task types

Author: Matthias J. Sax <ma...@confluent.io>

Reviewers: Bill Bejeck <bi...@confluent.io>, Damian Guy <da...@gmail.com>, Guozhang Wang <wa...@gmail.com>

Closes #4129 from mjsax/kafka-6115-taskManager-should-be-type-aware


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d637ad0d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d637ad0d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d637ad0d

Branch: refs/heads/trunk
Commit: d637ad0dafb727bb73c63f1b187d9f83abcd4ec1
Parents: 0c89570
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Mon Nov 6 10:26:52 2017 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Mon Nov 6 10:26:52 2017 -0800

----------------------------------------------------------------------
 .../processor/internals/AbstractTask.java       |   8 +-
 .../internals/AssignedStandbyTasks.java         |  27 ++
 .../internals/AssignedStreamsTasks.java         | 128 +++++
 .../processor/internals/AssignedTasks.java      | 174 ++-----
 .../processor/internals/RestoringTasks.java     |   2 +-
 .../processor/internals/StandbyTask.java        |  25 -
 .../streams/processor/internals/StreamTask.java |  11 -
 .../processor/internals/StreamThread.java       |  36 +-
 .../kafka/streams/processor/internals/Task.java |  59 +--
 .../streams/processor/internals/TaskAction.java |   4 +-
 .../processor/internals/TaskManager.java        |  32 +-
 .../processor/internals/AbstractTaskTest.java   |  37 --
 .../internals/AssignedStreamsTasksTest.java     | 461 +++++++++++++++++++
 .../processor/internals/AssignedTasksTest.java  | 461 -------------------
 .../internals/StoreChangelogReaderTest.java     |   2 +-
 .../processor/internals/TaskManagerTest.java    |  22 +-
 .../StreamThreadStateStoreProviderTest.java     |   3 +-
 17 files changed, 733 insertions(+), 759 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/d637ad0d/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index 52465ed..b0ae23c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -102,22 +102,22 @@ public abstract class AbstractTask implements Task {
     }
 
     @Override
-    public final String applicationId() {
+    public String applicationId() {
         return applicationId;
     }
 
     @Override
-    public final Set<TopicPartition> partitions() {
+    public Set<TopicPartition> partitions() {
         return partitions;
     }
 
     @Override
-    public final ProcessorTopology topology() {
+    public ProcessorTopology topology() {
         return topology;
     }
 
     @Override
-    public final ProcessorContext context() {
+    public ProcessorContext context() {
         return processorContext;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d637ad0d/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStandbyTasks.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStandbyTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStandbyTasks.java
new file mode 100644
index 0000000..a99e451
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStandbyTasks.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.utils.LogContext;
+
+class AssignedStandbyTasks extends AssignedTasks<StandbyTask> {
+
+    AssignedStandbyTasks(final LogContext logContext) {
+        super(logContext, "standby task");
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/d637ad0d/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
new file mode 100644
index 0000000..5ef404f
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.streams.errors.TaskMigratedException;
+import org.apache.kafka.streams.processor.TaskId;
+import org.slf4j.Logger;
+
+import java.util.Iterator;
+import java.util.Map;
+
+class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements RestoringTasks {
+    private final Logger log;
+    private final TaskAction<StreamTask> maybeCommitAction;
+    private int committed = 0;
+
+    AssignedStreamsTasks(final LogContext logContext) {
+        super(logContext, "stream task");
+
+        this.log = logContext.logger(getClass());
+
+        maybeCommitAction = new TaskAction<StreamTask>() {
+            @Override
+            public String name() {
+                return "maybeCommit";
+            }
+
+            @Override
+            public void apply(final StreamTask task) {
+                if (task.commitNeeded()) {
+                    committed++;
+                    task.commit();
+                    log.debug("Committed active task {} per user request in", task.id());
+                }
+            }
+        };
+    }
+
+    @Override
+    public StreamTask restoringTaskFor(final TopicPartition partition) {
+        return restoringByPartition.get(partition);
+    }
+
+    /**
+     * @throws TaskMigratedException if committing offsets failed (non-EOS)
+     *                               or if the task producer got fenced (EOS)
+     */
+    int maybeCommit() {
+        committed = 0;
+        applyToRunningTasks(maybeCommitAction);
+        return committed;
+    }
+
+    /**
+     * @throws TaskMigratedException if the task producer got fenced (EOS only)
+     */
+    int process() {
+        int processed = 0;
+        final Iterator<Map.Entry<TaskId, StreamTask>> it = running.entrySet().iterator();
+        while (it.hasNext()) {
+            final StreamTask task = it.next().getValue();
+            try {
+                if (task.process()) {
+                    processed++;
+                }
+            } catch (final TaskMigratedException e) {
+                final RuntimeException fatalException = closeZombieTask(task);
+                if (fatalException != null) {
+                    throw fatalException;
+                }
+                it.remove();
+                throw e;
+            } catch (final RuntimeException e) {
+                log.error("Failed to process stream task {} due to the following error:", task.id(), e);
+                throw e;
+            }
+        }
+        return processed;
+    }
+
+    /**
+     * @throws TaskMigratedException if the task producer got fenced (EOS only)
+     */
+    int punctuate() {
+        int punctuated = 0;
+        final Iterator<Map.Entry<TaskId, StreamTask>> it = running.entrySet().iterator();
+        while (it.hasNext()) {
+            final StreamTask task = it.next().getValue();
+            try {
+                if (task.maybePunctuateStreamTime()) {
+                    punctuated++;
+                }
+                if (task.maybePunctuateSystemTime()) {
+                    punctuated++;
+                }
+            } catch (final TaskMigratedException e) {
+                final RuntimeException fatalException = closeZombieTask(task);
+                if (fatalException != null) {
+                    throw fatalException;
+                }
+                it.remove();
+                throw e;
+            } catch (final KafkaException e) {
+                log.error("Failed to punctuate stream task {} due to the following error:", task.id(), e);
+                throw e;
+            }
+        }
+        return punctuated;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/d637ad0d/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
index a3c9c2f..b90ec10 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
-import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.errors.LockException;
@@ -37,22 +36,19 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicReference;
 
-class AssignedTasks implements RestoringTasks {
+abstract class AssignedTasks<T extends Task> {
     private final Logger log;
     private final String taskTypeName;
-    private final TaskAction maybeCommitAction;
-    private final TaskAction commitAction;
-    private Map<TaskId, Task> created = new HashMap<>();
-    private Map<TaskId, Task> suspended = new HashMap<>();
-    private Map<TaskId, Task> restoring = new HashMap<>();
+    private final TaskAction<T> commitAction;
+    private Map<TaskId, T> created = new HashMap<>();
+    private Map<TaskId, T> suspended = new HashMap<>();
+    private Map<TaskId, T> restoring = new HashMap<>();
     private Set<TopicPartition> restoredPartitions = new HashSet<>();
     private Set<TaskId> previousActiveTasks = new HashSet<>();
     // IQ may access this map.
-    private Map<TaskId, Task> running = new ConcurrentHashMap<>();
-    private Map<TopicPartition, Task> runningByPartition = new HashMap<>();
-    private Map<TopicPartition, Task> restoringByPartition = new HashMap<>();
-    private int committed = 0;
-
+    Map<TaskId, T> running = new ConcurrentHashMap<>();
+    private Map<TopicPartition, T> runningByPartition = new HashMap<>();
+    Map<TopicPartition, T> restoringByPartition = new HashMap<>();
 
     AssignedTasks(final LogContext logContext,
                   final String taskTypeName) {
@@ -60,36 +56,20 @@ class AssignedTasks implements RestoringTasks {
 
         this.log = logContext.logger(getClass());
 
-        maybeCommitAction = new TaskAction() {
-            @Override
-            public String name() {
-                return "maybeCommit";
-            }
-
-            @Override
-            public void apply(final Task task) {
-                if (task.commitNeeded()) {
-                    committed++;
-                    task.commit();
-                    log.debug("Committed active task {} per user request in", task.id());
-                }
-            }
-        };
-
-        commitAction = new TaskAction() {
+        commitAction = new TaskAction<T>() {
             @Override
             public String name() {
                 return "commit";
             }
 
             @Override
-            public void apply(final Task task) {
+            public void apply(final T task) {
                 task.commit();
             }
         };
     }
 
-    void addNewTask(final Task task) {
+    void addNewTask(final T task) {
         created.put(task.id(), task);
     }
 
@@ -98,7 +78,7 @@ class AssignedTasks implements RestoringTasks {
             return Collections.emptySet();
         }
         final Set<TopicPartition> partitions = new HashSet<>();
-        for (final Map.Entry<TaskId, Task> entry : created.entrySet()) {
+        for (final Map.Entry<TaskId, T> entry : created.entrySet()) {
             if (entry.getValue().hasStateStores()) {
                 partitions.addAll(entry.getValue().partitions());
             }
@@ -116,8 +96,8 @@ class AssignedTasks implements RestoringTasks {
         if (!created.isEmpty()) {
             log.debug("Initializing {}s {}", taskTypeName, created.keySet());
         }
-        for (final Iterator<Map.Entry<TaskId, Task>> it = created.entrySet().iterator(); it.hasNext(); ) {
-            final Map.Entry<TaskId, Task> entry = it.next();
+        for (final Iterator<Map.Entry<TaskId, T>> it = created.entrySet().iterator(); it.hasNext(); ) {
+            final Map.Entry<TaskId, T> entry = it.next();
             try {
                 if (!entry.getValue().initialize()) {
                     log.debug("Transitioning {} {} to restoring", taskTypeName, entry.getKey());
@@ -141,9 +121,9 @@ class AssignedTasks implements RestoringTasks {
         log.trace("{} changelog partitions that have completed restoring so far: {}", taskTypeName, restored);
         final Set<TopicPartition> resume = new HashSet<>();
         restoredPartitions.addAll(restored);
-        for (final Iterator<Map.Entry<TaskId, Task>> it = restoring.entrySet().iterator(); it.hasNext(); ) {
-            final Map.Entry<TaskId, Task> entry = it.next();
-            final Task task = entry.getValue();
+        for (final Iterator<Map.Entry<TaskId, T>> it = restoring.entrySet().iterator(); it.hasNext(); ) {
+            final Map.Entry<TaskId, T> entry = it.next();
+            final T task = entry.getValue();
             if (restoredPartitions.containsAll(task.changelogPartitions())) {
                 transitionToRunning(task, resume);
                 it.remove();
@@ -174,7 +154,7 @@ class AssignedTasks implements RestoringTasks {
                 && restoring.isEmpty();
     }
 
-    Collection<Task> running() {
+    Collection<T> running() {
         return running.values();
     }
 
@@ -196,9 +176,9 @@ class AssignedTasks implements RestoringTasks {
         return firstException.get();
     }
 
-    private RuntimeException closeNonRunningTasks(final Collection<Task> tasks) {
+    private RuntimeException closeNonRunningTasks(final Collection<T> tasks) {
         RuntimeException exception = null;
-        for (final Task task : tasks) {
+        for (final T task : tasks) {
             try {
                 task.close(false, false);
             } catch (final RuntimeException e) {
@@ -211,10 +191,10 @@ class AssignedTasks implements RestoringTasks {
         return exception;
     }
 
-    private RuntimeException suspendTasks(final Collection<Task> tasks) {
+    private RuntimeException suspendTasks(final Collection<T> tasks) {
         final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
-        for (Iterator<Task> it = tasks.iterator(); it.hasNext(); ) {
-            final Task task = it.next();
+        for (Iterator<T> it = tasks.iterator(); it.hasNext(); ) {
+            final T task = it.next();
             try {
                 task.suspend();
                 suspended.put(task.id(), task);
@@ -235,7 +215,7 @@ class AssignedTasks implements RestoringTasks {
         return firstException.get();
     }
 
-    private RuntimeException closeZombieTask(final Task task) {
+    RuntimeException closeZombieTask(final T task) {
         log.warn("{} {} got migrated to another thread already. Closing it as zombie.", taskTypeName, task.id());
         try {
             task.close(false, true);
@@ -255,7 +235,7 @@ class AssignedTasks implements RestoringTasks {
      */
     boolean maybeResumeSuspendedTask(final TaskId taskId, final Set<TopicPartition> partitions) {
         if (suspended.containsKey(taskId)) {
-            final Task task = suspended.get(taskId);
+            final T task = suspended.get(taskId);
             log.trace("found suspended {} {}", taskTypeName, taskId);
             if (task.partitions().equals(partitions)) {
                 suspended.remove(taskId);
@@ -279,7 +259,7 @@ class AssignedTasks implements RestoringTasks {
         return false;
     }
 
-    private void addToRestoring(final Task task) {
+    private void addToRestoring(final T task) {
         restoring.put(task.id(), task);
         for (TopicPartition topicPartition : task.partitions()) {
             restoringByPartition.put(topicPartition, task);
@@ -289,7 +269,7 @@ class AssignedTasks implements RestoringTasks {
         }
     }
 
-    private void transitionToRunning(final Task task, final Set<TopicPartition> readyPartitions) {
+    private void transitionToRunning(final T task, final Set<TopicPartition> readyPartitions) {
         log.debug("transitioning {} {} to running", taskTypeName, task.id());
         running.put(task.id(), task);
         for (TopicPartition topicPartition : task.partitions()) {
@@ -303,12 +283,7 @@ class AssignedTasks implements RestoringTasks {
         }
     }
 
-    @Override
-    public Task restoringTaskFor(final TopicPartition partition) {
-        return restoringByPartition.get(partition);
-    }
-
-    Task runningTaskFor(final TopicPartition partition) {
+    T runningTaskFor(final TopicPartition partition) {
         return runningByPartition.get(partition);
     }
 
@@ -316,7 +291,7 @@ class AssignedTasks implements RestoringTasks {
         return running.keySet();
     }
 
-    Map<TaskId, Task> runningTaskMap() {
+    Map<TaskId, T> runningTaskMap() {
         return Collections.unmodifiableMap(running);
     }
 
@@ -330,18 +305,18 @@ class AssignedTasks implements RestoringTasks {
     }
 
     private void describe(final StringBuilder builder,
-                          final Collection<Task> tasks,
+                          final Collection<T> tasks,
                           final String indent,
                           final String name) {
         builder.append(indent).append(name);
-        for (final Task t : tasks) {
+        for (final T t : tasks) {
             builder.append(indent).append(t.toString(indent + "\t\t"));
         }
         builder.append("\n");
     }
 
-    private List<Task> allTasks() {
-        final List<Task> tasks = new ArrayList<>();
+    private List<T> allTasks() {
+        final List<T> tasks = new ArrayList<>();
         tasks.addAll(running.values());
         tasks.addAll(suspended.values());
         tasks.addAll(restoring.values());
@@ -349,7 +324,7 @@ class AssignedTasks implements RestoringTasks {
         return tasks;
     }
 
-    Collection<Task> restoringTasks() {
+    Collection<T> restoringTasks() {
         return Collections.unmodifiableCollection(restoring.values());
     }
 
@@ -384,78 +359,11 @@ class AssignedTasks implements RestoringTasks {
         return running.size();
     }
 
-    /**
-     * @throws TaskMigratedException if committing offsets failed (non-EOS)
-     *                               or if the task producer got fenced (EOS)
-     */
-    int maybeCommit() {
-        committed = 0;
-        applyToRunningTasks(maybeCommitAction);
-        return committed;
-    }
-
-    /**
-     * @throws TaskMigratedException if the task producer got fenced (EOS only)
-     */
-    int process() {
-        int processed = 0;
-        final Iterator<Map.Entry<TaskId, Task>> it = running.entrySet().iterator();
-        while (it.hasNext()) {
-            final Task task = it.next().getValue();
-            try {
-                if (task.process()) {
-                    processed++;
-                }
-            } catch (final TaskMigratedException e) {
-                final RuntimeException fatalException = closeZombieTask(task);
-                if (fatalException != null) {
-                    throw fatalException;
-                }
-                it.remove();
-                throw e;
-            } catch (final RuntimeException e) {
-                log.error("Failed to process {} {} due to the following error:", taskTypeName, task.id(), e);
-                throw e;
-            }
-        }
-        return processed;
-    }
-
-    /**
-     * @throws TaskMigratedException if the task producer got fenced (EOS only)
-     */
-    int punctuate() {
-        int punctuated = 0;
-        final Iterator<Map.Entry<TaskId, Task>> it = running.entrySet().iterator();
-        while (it.hasNext()) {
-            final Task task = it.next().getValue();
-            try {
-                if (task.maybePunctuateStreamTime()) {
-                    punctuated++;
-                }
-                if (task.maybePunctuateSystemTime()) {
-                    punctuated++;
-                }
-            } catch (final TaskMigratedException e) {
-                final RuntimeException fatalException = closeZombieTask(task);
-                if (fatalException != null) {
-                    throw fatalException;
-                }
-                it.remove();
-                throw e;
-            } catch (final KafkaException e) {
-                log.error("Failed to punctuate {} {} due to the following error:", taskTypeName, task.id(), e);
-                throw e;
-            }
-        }
-        return punctuated;
-    }
-
-    private void applyToRunningTasks(final TaskAction action) {
+    void applyToRunningTasks(final TaskAction<T> action) {
         RuntimeException firstException = null;
 
-        for (Iterator<Task> it = running().iterator(); it.hasNext(); ) {
-            final Task task = it.next();
+        for (Iterator<T> it = running().iterator(); it.hasNext(); ) {
+            final T task = it.next();
             try {
                 action.apply(task);
             } catch (final TaskMigratedException e) {
@@ -485,9 +393,9 @@ class AssignedTasks implements RestoringTasks {
     }
 
     void closeNonAssignedSuspendedTasks(final Map<TaskId, Set<TopicPartition>> newAssignment) {
-        final Iterator<Task> standByTaskIterator = suspended.values().iterator();
+        final Iterator<T> standByTaskIterator = suspended.values().iterator();
         while (standByTaskIterator.hasNext()) {
-            final Task suspendedTask = standByTaskIterator.next();
+            final T suspendedTask = standByTaskIterator.next();
             if (!newAssignment.containsKey(suspendedTask.id()) || !suspendedTask.partitions().equals(newAssignment.get(suspendedTask.id()))) {
                 log.debug("Closing suspended and not re-assigned {} {}", taskTypeName, suspendedTask.id());
                 try {
@@ -503,7 +411,7 @@ class AssignedTasks implements RestoringTasks {
 
     void close(final boolean clean) {
         final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
-        for (final Task task : allTasks()) {
+        for (final T task : allTasks()) {
             try {
                 task.close(clean, false);
             } catch (final TaskMigratedException e) {
@@ -531,7 +439,7 @@ class AssignedTasks implements RestoringTasks {
         }
     }
 
-    private boolean closeUnclean(final Task task) {
+    private boolean closeUnclean(final T task) {
         log.info("Try to close {} {} unclean.", task.getClass().getSimpleName(), task.id());
         try {
             task.close(false, false);

http://git-wip-us.apache.org/repos/asf/kafka/blob/d637ad0d/streams/src/main/java/org/apache/kafka/streams/processor/internals/RestoringTasks.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RestoringTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RestoringTasks.java
index 6ed28fd..3671b49 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RestoringTasks.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RestoringTasks.java
@@ -19,5 +19,5 @@ package org.apache.kafka.streams.processor.internals;
 import org.apache.kafka.common.TopicPartition;
 
 public interface RestoringTasks {
-    Task restoringTaskFor(final TopicPartition partition);
+    StreamTask restoringTaskFor(final TopicPartition partition);
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d637ad0d/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
index 98ec810..fbbb357 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
@@ -147,11 +147,6 @@ public class StandbyTask extends AbstractTask {
         close(clean, isZombie);
     }
 
-    @Override
-    public boolean commitNeeded() {
-        return false;
-    }
-
     /**
      * Updates a state store using records from one change log partition
      *
@@ -163,28 +158,8 @@ public class StandbyTask extends AbstractTask {
         return stateMgr.updateStandbyStates(partition, records);
     }
 
-    @Override
-    public int addRecords(final TopicPartition partition, final Iterable<ConsumerRecord<byte[], byte[]>> records) {
-        throw new UnsupportedOperationException("add records not supported by StandbyTasks");
-    }
-
     public Map<TopicPartition, Long> checkpointedOffsets() {
         return checkpointedOffsets;
     }
 
-    @Override
-    public boolean maybePunctuateStreamTime() {
-        throw new UnsupportedOperationException("maybePunctuateStreamTime not supported by StandbyTask");
-    }
-
-    @Override
-    public boolean maybePunctuateSystemTime() {
-        throw new UnsupportedOperationException("maybePunctuateSystemTime not supported by StandbyTask");
-    }
-
-    @Override
-    public boolean process() {
-        throw new UnsupportedOperationException("process not supported by StandbyTasks");
-    }
-
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/d637ad0d/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 06f45ed..4b78e27 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -42,7 +42,6 @@ import org.apache.kafka.streams.state.internals.ThreadCache;
 
 import java.util.Collection;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
 import static java.lang.String.format;
@@ -493,11 +492,6 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
         }
     }
 
-    @Override
-    public Map<TopicPartition, Long> checkpointedOffsets() {
-        throw new UnsupportedOperationException("checkpointedOffsets is not supported by StreamTasks");
-    }
-
     /**
      * <pre>
      * - {@link #suspend(boolean) suspend(clean)}
@@ -619,11 +613,6 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
         return systemTimePunctuationQueue.mayPunctuate(timestamp, PunctuationType.WALL_CLOCK_TIME, this);
     }
 
-    @Override
-    public List<ConsumerRecord<byte[], byte[]>> update(final TopicPartition partition, final List<ConsumerRecord<byte[], byte[]>> remaining) {
-        throw new UnsupportedOperationException("update is not implemented");
-    }
-
     /**
      * Request committing the current task's state
      */

http://git-wip-us.apache.org/repos/asf/kafka/blob/d637ad0d/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 72b3a52..1982afb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -308,7 +308,7 @@ public class StreamThread extends Thread implements ThreadDataProvider {
         }
     }
 
-    static abstract class AbstractTaskCreator {
+    static abstract class AbstractTaskCreator<T extends Task> {
         final String applicationId;
         final InternalTopologyBuilder builder;
         final StreamsConfig config;
@@ -342,12 +342,12 @@ public class StreamThread extends Thread implements ThreadDataProvider {
         /**
          * @throws TaskMigratedException if the task producer got fenced (EOS only)
          */
-        Collection<Task> createTasks(final Consumer<byte[], byte[]> consumer, final Map<TaskId, Set<TopicPartition>> tasksToBeCreated) {
-            final List<Task> createdTasks = new ArrayList<>();
+        Collection<T> createTasks(final Consumer<byte[], byte[]> consumer, final Map<TaskId, Set<TopicPartition>> tasksToBeCreated) {
+            final List<T> createdTasks = new ArrayList<>();
             for (final Map.Entry<TaskId, Set<TopicPartition>> newTaskAndPartitions : tasksToBeCreated.entrySet()) {
                 final TaskId taskId = newTaskAndPartitions.getKey();
                 final Set<TopicPartition> partitions = newTaskAndPartitions.getValue();
-                Task task = createTask(consumer, taskId, partitions);
+                T task = createTask(consumer, taskId, partitions);
                 if (task != null) {
                     log.trace("Created task {} with assigned partitions {}", taskId, partitions);
                     createdTasks.add(task);
@@ -357,12 +357,12 @@ public class StreamThread extends Thread implements ThreadDataProvider {
             return createdTasks;
         }
 
-        abstract Task createTask(final Consumer<byte[], byte[]> consumer, final TaskId id, final Set<TopicPartition> partitions);
+        abstract T createTask(final Consumer<byte[], byte[]> consumer, final TaskId id, final Set<TopicPartition> partitions);
 
         public void close() {}
     }
 
-    static class TaskCreator extends AbstractTaskCreator {
+    static class TaskCreator extends AbstractTaskCreator<StreamTask> {
         private final ThreadCache cache;
         private final KafkaClientSupplier clientSupplier;
         private final String threadClientId;
@@ -441,7 +441,7 @@ public class StreamThread extends Thread implements ThreadDataProvider {
         }
     }
 
-    static class StandbyTaskCreator extends AbstractTaskCreator {
+    static class StandbyTaskCreator extends AbstractTaskCreator<StandbyTask> {
         StandbyTaskCreator(final InternalTopologyBuilder builder,
                            final StreamsConfig config,
                            final StreamsMetrics streamsMetrics,
@@ -706,12 +706,8 @@ public class StreamThread extends Thread implements ThreadDataProvider {
                                                         restoreConsumer,
                                                         activeTaskCreator,
                                                         standbyTaskCreator,
-                                                        new AssignedTasks(logContext,
-                                                                          "stream task"
-                                                        ),
-                                                        new AssignedTasks(logContext,
-                                                                          "standby task"
-                                                        ));
+                                                        new AssignedStreamsTasks(logContext),
+                                                        new AssignedStandbyTasks(logContext));
 
         return new StreamThread(builder,
                                 clientId,
@@ -916,7 +912,7 @@ public class StreamThread extends Thread implements ThreadDataProvider {
             int numAddedRecords = 0;
 
             for (final TopicPartition partition : records.partitions()) {
-                final Task task = taskManager.activeTask(partition);
+                final StreamTask task = taskManager.activeTask(partition);
                 numAddedRecords += task.addRecords(partition, records.records(partition));
             }
             streamsMetrics.skippedRecordsSensor.record(records.count() - numAddedRecords, timerStartedMs);
@@ -1040,7 +1036,7 @@ public class StreamThread extends Thread implements ThreadDataProvider {
                         final TopicPartition partition = entry.getKey();
                         List<ConsumerRecord<byte[], byte[]>> remaining = entry.getValue();
                         if (remaining != null) {
-                            final Task task = taskManager.standbyTask(partition);
+                            final StandbyTask task = taskManager.standbyTask(partition);
                             remaining = task.update(partition, remaining);
                             if (remaining != null) {
                                 remainingStandbyRecords.put(partition, remaining);
@@ -1061,7 +1057,7 @@ public class StreamThread extends Thread implements ThreadDataProvider {
 
             if (!records.isEmpty()) {
                 for (final TopicPartition partition : records.partitions()) {
-                    final Task task = taskManager.standbyTask(partition);
+                    final StandbyTask task = taskManager.standbyTask(partition);
 
                     if (task == null) {
                         throw new StreamsException(logPrefix + "Missing standby task for partition " + partition);
@@ -1101,7 +1097,7 @@ public class StreamThread extends Thread implements ThreadDataProvider {
         setState(State.PENDING_SHUTDOWN);
     }
 
-    public Map<TaskId, Task> tasks() {
+    public Map<TaskId, StreamTask> tasks() {
         return taskManager.activeTasks();
     }
 
@@ -1248,16 +1244,16 @@ public class StreamThread extends Thread implements ThreadDataProvider {
         return threadMetadata;
     }
 
-    private void updateThreadMetadata(final Map<TaskId, Task> activeTasks, final Map<TaskId, Task> standbyTasks) {
+    private void updateThreadMetadata(final Map<TaskId, StreamTask> activeTasks, final Map<TaskId, StandbyTask> standbyTasks) {
         final Set<TaskMetadata> activeTasksMetadata = new HashSet<>();
         if (activeTasks != null) {
-            for (Map.Entry<TaskId, Task> task : activeTasks.entrySet()) {
+            for (Map.Entry<TaskId, StreamTask> task : activeTasks.entrySet()) {
                 activeTasksMetadata.add(new TaskMetadata(task.getKey().toString(), task.getValue().partitions()));
             }
         }
         final Set<TaskMetadata> standbyTasksMetadata = new HashSet<>();
         if (standbyTasks != null) {
-            for (Map.Entry<TaskId, Task> task : standbyTasks.entrySet()) {
+            for (Map.Entry<TaskId, StandbyTask> task : standbyTasks.entrySet()) {
                 standbyTasksMetadata.add(new TaskMetadata(task.getKey().toString(), task.getValue().partitions()));
             }
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d637ad0d/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
index 80e5423..f066bff 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
-import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.ProcessorContext;
@@ -24,61 +23,49 @@ import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
 
 import java.util.Collection;
-import java.util.List;
-import java.util.Map;
 import java.util.Set;
 
 public interface Task {
-    void resume();
+    /**
+     * Initialize the task and return {@code true} if the task is ready to run, i.e., it has no state stores
+     * @return true if this task has no state stores that may need restoring.
+     * @throws IllegalStateException if a store gets registered after initialization has already finished
+     * @throws StreamsException if the store's change log does not contain the partition
+     */
+    boolean initialize();
 
     void commit();
 
     void suspend();
 
-    void close(boolean clean, boolean isZombie);
+    void resume();
 
-    TaskId id();
+    void closeSuspended(final boolean clean,
+                        final boolean isZombie,
+                        final RuntimeException e);
 
-    String applicationId();
+    void close(final boolean clean,
+               final boolean isZombie);
 
-    Set<TopicPartition> partitions();
+    StateStore getStore(final String name);
+
+    String applicationId();
 
     ProcessorTopology topology();
 
     ProcessorContext context();
 
-    StateStore getStore(String name);
-
-    void closeSuspended(boolean clean, boolean isZombie, RuntimeException e);
-
-    Map<TopicPartition, Long> checkpointedOffsets();
-
-    boolean process();
-
-    boolean commitNeeded();
-
-    boolean maybePunctuateStreamTime();
-
-    boolean maybePunctuateSystemTime();
-
-    List<ConsumerRecord<byte[], byte[]>> update(TopicPartition partition, List<ConsumerRecord<byte[], byte[]>> remaining);
-
-    String toString(String indent);
-
-    int addRecords(TopicPartition partition, final Iterable<ConsumerRecord<byte[], byte[]>> records);
-
-    boolean hasStateStores();
+    TaskId id();
 
-    /**
-     * initialize the task and return true if the task is ready to run, i.e, it has not state stores
-     * @return true if this task has no state stores that may need restoring.
-     * @throws IllegalStateException If store gets registered after initialized is already finished
-     * @throws StreamsException if the store's change log does not contain the partition
-     */
-    boolean initialize();
+    Set<TopicPartition> partitions();
 
     /**
      * @return any changelog partitions associated with this task
      */
     Collection<TopicPartition> changelogPartitions();
+
+    boolean hasStateStores();
+
+    String toString(final String indent);
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d637ad0d/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskAction.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskAction.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskAction.java
index 9112594..da5f325 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskAction.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskAction.java
@@ -16,7 +16,7 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
-interface TaskAction {
+interface TaskAction<T extends Task> {
     String name();
-    void apply(final Task task);
+    void apply(final T task);
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d637ad0d/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 6f4cc51..0238615 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -37,23 +37,23 @@ class TaskManager {
     // activeTasks needs to be concurrent as it can be accessed
     // by QueryableState
     private final Logger log;
-    private final AssignedTasks active;
-    private final AssignedTasks standby;
+    private final AssignedStreamsTasks active;
+    private final AssignedStandbyTasks standby;
     private final ChangelogReader changelogReader;
     private final String logPrefix;
     private final Consumer<byte[], byte[]> restoreConsumer;
-    private final StreamThread.AbstractTaskCreator taskCreator;
-    private final StreamThread.AbstractTaskCreator standbyTaskCreator;
+    private final StreamThread.AbstractTaskCreator<StreamTask> taskCreator;
+    private final StreamThread.AbstractTaskCreator<StandbyTask> standbyTaskCreator;
     private ThreadMetadataProvider threadMetadataProvider;
     private Consumer<byte[], byte[]> consumer;
 
     TaskManager(final ChangelogReader changelogReader,
                 final String logPrefix,
                 final Consumer<byte[], byte[]> restoreConsumer,
-                final StreamThread.AbstractTaskCreator taskCreator,
-                final StreamThread.AbstractTaskCreator standbyTaskCreator,
-                final AssignedTasks active,
-                final AssignedTasks standby) {
+                final StreamThread.AbstractTaskCreator<StreamTask> taskCreator,
+                final StreamThread.AbstractTaskCreator<StandbyTask> standbyTaskCreator,
+                final AssignedStreamsTasks active,
+                final AssignedStandbyTasks standby) {
         this.changelogReader = changelogReader;
         this.logPrefix = logPrefix;
         this.restoreConsumer = restoreConsumer;
@@ -133,7 +133,7 @@ class TaskManager {
         // -> other thread will call removeSuspendedTasks(); eventually
         log.trace("New active tasks to be created: {}", newTasks);
 
-        for (final Task task : taskCreator.createTasks(consumer, newTasks)) {
+        for (final StreamTask task : taskCreator.createTasks(consumer, newTasks)) {
             active.addNewTask(task);
         }
     }
@@ -166,7 +166,7 @@ class TaskManager {
         // -> other thread will call removeSuspendedStandbyTasks(); eventually
         log.trace("New standby tasks to be created: {}", newStandbyTasks);
 
-        for (final Task task : standbyTaskCreator.createTasks(consumer, newStandbyTasks)) {
+        for (final StandbyTask task : standbyTaskCreator.createTasks(consumer, newStandbyTasks)) {
             standby.addNewTask(task);
         }
     }
@@ -240,20 +240,20 @@ class TaskManager {
         return standby.previousTaskIds();
     }
 
-    Task activeTask(final TopicPartition partition) {
+    StreamTask activeTask(final TopicPartition partition) {
         return active.runningTaskFor(partition);
     }
 
 
-    Task standbyTask(final TopicPartition partition) {
+    StandbyTask standbyTask(final TopicPartition partition) {
         return standby.runningTaskFor(partition);
     }
 
-    Map<TaskId, Task> activeTasks() {
+    Map<TaskId, StreamTask> activeTasks() {
         return active.runningTaskMap();
     }
 
-    Map<TaskId, Task> standbyTasks() {
+    Map<TaskId, StandbyTask> standbyTasks() {
         return standby.runningTaskMap();
     }
 
@@ -293,9 +293,9 @@ class TaskManager {
     }
 
     private void assignStandbyPartitions() {
-        final Collection<Task> running = standby.running();
+        final Collection<StandbyTask> running = standby.running();
         final Map<TopicPartition, Long> checkpointedOffsets = new HashMap<>();
-        for (final Task standbyTask : running) {
+        for (final StandbyTask standbyTask : running) {
             checkpointedOffsets.putAll(standbyTask.checkpointedOffsets());
         }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d637ad0d/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
index 02aa0a0..efc6f79 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
@@ -17,7 +17,6 @@
 package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.MockConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
@@ -40,7 +39,6 @@ import org.junit.Test;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
-import java.util.Map;
 import java.util.Properties;
 
 import static org.junit.Assert.fail;
@@ -142,41 +140,6 @@ public class AbstractTaskTest {
             public void closeSuspended(final boolean clean, final boolean isZombie, final RuntimeException e) {}
 
             @Override
-            public Map<TopicPartition, Long> checkpointedOffsets() {
-                return null;
-            }
-
-            @Override
-            public boolean process() {
-                return false;
-            }
-
-            @Override
-            public boolean commitNeeded() {
-                return false;
-            }
-
-            @Override
-            public boolean maybePunctuateStreamTime() {
-                return false;
-            }
-
-            @Override
-            public boolean maybePunctuateSystemTime() {
-                return false;
-            }
-
-            @Override
-            public List<ConsumerRecord<byte[], byte[]>> update(final TopicPartition partition, final List<ConsumerRecord<byte[], byte[]>> remaining) {
-                return null;
-            }
-
-            @Override
-            public int addRecords(final TopicPartition partition, final Iterable<ConsumerRecord<byte[], byte[]>> records) {
-                return 0;
-            }
-
-            @Override
             public boolean initialize() {
                 return false;
             }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d637ad0d/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
new file mode 100644
index 0000000..3d33b0b
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
@@ -0,0 +1,461 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.errors.TaskMigratedException;
+import org.apache.kafka.streams.processor.TaskId;
+import org.easymock.EasyMock;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Set;
+
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.IsEqual.equalTo;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class AssignedStreamsTasksTest {
+
+    private final StreamTask t1 = EasyMock.createMock(StreamTask.class);
+    private final StreamTask t2 = EasyMock.createMock(StreamTask.class);
+    private final TopicPartition tp1 = new TopicPartition("t1", 0);
+    private final TopicPartition tp2 = new TopicPartition("t2", 0);
+    private final TopicPartition changeLog1 = new TopicPartition("cl1", 0);
+    private final TopicPartition changeLog2 = new TopicPartition("cl2", 0);
+    private final TaskId taskId1 = new TaskId(0, 0);
+    private final TaskId taskId2 = new TaskId(1, 0);
+    private AssignedStreamsTasks assignedTasks;
+
+    @Before
+    public void before() {
+        assignedTasks = new AssignedStreamsTasks(new LogContext("log "));
+        EasyMock.expect(t1.id()).andReturn(taskId1).anyTimes();
+        EasyMock.expect(t2.id()).andReturn(taskId2).anyTimes();
+    }
+
+    @Test
+    public void shouldGetPartitionsFromNewTasksThatHaveStateStores() {
+        EasyMock.expect(t1.hasStateStores()).andReturn(true);
+        EasyMock.expect(t2.hasStateStores()).andReturn(true);
+        EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1));
+        EasyMock.expect(t2.partitions()).andReturn(Collections.singleton(tp2));
+        EasyMock.replay(t1, t2);
+
+        assignedTasks.addNewTask(t1);
+        assignedTasks.addNewTask(t2);
+
+        final Set<TopicPartition> partitions = assignedTasks.uninitializedPartitions();
+        assertThat(partitions, equalTo(Utils.mkSet(tp1, tp2)));
+        EasyMock.verify(t1, t2);
+    }
+
+    @Test
+    public void shouldNotGetPartitionsFromNewTasksWithoutStateStores() {
+        EasyMock.expect(t1.hasStateStores()).andReturn(false);
+        EasyMock.expect(t2.hasStateStores()).andReturn(false);
+        EasyMock.replay(t1, t2);
+
+        assignedTasks.addNewTask(t1);
+        assignedTasks.addNewTask(t2);
+
+        final Set<TopicPartition> partitions = assignedTasks.uninitializedPartitions();
+        assertTrue(partitions.isEmpty());
+        EasyMock.verify(t1, t2);
+    }
+
+    @Test
+    public void shouldInitializeNewTasks() {
+        EasyMock.expect(t1.initialize()).andReturn(false);
+        EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1));
+        EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.<TopicPartition>emptySet());
+        EasyMock.replay(t1);
+
+        addAndInitTask();
+
+        EasyMock.verify(t1);
+    }
+
+    @Test
+    public void shouldMoveInitializedTasksNeedingRestoreToRestoring() {
+        EasyMock.expect(t1.initialize()).andReturn(false);
+        EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1));
+        EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.<TopicPartition>emptySet());
+        EasyMock.expect(t2.initialize()).andReturn(true);
+        final Set<TopicPartition> t2partitions = Collections.singleton(tp2);
+        EasyMock.expect(t2.partitions()).andReturn(t2partitions);
+        EasyMock.expect(t2.changelogPartitions()).andReturn(Collections.<TopicPartition>emptyList());
+        EasyMock.expect(t2.hasStateStores()).andReturn(true);
+
+        EasyMock.replay(t1, t2);
+
+        assignedTasks.addNewTask(t1);
+        assignedTasks.addNewTask(t2);
+
+        final Set<TopicPartition> readyPartitions = assignedTasks.initializeNewTasks();
+
+        Collection<StreamTask> restoring = assignedTasks.restoringTasks();
+        assertThat(restoring.size(), equalTo(1));
+        assertSame(restoring.iterator().next(), t1);
+        assertThat(readyPartitions, equalTo(t2partitions));
+    }
+
+    @Test
+    public void shouldMoveInitializedTasksThatDontNeedRestoringToRunning() {
+        EasyMock.expect(t2.initialize()).andReturn(true);
+        EasyMock.expect(t2.partitions()).andReturn(Collections.singleton(tp2));
+        EasyMock.expect(t2.changelogPartitions()).andReturn(Collections.<TopicPartition>emptyList());
+        EasyMock.expect(t2.hasStateStores()).andReturn(false);
+
+        EasyMock.replay(t2);
+
+        assignedTasks.addNewTask(t2);
+        final Set<TopicPartition> toResume = assignedTasks.initializeNewTasks();
+
+        assertThat(assignedTasks.runningTaskIds(), equalTo(Collections.singleton(taskId2)));
+        assertThat(toResume, equalTo(Collections.<TopicPartition>emptySet()));
+    }
+
+    @Test
+    public void shouldTransitionFullyRestoredTasksToRunning() {
+        final Set<TopicPartition> task1Partitions = Utils.mkSet(tp1);
+        EasyMock.expect(t1.initialize()).andReturn(false);
+        EasyMock.expect(t1.partitions()).andReturn(task1Partitions).anyTimes();
+        EasyMock.expect(t1.changelogPartitions()).andReturn(Utils.mkSet(changeLog1, changeLog2)).anyTimes();
+        EasyMock.expect(t1.hasStateStores()).andReturn(true).anyTimes();
+        EasyMock.replay(t1);
+
+        addAndInitTask();
+
+        assertTrue(assignedTasks.updateRestored(Utils.mkSet(changeLog1)).isEmpty());
+        Set<TopicPartition> partitions = assignedTasks.updateRestored(Utils.mkSet(changeLog2));
+        assertThat(partitions, equalTo(task1Partitions));
+        assertThat(assignedTasks.runningTaskIds(), equalTo(Collections.singleton(taskId1)));
+    }
+
+    @Test
+    public void shouldSuspendRunningTasks() {
+        mockRunningTaskSuspension();
+        EasyMock.replay(t1);
+
+        assertThat(suspendTask(), nullValue());
+
+        assertThat(assignedTasks.previousTaskIds(), equalTo(Collections.singleton(taskId1)));
+        EasyMock.verify(t1);
+    }
+
+    @Test
+    public void shouldCloseRestoringTasks() {
+        EasyMock.expect(t1.initialize()).andReturn(false);
+        EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1));
+        EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.<TopicPartition>emptySet());
+        t1.close(false, false);
+        EasyMock.expectLastCall();
+        EasyMock.replay(t1);
+
+        assertThat(suspendTask(), nullValue());
+        EasyMock.verify(t1);
+    }
+
+    @Test
+    public void shouldClosedUnInitializedTasksOnSuspend() {
+        t1.close(false, false);
+        EasyMock.expectLastCall();
+        EasyMock.replay(t1);
+
+        assignedTasks.addNewTask(t1);
+        assertThat(assignedTasks.suspend(), nullValue());
+
+        EasyMock.verify(t1);
+    }
+
+    @Test
+    public void shouldNotSuspendSuspendedTasks() {
+        mockRunningTaskSuspension();
+        EasyMock.replay(t1);
+
+        assertThat(suspendTask(), nullValue());
+        assertThat(assignedTasks.suspend(), nullValue());
+        EasyMock.verify(t1);
+    }
+
+
+    @Test
+    public void shouldCloseTaskOnSuspendWhenRuntimeException() {
+        mockTaskInitialization();
+        t1.suspend();
+        EasyMock.expectLastCall().andThrow(new RuntimeException("KABOOM!"));
+        t1.close(false, false);
+        EasyMock.expectLastCall();
+        EasyMock.replay(t1);
+
+        assertThat(suspendTask(), not(nullValue()));
+        assertThat(assignedTasks.previousTaskIds(), equalTo(Collections.singleton(taskId1)));
+        EasyMock.verify(t1);
+    }
+
+    @Test
+    public void shouldCloseTaskOnSuspendIfTaskMigratedException() {
+        mockTaskInitialization();
+        t1.suspend();
+        EasyMock.expectLastCall().andThrow(new TaskMigratedException(t1));
+        t1.close(false, true);
+        EasyMock.expectLastCall();
+        EasyMock.replay(t1);
+
+        assertThat(suspendTask(), nullValue());
+        assertTrue(assignedTasks.previousTaskIds().isEmpty());
+        EasyMock.verify(t1);
+    }
+
+    @Test
+    public void shouldResumeMatchingSuspendedTasks() {
+        mockRunningTaskSuspension();
+        t1.resume();
+        EasyMock.expectLastCall();
+        EasyMock.replay(t1);
+
+        assertThat(suspendTask(), nullValue());
+
+        assertTrue(assignedTasks.maybeResumeSuspendedTask(taskId1, Collections.singleton(tp1)));
+        assertThat(assignedTasks.runningTaskIds(), equalTo(Collections.singleton(taskId1)));
+        EasyMock.verify(t1);
+    }
+
+    @Test
+    public void shouldCloseTaskOnResumeIfTaskMigratedException() {
+        mockRunningTaskSuspension();
+        t1.resume();
+        EasyMock.expectLastCall().andThrow(new TaskMigratedException(t1));
+        t1.close(false, true);
+        EasyMock.expectLastCall();
+        EasyMock.replay(t1);
+
+        assertThat(suspendTask(), nullValue());
+
+        try {
+            assignedTasks.maybeResumeSuspendedTask(taskId1, Collections.singleton(tp1));
+            fail("Should have thrown TaskMigratedException.");
+        } catch (final TaskMigratedException expected) { /* ignore */ }
+
+        assertThat(assignedTasks.runningTaskIds(), equalTo(Collections.EMPTY_SET));
+        EasyMock.verify(t1);
+    }
+
+    private void mockTaskInitialization() {
+        EasyMock.expect(t1.initialize()).andReturn(true);
+        EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1));
+        EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.<TopicPartition>emptyList());
+        EasyMock.expect(t1.hasStateStores()).andReturn(false);
+    }
+
+    @Test
+    public void shouldCommitRunningTasks() {
+        mockTaskInitialization();
+        t1.commit();
+        EasyMock.expectLastCall();
+        EasyMock.replay(t1);
+
+        addAndInitTask();
+
+        assignedTasks.commit();
+        EasyMock.verify(t1);
+    }
+
+    @Test
+    public void shouldCloseTaskOnCommitIfTaskMigratedException() {
+        mockTaskInitialization();
+        t1.commit();
+        EasyMock.expectLastCall().andThrow(new TaskMigratedException(t1));
+        t1.close(false, true);
+        EasyMock.expectLastCall();
+        EasyMock.replay(t1);
+        addAndInitTask();
+
+        try {
+            assignedTasks.commit();
+            fail("Should have thrown TaskMigratedException.");
+        } catch (final TaskMigratedException expected) { /* ignore */ }
+
+        assertThat(assignedTasks.runningTaskIds(), equalTo(Collections.EMPTY_SET));
+        EasyMock.verify(t1);
+    }
+
+    @Test
+    public void shouldThrowExceptionOnCommitWhenNotCommitFailedOrProducerFenced() {
+        mockTaskInitialization();
+        t1.commit();
+        EasyMock.expectLastCall().andThrow(new RuntimeException(""));
+        EasyMock.replay(t1);
+        addAndInitTask();
+
+        try {
+            assignedTasks.commit();
+            fail("Should have thrown exception");
+        } catch (Exception e) {
+            // ok
+        }
+        assertThat(assignedTasks.runningTaskIds(), equalTo(Collections.singleton(taskId1)));
+        EasyMock.verify(t1);
+    }
+
+    @Test
+    public void shouldCommitRunningTasksIfNeeded() {
+        mockTaskInitialization();
+        EasyMock.expect(t1.commitNeeded()).andReturn(true);
+        t1.commit();
+        EasyMock.expectLastCall();
+        EasyMock.replay(t1);
+
+        addAndInitTask();
+
+        assertThat(assignedTasks.maybeCommit(), equalTo(1));
+        EasyMock.verify(t1);
+    }
+
+    @Test
+    public void shouldCloseTaskOnMaybeCommitIfTaskMigratedException() {
+        mockTaskInitialization();
+        EasyMock.expect(t1.commitNeeded()).andReturn(true); // the mocked task reports that a commit is needed
+        t1.commit(); // record: commit() on the mocked task throws TaskMigratedException
+        EasyMock.expectLastCall().andThrow(new TaskMigratedException(t1));
+        t1.close(false, true); // record: the task must then be closed with arguments (false, true)
+        EasyMock.expectLastCall();
+        EasyMock.replay(t1);
+        addAndInitTask();
+
+        try {
+            assignedTasks.maybeCommit();
+            fail("Should have thrown TaskMigratedException.");
+        } catch (final TaskMigratedException expected) { /* expected: the exception must reach the caller */ }
+
+        assertThat(assignedTasks.runningTaskIds(), equalTo(Collections.<TaskId>emptySet())); // typed empty set instead of raw EMPTY_SET
+        EasyMock.verify(t1);
+    }
+
+    @Test
+    public void shouldCloseTaskOnProcessesIfTaskMigratedException() {
+        mockTaskInitialization();
+        t1.process(); // record: process() on the mocked task throws TaskMigratedException
+        EasyMock.expectLastCall().andThrow(new TaskMigratedException(t1));
+        t1.close(false, true); // record: the task must then be closed with arguments (false, true)
+        EasyMock.expectLastCall();
+        EasyMock.replay(t1);
+        addAndInitTask();
+
+        try {
+            assignedTasks.process();
+            fail("Should have thrown TaskMigratedException.");
+        } catch (final TaskMigratedException expected) { /* expected: the exception must reach the caller */ }
+
+        assertThat(assignedTasks.runningTaskIds(), equalTo(Collections.<TaskId>emptySet())); // typed empty set instead of raw EMPTY_SET
+        EasyMock.verify(t1);
+    }
+
+    @Test
+    public void shouldPunctuateRunningTasks() {
+        mockTaskInitialization();
+        EasyMock.expect(t1.maybePunctuateStreamTime()).andReturn(true); // stream-time punctuation reports true
+        EasyMock.expect(t1.maybePunctuateSystemTime()).andReturn(true); // system-time punctuation reports true
+        EasyMock.replay(t1);
+
+        addAndInitTask();
+
+        assertThat(assignedTasks.punctuate(), equalTo(2)); // punctuate() must return 2 when both calls return true
+        EasyMock.verify(t1);
+    }
+
+    @Test
+    public void shouldCloseTaskOnMaybePunctuateStreamTimeIfTaskMigratedException() {
+        mockTaskInitialization();
+        t1.maybePunctuateStreamTime(); // record: stream-time punctuation throws TaskMigratedException
+        EasyMock.expectLastCall().andThrow(new TaskMigratedException(t1));
+        t1.close(false, true); // record: the task must then be closed with arguments (false, true)
+        EasyMock.expectLastCall();
+        EasyMock.replay(t1);
+        addAndInitTask();
+
+        try {
+            assignedTasks.punctuate();
+            fail("Should have thrown TaskMigratedException.");
+        } catch (final TaskMigratedException expected) { /* expected: the exception must reach the caller */ }
+
+        assertThat(assignedTasks.runningTaskIds(), equalTo(Collections.<TaskId>emptySet())); // typed empty set instead of raw EMPTY_SET
+        EasyMock.verify(t1);
+    }
+
+    @Test
+    public void shouldCloseTaskOnMaybePunctuateSystemTimeIfTaskMigratedException() {
+        mockTaskInitialization();
+        EasyMock.expect(t1.maybePunctuateStreamTime()).andReturn(true); // stream-time punctuation succeeds first
+        t1.maybePunctuateSystemTime(); // record: system-time punctuation throws TaskMigratedException
+        EasyMock.expectLastCall().andThrow(new TaskMigratedException(t1));
+        t1.close(false, true); // record: the task must then be closed with arguments (false, true)
+        EasyMock.expectLastCall();
+        EasyMock.replay(t1);
+        addAndInitTask();
+
+        try {
+            assignedTasks.punctuate();
+            fail("Should have thrown TaskMigratedException.");
+        } catch (final TaskMigratedException expected) { /* ignore */ } // NOTE(review): unlike the other TaskMigratedException tests, runningTaskIds() is not asserted empty afterwards - confirm this is intentional
+        EasyMock.verify(t1);
+    }
+
+    @Test
+    public void shouldReturnNumberOfPunctuations() {
+        mockTaskInitialization();
+        EasyMock.expect(t1.maybePunctuateStreamTime()).andReturn(true); // stream-time punctuation reports true
+        EasyMock.expect(t1.maybePunctuateSystemTime()).andReturn(false); // system-time punctuation reports false
+        EasyMock.replay(t1);
+
+        addAndInitTask();
+
+        assertThat(assignedTasks.punctuate(), equalTo(1)); // punctuate() must return 1 when only one call returns true
+        EasyMock.verify(t1);
+    }
+
+    private void addAndInitTask() { // helper: hands t1 to assignedTasks as a new task, then initializes new tasks
+        assignedTasks.addNewTask(t1);
+        assignedTasks.initializeNewTasks(); // return value is deliberately ignored here
+    }
+
+    private RuntimeException suspendTask() { // helper: adds and initializes t1, then suspends all assigned tasks
+        addAndInitTask();
+        return assignedTasks.suspend(); // passes suspend()'s result through; presumably null means no failure - verify against callers
+    }
+
+    private void mockRunningTaskSuspension() { // helper: records expectations on t1 only; the caller must still call EasyMock.replay(t1)
+        EasyMock.expect(t1.initialize()).andReturn(true); // initialize() is expected once and returns true
+        EasyMock.expect(t1.hasStateStores()).andReturn(false).anyTimes(); // task reports no state stores, any number of calls
+        EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1)).anyTimes(); // task owns exactly tp1
+        EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.<TopicPartition>emptyList()).anyTimes(); // no changelog partitions
+        t1.suspend(); // record: suspend() is expected to be invoked on the task
+        EasyMock.expectLastCall();
+    }
+
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/d637ad0d/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedTasksTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedTasksTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedTasksTest.java
deleted file mode 100644
index a721936..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedTasksTest.java
+++ /dev/null
@@ -1,461 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor.internals;
-
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.utils.LogContext;
-import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.errors.TaskMigratedException;
-import org.apache.kafka.streams.processor.TaskId;
-import org.easymock.EasyMock;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Set;
-
-import static org.hamcrest.CoreMatchers.not;
-import static org.hamcrest.CoreMatchers.nullValue;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.core.IsEqual.equalTo;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-public class AssignedTasksTest {
-
-    private final Task t1 = EasyMock.createMock(Task.class);
-    private final Task t2 = EasyMock.createMock(Task.class);
-    private final TopicPartition tp1 = new TopicPartition("t1", 0);
-    private final TopicPartition tp2 = new TopicPartition("t2", 0);
-    private final TopicPartition changeLog1 = new TopicPartition("cl1", 0);
-    private final TopicPartition changeLog2 = new TopicPartition("cl2", 0);
-    private final TaskId taskId1 = new TaskId(0, 0);
-    private final TaskId taskId2 = new TaskId(1, 0);
-    private AssignedTasks assignedTasks;
-
-    @Before
-    public void before() {
-        assignedTasks = new AssignedTasks(new LogContext("log "), "task");
-        EasyMock.expect(t1.id()).andReturn(taskId1).anyTimes();
-        EasyMock.expect(t2.id()).andReturn(taskId2).anyTimes();
-    }
-
-    @Test
-    public void shouldGetPartitionsFromNewTasksThatHaveStateStores() {
-        EasyMock.expect(t1.hasStateStores()).andReturn(true);
-        EasyMock.expect(t2.hasStateStores()).andReturn(true);
-        EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1));
-        EasyMock.expect(t2.partitions()).andReturn(Collections.singleton(tp2));
-        EasyMock.replay(t1, t2);
-
-        assignedTasks.addNewTask(t1);
-        assignedTasks.addNewTask(t2);
-
-        final Set<TopicPartition> partitions = assignedTasks.uninitializedPartitions();
-        assertThat(partitions, equalTo(Utils.mkSet(tp1, tp2)));
-        EasyMock.verify(t1, t2);
-    }
-
-    @Test
-    public void shouldNotGetPartitionsFromNewTasksWithoutStateStores() {
-        EasyMock.expect(t1.hasStateStores()).andReturn(false);
-        EasyMock.expect(t2.hasStateStores()).andReturn(false);
-        EasyMock.replay(t1, t2);
-
-        assignedTasks.addNewTask(t1);
-        assignedTasks.addNewTask(t2);
-
-        final Set<TopicPartition> partitions = assignedTasks.uninitializedPartitions();
-        assertTrue(partitions.isEmpty());
-        EasyMock.verify(t1, t2);
-    }
-
-    @Test
-    public void shouldInitializeNewTasks() {
-        EasyMock.expect(t1.initialize()).andReturn(false);
-        EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1));
-        EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.<TopicPartition>emptySet());
-        EasyMock.replay(t1);
-
-        addAndInitTask();
-
-        EasyMock.verify(t1);
-    }
-
-    @Test
-    public void shouldMoveInitializedTasksNeedingRestoreToRestoring() {
-        EasyMock.expect(t1.initialize()).andReturn(false);
-        EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1));
-        EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.<TopicPartition>emptySet());
-        EasyMock.expect(t2.initialize()).andReturn(true);
-        final Set<TopicPartition> t2partitions = Collections.singleton(tp2);
-        EasyMock.expect(t2.partitions()).andReturn(t2partitions);
-        EasyMock.expect(t2.changelogPartitions()).andReturn(Collections.<TopicPartition>emptyList());
-        EasyMock.expect(t2.hasStateStores()).andReturn(true);
-
-        EasyMock.replay(t1, t2);
-
-        assignedTasks.addNewTask(t1);
-        assignedTasks.addNewTask(t2);
-
-        final Set<TopicPartition> readyPartitions = assignedTasks.initializeNewTasks();
-
-        Collection<Task> restoring = assignedTasks.restoringTasks();
-        assertThat(restoring.size(), equalTo(1));
-        assertSame(restoring.iterator().next(), t1);
-        assertThat(readyPartitions, equalTo(t2partitions));
-    }
-
-    @Test
-    public void shouldMoveInitializedTasksThatDontNeedRestoringToRunning() {
-        EasyMock.expect(t2.initialize()).andReturn(true);
-        EasyMock.expect(t2.partitions()).andReturn(Collections.singleton(tp2));
-        EasyMock.expect(t2.changelogPartitions()).andReturn(Collections.<TopicPartition>emptyList());
-        EasyMock.expect(t2.hasStateStores()).andReturn(false);
-
-        EasyMock.replay(t2);
-
-        assignedTasks.addNewTask(t2);
-        final Set<TopicPartition> toResume = assignedTasks.initializeNewTasks();
-
-        assertThat(assignedTasks.runningTaskIds(), equalTo(Collections.singleton(taskId2)));
-        assertThat(toResume, equalTo(Collections.<TopicPartition>emptySet()));
-    }
-
-    @Test
-    public void shouldTransitionFullyRestoredTasksToRunning() {
-        final Set<TopicPartition> task1Partitions = Utils.mkSet(tp1);
-        EasyMock.expect(t1.initialize()).andReturn(false);
-        EasyMock.expect(t1.partitions()).andReturn(task1Partitions).anyTimes();
-        EasyMock.expect(t1.changelogPartitions()).andReturn(Utils.mkSet(changeLog1, changeLog2)).anyTimes();
-        EasyMock.expect(t1.hasStateStores()).andReturn(true).anyTimes();
-        EasyMock.replay(t1);
-
-        addAndInitTask();
-
-        assertTrue(assignedTasks.updateRestored(Utils.mkSet(changeLog1)).isEmpty());
-        Set<TopicPartition> partitions = assignedTasks.updateRestored(Utils.mkSet(changeLog2));
-        assertThat(partitions, equalTo(task1Partitions));
-        assertThat(assignedTasks.runningTaskIds(), equalTo(Collections.singleton(taskId1)));
-    }
-
-    @Test
-    public void shouldSuspendRunningTasks() {
-        mockRunningTaskSuspension();
-        EasyMock.replay(t1);
-
-        assertThat(suspendTask(), nullValue());
-
-        assertThat(assignedTasks.previousTaskIds(), equalTo(Collections.singleton(taskId1)));
-        EasyMock.verify(t1);
-    }
-
-    @Test
-    public void shouldCloseRestoringTasks() {
-        EasyMock.expect(t1.initialize()).andReturn(false);
-        EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1));
-        EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.<TopicPartition>emptySet());
-        t1.close(false, false);
-        EasyMock.expectLastCall();
-        EasyMock.replay(t1);
-
-        assertThat(suspendTask(), nullValue());
-        EasyMock.verify(t1);
-    }
-
-    @Test
-    public void shouldClosedUnInitializedTasksOnSuspend() {
-        t1.close(false, false);
-        EasyMock.expectLastCall();
-        EasyMock.replay(t1);
-
-        assignedTasks.addNewTask(t1);
-        assertThat(assignedTasks.suspend(), nullValue());
-
-        EasyMock.verify(t1);
-    }
-
-    @Test
-    public void shouldNotSuspendSuspendedTasks() {
-        mockRunningTaskSuspension();
-        EasyMock.replay(t1);
-
-        assertThat(suspendTask(), nullValue());
-        assertThat(assignedTasks.suspend(), nullValue());
-        EasyMock.verify(t1);
-    }
-
-
-    @Test
-    public void shouldCloseTaskOnSuspendWhenRuntimeException() {
-        mockTaskInitialization();
-        t1.suspend();
-        EasyMock.expectLastCall().andThrow(new RuntimeException("KABOOM!"));
-        t1.close(false, false);
-        EasyMock.expectLastCall();
-        EasyMock.replay(t1);
-
-        assertThat(suspendTask(), not(nullValue()));
-        assertThat(assignedTasks.previousTaskIds(), equalTo(Collections.singleton(taskId1)));
-        EasyMock.verify(t1);
-    }
-
-    @Test
-    public void shouldCloseTaskOnSuspendIfTaskMigratedException() {
-        mockTaskInitialization();
-        t1.suspend();
-        EasyMock.expectLastCall().andThrow(new TaskMigratedException(t1));
-        t1.close(false, true);
-        EasyMock.expectLastCall();
-        EasyMock.replay(t1);
-
-        assertThat(suspendTask(), nullValue());
-        assertTrue(assignedTasks.previousTaskIds().isEmpty());
-        EasyMock.verify(t1);
-    }
-
-    @Test
-    public void shouldResumeMatchingSuspendedTasks() {
-        mockRunningTaskSuspension();
-        t1.resume();
-        EasyMock.expectLastCall();
-        EasyMock.replay(t1);
-
-        assertThat(suspendTask(), nullValue());
-
-        assertTrue(assignedTasks.maybeResumeSuspendedTask(taskId1, Collections.singleton(tp1)));
-        assertThat(assignedTasks.runningTaskIds(), equalTo(Collections.singleton(taskId1)));
-        EasyMock.verify(t1);
-    }
-
-    @Test
-    public void shouldCloseTaskOnResumeIfTaskMigratedException() {
-        mockRunningTaskSuspension();
-        t1.resume();
-        EasyMock.expectLastCall().andThrow(new TaskMigratedException(t1));
-        t1.close(false, true);
-        EasyMock.expectLastCall();
-        EasyMock.replay(t1);
-
-        assertThat(suspendTask(), nullValue());
-
-        try {
-            assignedTasks.maybeResumeSuspendedTask(taskId1, Collections.singleton(tp1));
-            fail("Should have thrown TaskMigratedException.");
-        } catch (final TaskMigratedException expected) { /* ignore */ }
-
-        assertThat(assignedTasks.runningTaskIds(), equalTo(Collections.EMPTY_SET));
-        EasyMock.verify(t1);
-    }
-
-    private void mockTaskInitialization() {
-        EasyMock.expect(t1.initialize()).andReturn(true);
-        EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1));
-        EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.<TopicPartition>emptyList());
-        EasyMock.expect(t1.hasStateStores()).andReturn(false);
-    }
-
-    @Test
-    public void shouldCommitRunningTasks() {
-        mockTaskInitialization();
-        t1.commit();
-        EasyMock.expectLastCall();
-        EasyMock.replay(t1);
-
-        addAndInitTask();
-
-        assignedTasks.commit();
-        EasyMock.verify(t1);
-    }
-
-    @Test
-    public void shouldCloseTaskOnCommitIfTaskMigratedException() {
-        mockTaskInitialization();
-        t1.commit();
-        EasyMock.expectLastCall().andThrow(new TaskMigratedException(t1));
-        t1.close(false, true);
-        EasyMock.expectLastCall();
-        EasyMock.replay(t1);
-        addAndInitTask();
-
-        try {
-            assignedTasks.commit();
-            fail("Should have thrown TaskMigratedException.");
-        } catch (final TaskMigratedException expected) { /* ignore */ }
-
-        assertThat(assignedTasks.runningTaskIds(), equalTo(Collections.EMPTY_SET));
-        EasyMock.verify(t1);
-    }
-
-    @Test
-    public void shouldThrowExceptionOnCommitWhenNotCommitFailedOrProducerFenced() {
-        mockTaskInitialization();
-        t1.commit();
-        EasyMock.expectLastCall().andThrow(new RuntimeException(""));
-        EasyMock.replay(t1);
-        addAndInitTask();
-
-        try {
-            assignedTasks.commit();
-            fail("Should have thrown exception");
-        } catch (Exception e) {
-            // ok
-        }
-        assertThat(assignedTasks.runningTaskIds(), equalTo(Collections.singleton(taskId1)));
-        EasyMock.verify(t1);
-    }
-
-    @Test
-    public void shouldCommitRunningTasksIfNeeded() {
-        mockTaskInitialization();
-        EasyMock.expect(t1.commitNeeded()).andReturn(true);
-        t1.commit();
-        EasyMock.expectLastCall();
-        EasyMock.replay(t1);
-
-        addAndInitTask();
-
-        assertThat(assignedTasks.maybeCommit(), equalTo(1));
-        EasyMock.verify(t1);
-    }
-
-    @Test
-    public void shouldCloseTaskOnMaybeCommitIfTaskMigratedException() {
-        mockTaskInitialization();
-        EasyMock.expect(t1.commitNeeded()).andReturn(true);
-        t1.commit();
-        EasyMock.expectLastCall().andThrow(new TaskMigratedException(t1));
-        t1.close(false, true);
-        EasyMock.expectLastCall();
-        EasyMock.replay(t1);
-        addAndInitTask();
-
-        try {
-            assignedTasks.maybeCommit();
-            fail("Should have thrown TaskMigratedException.");
-        } catch (final TaskMigratedException expected) { /* ignore */ }
-
-        assertThat(assignedTasks.runningTaskIds(), equalTo(Collections.EMPTY_SET));
-        EasyMock.verify(t1);
-    }
-
-    @Test
-    public void shouldCloseTaskOnProcessesIfTaskMigratedException() {
-        mockTaskInitialization();
-        t1.process();
-        EasyMock.expectLastCall().andThrow(new TaskMigratedException(t1));
-        t1.close(false, true);
-        EasyMock.expectLastCall();
-        EasyMock.replay(t1);
-        addAndInitTask();
-
-        try {
-            assignedTasks.process();
-            fail("Should have thrown TaskMigratedException.");
-        } catch (final TaskMigratedException expected) { /* ignore */ }
-
-        assertThat(assignedTasks.runningTaskIds(), equalTo(Collections.EMPTY_SET));
-        EasyMock.verify(t1);
-    }
-
-    @Test
-    public void shouldPunctuateRunningTasks() {
-        mockTaskInitialization();
-        EasyMock.expect(t1.maybePunctuateStreamTime()).andReturn(true);
-        EasyMock.expect(t1.maybePunctuateSystemTime()).andReturn(true);
-        EasyMock.replay(t1);
-
-        addAndInitTask();
-
-        assertThat(assignedTasks.punctuate(), equalTo(2));
-        EasyMock.verify(t1);
-    }
-
-    @Test
-    public void shouldCloseTaskOnMaybePunctuateStreamTimeIfTaskMigratedException() {
-        mockTaskInitialization();
-        t1.maybePunctuateStreamTime();
-        EasyMock.expectLastCall().andThrow(new TaskMigratedException(t1));
-        t1.close(false, true);
-        EasyMock.expectLastCall();
-        EasyMock.replay(t1);
-        addAndInitTask();
-
-        try {
-            assignedTasks.punctuate();
-            fail("Should have thrown TaskMigratedException.");
-        } catch (final TaskMigratedException expected) { /* ignore */ }
-
-        assertThat(assignedTasks.runningTaskIds(), equalTo(Collections.EMPTY_SET));
-        EasyMock.verify(t1);
-    }
-
-    @Test
-    public void shouldCloseTaskOnMaybePunctuateSystemTimeIfTaskMigratedException() {
-        mockTaskInitialization();
-        EasyMock.expect(t1.maybePunctuateStreamTime()).andReturn(true);
-        t1.maybePunctuateSystemTime();
-        EasyMock.expectLastCall().andThrow(new TaskMigratedException(t1));
-        t1.close(false, true);
-        EasyMock.expectLastCall();
-        EasyMock.replay(t1);
-        addAndInitTask();
-
-        try {
-            assignedTasks.punctuate();
-            fail("Should have thrown TaskMigratedException.");
-        } catch (final TaskMigratedException expected) { /* ignore */ }
-        EasyMock.verify(t1);
-    }
-
-    @Test
-    public void shouldReturnNumberOfPunctuations() {
-        mockTaskInitialization();
-        EasyMock.expect(t1.maybePunctuateStreamTime()).andReturn(true);
-        EasyMock.expect(t1.maybePunctuateSystemTime()).andReturn(false);
-        EasyMock.replay(t1);
-
-        addAndInitTask();
-
-        assertThat(assignedTasks.punctuate(), equalTo(1));
-        EasyMock.verify(t1);
-    }
-
-    private void addAndInitTask() {
-        assignedTasks.addNewTask(t1);
-        assignedTasks.initializeNewTasks();
-    }
-
-    private RuntimeException suspendTask() {
-        addAndInitTask();
-        return assignedTasks.suspend();
-    }
-
-    private void mockRunningTaskSuspension() {
-        EasyMock.expect(t1.initialize()).andReturn(true);
-        EasyMock.expect(t1.hasStateStores()).andReturn(false).anyTimes();
-        EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1)).anyTimes();
-        EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.<TopicPartition>emptyList()).anyTimes();
-        t1.suspend();
-        EasyMock.expectLastCall();
-    }
-
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/d637ad0d/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
index 705bcf9..342354c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
@@ -61,7 +61,7 @@ public class StoreChangelogReaderTest {
     @Mock(type = MockType.NICE)
     private RestoringTasks active;
     @Mock(type = MockType.NICE)
-    private Task task;
+    private StreamTask task;
 
     private final MockStateRestoreListener callback = new MockStateRestoreListener();
     private final CompositeRestoreListener restoreListener = new CompositeRestoreListener(callback);

http://git-wip-us.apache.org/repos/asf/kafka/blob/d637ad0d/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index b11b8c2..1640f9e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -61,17 +61,19 @@ public class TaskManagerTest {
     @Mock(type = MockType.NICE)
     private Consumer<byte[], byte[]> consumer;
     @Mock(type = MockType.NICE)
-    private StreamThread.AbstractTaskCreator activeTaskCreator;
+    private StreamThread.AbstractTaskCreator<StreamTask> activeTaskCreator;
     @Mock(type = MockType.NICE)
-    private StreamThread.AbstractTaskCreator standbyTaskCreator;
+    private StreamThread.AbstractTaskCreator<StandbyTask> standbyTaskCreator;
     @Mock(type = MockType.NICE)
     private ThreadMetadataProvider threadMetadataProvider;
     @Mock(type = MockType.NICE)
-    private Task firstTask;
+    private StreamTask streamTask;
     @Mock(type = MockType.NICE)
-    private AssignedTasks active;
+    private StandbyTask standbyTask;
     @Mock(type = MockType.NICE)
-    private AssignedTasks standby;
+    private AssignedStreamsTasks active;
+    @Mock(type = MockType.NICE)
+    private AssignedStandbyTasks standby;
 
     private TaskManager taskManager;
 
@@ -139,7 +141,7 @@ public class TaskManagerTest {
     public void shouldAddNonResumedActiveTasks() {
         mockSingleActiveTask();
         EasyMock.expect(active.maybeResumeSuspendedTask(taskId0, taskId0Partitions)).andReturn(false);
-        active.addNewTask(EasyMock.same(firstTask));
+        active.addNewTask(EasyMock.same(streamTask));
         replay();
 
         taskManager.createTasks(taskId0Partitions);
@@ -164,7 +166,7 @@ public class TaskManagerTest {
     public void shouldAddNonResumedStandbyTasks() {
         mockStandbyTaskExpectations();
         EasyMock.expect(standby.maybeResumeSuspendedTask(taskId0, taskId0Partitions)).andReturn(false);
-        standby.addNewTask(EasyMock.same(firstTask));
+        standby.addNewTask(EasyMock.same(standbyTask));
         replay();
 
         taskManager.createTasks(taskId0Partitions);
@@ -470,7 +472,7 @@ public class TaskManagerTest {
     }
 
     private void mockAssignStandbyPartitions(final long offset) {
-        final Task task = EasyMock.createNiceMock(Task.class);
+        final StandbyTask task = EasyMock.createNiceMock(StandbyTask.class);
         EasyMock.expect(active.initializeNewTasks()).andReturn(new HashSet<TopicPartition>());
         EasyMock.expect(active.allTasksRunning()).andReturn(true);
         EasyMock.expect(active.updateRestored(EasyMock.<Collection<TopicPartition>>anyObject())).
@@ -487,7 +489,7 @@ public class TaskManagerTest {
         mockThreadMetadataProvider(taskId0Assignment, Collections.<TaskId, Set<TopicPartition>>emptyMap());
         expect(standbyTaskCreator.createTasks(EasyMock.<Consumer<byte[], byte[]>>anyObject(),
                                                    EasyMock.eq(taskId0Assignment)))
-                .andReturn(Collections.singletonList(firstTask));
+                .andReturn(Collections.singletonList(standbyTask));
 
     }
 
@@ -497,7 +499,7 @@ public class TaskManagerTest {
 
         expect(activeTaskCreator.createTasks(EasyMock.anyObject(Consumer.class),
                                                   EasyMock.eq(taskId0Assignment)))
-                .andReturn(Collections.singletonList(firstTask));
+                .andReturn(Collections.singletonList(streamTask));
 
     }