You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/11/06 18:26:56 UTC
[2/2] kafka git commit: KAFKA-6115: TaskManager should be type aware
KAFKA-6115: TaskManager should be type aware
- remove type specific methods from Task interface
- add generics to preserve task type
- add sub classes for different task types
Author: Matthias J. Sax <ma...@confluent.io>
Reviewers: Bill Bejeck <bi...@confluent.io>, Damian Guy <da...@gmail.com>, Guozhang Wang <wa...@gmail.com>
Closes #4129 from mjsax/kafka-6115-taskManager-should-be-type-aware
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d637ad0d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d637ad0d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d637ad0d
Branch: refs/heads/trunk
Commit: d637ad0dafb727bb73c63f1b187d9f83abcd4ec1
Parents: 0c89570
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Mon Nov 6 10:26:52 2017 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Mon Nov 6 10:26:52 2017 -0800
----------------------------------------------------------------------
.../processor/internals/AbstractTask.java | 8 +-
.../internals/AssignedStandbyTasks.java | 27 ++
.../internals/AssignedStreamsTasks.java | 128 +++++
.../processor/internals/AssignedTasks.java | 174 ++-----
.../processor/internals/RestoringTasks.java | 2 +-
.../processor/internals/StandbyTask.java | 25 -
.../streams/processor/internals/StreamTask.java | 11 -
.../processor/internals/StreamThread.java | 36 +-
.../kafka/streams/processor/internals/Task.java | 59 +--
.../streams/processor/internals/TaskAction.java | 4 +-
.../processor/internals/TaskManager.java | 32 +-
.../processor/internals/AbstractTaskTest.java | 37 --
.../internals/AssignedStreamsTasksTest.java | 461 +++++++++++++++++++
.../processor/internals/AssignedTasksTest.java | 461 -------------------
.../internals/StoreChangelogReaderTest.java | 2 +-
.../processor/internals/TaskManagerTest.java | 22 +-
.../StreamThreadStateStoreProviderTest.java | 3 +-
17 files changed, 733 insertions(+), 759 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/d637ad0d/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index 52465ed..b0ae23c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -102,22 +102,22 @@ public abstract class AbstractTask implements Task {
}
@Override
- public final String applicationId() {
+ public String applicationId() {
return applicationId;
}
@Override
- public final Set<TopicPartition> partitions() {
+ public Set<TopicPartition> partitions() {
return partitions;
}
@Override
- public final ProcessorTopology topology() {
+ public ProcessorTopology topology() {
return topology;
}
@Override
- public final ProcessorContext context() {
+ public ProcessorContext context() {
return processorContext;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d637ad0d/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStandbyTasks.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStandbyTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStandbyTasks.java
new file mode 100644
index 0000000..a99e451
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStandbyTasks.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.utils.LogContext;
+
+class AssignedStandbyTasks extends AssignedTasks<StandbyTask> {
+
+ AssignedStandbyTasks(final LogContext logContext) {
+ super(logContext, "standby task");
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d637ad0d/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
new file mode 100644
index 0000000..5ef404f
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.streams.errors.TaskMigratedException;
+import org.apache.kafka.streams.processor.TaskId;
+import org.slf4j.Logger;
+
+import java.util.Iterator;
+import java.util.Map;
+
+class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements RestoringTasks {
+ private final Logger log;
+ private final TaskAction<StreamTask> maybeCommitAction;
+ private int committed = 0;
+
+ AssignedStreamsTasks(final LogContext logContext) {
+ super(logContext, "stream task");
+
+ this.log = logContext.logger(getClass());
+
+ maybeCommitAction = new TaskAction<StreamTask>() {
+ @Override
+ public String name() {
+ return "maybeCommit";
+ }
+
+ @Override
+ public void apply(final StreamTask task) {
+ if (task.commitNeeded()) {
+ committed++;
+ task.commit();
+ log.debug("Committed active task {} per user request in", task.id());
+ }
+ }
+ };
+ }
+
+ @Override
+ public StreamTask restoringTaskFor(final TopicPartition partition) {
+ return restoringByPartition.get(partition);
+ }
+
+ /**
+ * @throws TaskMigratedException if committing offsets failed (non-EOS)
+ * or if the task producer got fenced (EOS)
+ */
+ int maybeCommit() {
+ committed = 0;
+ applyToRunningTasks(maybeCommitAction);
+ return committed;
+ }
+
+ /**
+ * @throws TaskMigratedException if the task producer got fenced (EOS only)
+ */
+ int process() {
+ int processed = 0;
+ final Iterator<Map.Entry<TaskId, StreamTask>> it = running.entrySet().iterator();
+ while (it.hasNext()) {
+ final StreamTask task = it.next().getValue();
+ try {
+ if (task.process()) {
+ processed++;
+ }
+ } catch (final TaskMigratedException e) {
+ final RuntimeException fatalException = closeZombieTask(task);
+ if (fatalException != null) {
+ throw fatalException;
+ }
+ it.remove();
+ throw e;
+ } catch (final RuntimeException e) {
+ log.error("Failed to process stream task {} due to the following error:", task.id(), e);
+ throw e;
+ }
+ }
+ return processed;
+ }
+
+ /**
+ * @throws TaskMigratedException if the task producer got fenced (EOS only)
+ */
+ int punctuate() {
+ int punctuated = 0;
+ final Iterator<Map.Entry<TaskId, StreamTask>> it = running.entrySet().iterator();
+ while (it.hasNext()) {
+ final StreamTask task = it.next().getValue();
+ try {
+ if (task.maybePunctuateStreamTime()) {
+ punctuated++;
+ }
+ if (task.maybePunctuateSystemTime()) {
+ punctuated++;
+ }
+ } catch (final TaskMigratedException e) {
+ final RuntimeException fatalException = closeZombieTask(task);
+ if (fatalException != null) {
+ throw fatalException;
+ }
+ it.remove();
+ throw e;
+ } catch (final KafkaException e) {
+ log.error("Failed to punctuate stream task {} due to the following error:", task.id(), e);
+ throw e;
+ }
+ }
+ return punctuated;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d637ad0d/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
index a3c9c2f..b90ec10 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams.processor.internals;
-import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.errors.LockException;
@@ -37,22 +36,19 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
-class AssignedTasks implements RestoringTasks {
+abstract class AssignedTasks<T extends Task> {
private final Logger log;
private final String taskTypeName;
- private final TaskAction maybeCommitAction;
- private final TaskAction commitAction;
- private Map<TaskId, Task> created = new HashMap<>();
- private Map<TaskId, Task> suspended = new HashMap<>();
- private Map<TaskId, Task> restoring = new HashMap<>();
+ private final TaskAction<T> commitAction;
+ private Map<TaskId, T> created = new HashMap<>();
+ private Map<TaskId, T> suspended = new HashMap<>();
+ private Map<TaskId, T> restoring = new HashMap<>();
private Set<TopicPartition> restoredPartitions = new HashSet<>();
private Set<TaskId> previousActiveTasks = new HashSet<>();
// IQ may access this map.
- private Map<TaskId, Task> running = new ConcurrentHashMap<>();
- private Map<TopicPartition, Task> runningByPartition = new HashMap<>();
- private Map<TopicPartition, Task> restoringByPartition = new HashMap<>();
- private int committed = 0;
-
+ Map<TaskId, T> running = new ConcurrentHashMap<>();
+ private Map<TopicPartition, T> runningByPartition = new HashMap<>();
+ Map<TopicPartition, T> restoringByPartition = new HashMap<>();
AssignedTasks(final LogContext logContext,
final String taskTypeName) {
@@ -60,36 +56,20 @@ class AssignedTasks implements RestoringTasks {
this.log = logContext.logger(getClass());
- maybeCommitAction = new TaskAction() {
- @Override
- public String name() {
- return "maybeCommit";
- }
-
- @Override
- public void apply(final Task task) {
- if (task.commitNeeded()) {
- committed++;
- task.commit();
- log.debug("Committed active task {} per user request in", task.id());
- }
- }
- };
-
- commitAction = new TaskAction() {
+ commitAction = new TaskAction<T>() {
@Override
public String name() {
return "commit";
}
@Override
- public void apply(final Task task) {
+ public void apply(final T task) {
task.commit();
}
};
}
- void addNewTask(final Task task) {
+ void addNewTask(final T task) {
created.put(task.id(), task);
}
@@ -98,7 +78,7 @@ class AssignedTasks implements RestoringTasks {
return Collections.emptySet();
}
final Set<TopicPartition> partitions = new HashSet<>();
- for (final Map.Entry<TaskId, Task> entry : created.entrySet()) {
+ for (final Map.Entry<TaskId, T> entry : created.entrySet()) {
if (entry.getValue().hasStateStores()) {
partitions.addAll(entry.getValue().partitions());
}
@@ -116,8 +96,8 @@ class AssignedTasks implements RestoringTasks {
if (!created.isEmpty()) {
log.debug("Initializing {}s {}", taskTypeName, created.keySet());
}
- for (final Iterator<Map.Entry<TaskId, Task>> it = created.entrySet().iterator(); it.hasNext(); ) {
- final Map.Entry<TaskId, Task> entry = it.next();
+ for (final Iterator<Map.Entry<TaskId, T>> it = created.entrySet().iterator(); it.hasNext(); ) {
+ final Map.Entry<TaskId, T> entry = it.next();
try {
if (!entry.getValue().initialize()) {
log.debug("Transitioning {} {} to restoring", taskTypeName, entry.getKey());
@@ -141,9 +121,9 @@ class AssignedTasks implements RestoringTasks {
log.trace("{} changelog partitions that have completed restoring so far: {}", taskTypeName, restored);
final Set<TopicPartition> resume = new HashSet<>();
restoredPartitions.addAll(restored);
- for (final Iterator<Map.Entry<TaskId, Task>> it = restoring.entrySet().iterator(); it.hasNext(); ) {
- final Map.Entry<TaskId, Task> entry = it.next();
- final Task task = entry.getValue();
+ for (final Iterator<Map.Entry<TaskId, T>> it = restoring.entrySet().iterator(); it.hasNext(); ) {
+ final Map.Entry<TaskId, T> entry = it.next();
+ final T task = entry.getValue();
if (restoredPartitions.containsAll(task.changelogPartitions())) {
transitionToRunning(task, resume);
it.remove();
@@ -174,7 +154,7 @@ class AssignedTasks implements RestoringTasks {
&& restoring.isEmpty();
}
- Collection<Task> running() {
+ Collection<T> running() {
return running.values();
}
@@ -196,9 +176,9 @@ class AssignedTasks implements RestoringTasks {
return firstException.get();
}
- private RuntimeException closeNonRunningTasks(final Collection<Task> tasks) {
+ private RuntimeException closeNonRunningTasks(final Collection<T> tasks) {
RuntimeException exception = null;
- for (final Task task : tasks) {
+ for (final T task : tasks) {
try {
task.close(false, false);
} catch (final RuntimeException e) {
@@ -211,10 +191,10 @@ class AssignedTasks implements RestoringTasks {
return exception;
}
- private RuntimeException suspendTasks(final Collection<Task> tasks) {
+ private RuntimeException suspendTasks(final Collection<T> tasks) {
final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
- for (Iterator<Task> it = tasks.iterator(); it.hasNext(); ) {
- final Task task = it.next();
+ for (Iterator<T> it = tasks.iterator(); it.hasNext(); ) {
+ final T task = it.next();
try {
task.suspend();
suspended.put(task.id(), task);
@@ -235,7 +215,7 @@ class AssignedTasks implements RestoringTasks {
return firstException.get();
}
- private RuntimeException closeZombieTask(final Task task) {
+ RuntimeException closeZombieTask(final T task) {
log.warn("{} {} got migrated to another thread already. Closing it as zombie.", taskTypeName, task.id());
try {
task.close(false, true);
@@ -255,7 +235,7 @@ class AssignedTasks implements RestoringTasks {
*/
boolean maybeResumeSuspendedTask(final TaskId taskId, final Set<TopicPartition> partitions) {
if (suspended.containsKey(taskId)) {
- final Task task = suspended.get(taskId);
+ final T task = suspended.get(taskId);
log.trace("found suspended {} {}", taskTypeName, taskId);
if (task.partitions().equals(partitions)) {
suspended.remove(taskId);
@@ -279,7 +259,7 @@ class AssignedTasks implements RestoringTasks {
return false;
}
- private void addToRestoring(final Task task) {
+ private void addToRestoring(final T task) {
restoring.put(task.id(), task);
for (TopicPartition topicPartition : task.partitions()) {
restoringByPartition.put(topicPartition, task);
@@ -289,7 +269,7 @@ class AssignedTasks implements RestoringTasks {
}
}
- private void transitionToRunning(final Task task, final Set<TopicPartition> readyPartitions) {
+ private void transitionToRunning(final T task, final Set<TopicPartition> readyPartitions) {
log.debug("transitioning {} {} to running", taskTypeName, task.id());
running.put(task.id(), task);
for (TopicPartition topicPartition : task.partitions()) {
@@ -303,12 +283,7 @@ class AssignedTasks implements RestoringTasks {
}
}
- @Override
- public Task restoringTaskFor(final TopicPartition partition) {
- return restoringByPartition.get(partition);
- }
-
- Task runningTaskFor(final TopicPartition partition) {
+ T runningTaskFor(final TopicPartition partition) {
return runningByPartition.get(partition);
}
@@ -316,7 +291,7 @@ class AssignedTasks implements RestoringTasks {
return running.keySet();
}
- Map<TaskId, Task> runningTaskMap() {
+ Map<TaskId, T> runningTaskMap() {
return Collections.unmodifiableMap(running);
}
@@ -330,18 +305,18 @@ class AssignedTasks implements RestoringTasks {
}
private void describe(final StringBuilder builder,
- final Collection<Task> tasks,
+ final Collection<T> tasks,
final String indent,
final String name) {
builder.append(indent).append(name);
- for (final Task t : tasks) {
+ for (final T t : tasks) {
builder.append(indent).append(t.toString(indent + "\t\t"));
}
builder.append("\n");
}
- private List<Task> allTasks() {
- final List<Task> tasks = new ArrayList<>();
+ private List<T> allTasks() {
+ final List<T> tasks = new ArrayList<>();
tasks.addAll(running.values());
tasks.addAll(suspended.values());
tasks.addAll(restoring.values());
@@ -349,7 +324,7 @@ class AssignedTasks implements RestoringTasks {
return tasks;
}
- Collection<Task> restoringTasks() {
+ Collection<T> restoringTasks() {
return Collections.unmodifiableCollection(restoring.values());
}
@@ -384,78 +359,11 @@ class AssignedTasks implements RestoringTasks {
return running.size();
}
- /**
- * @throws TaskMigratedException if committing offsets failed (non-EOS)
- * or if the task producer got fenced (EOS)
- */
- int maybeCommit() {
- committed = 0;
- applyToRunningTasks(maybeCommitAction);
- return committed;
- }
-
- /**
- * @throws TaskMigratedException if the task producer got fenced (EOS only)
- */
- int process() {
- int processed = 0;
- final Iterator<Map.Entry<TaskId, Task>> it = running.entrySet().iterator();
- while (it.hasNext()) {
- final Task task = it.next().getValue();
- try {
- if (task.process()) {
- processed++;
- }
- } catch (final TaskMigratedException e) {
- final RuntimeException fatalException = closeZombieTask(task);
- if (fatalException != null) {
- throw fatalException;
- }
- it.remove();
- throw e;
- } catch (final RuntimeException e) {
- log.error("Failed to process {} {} due to the following error:", taskTypeName, task.id(), e);
- throw e;
- }
- }
- return processed;
- }
-
- /**
- * @throws TaskMigratedException if the task producer got fenced (EOS only)
- */
- int punctuate() {
- int punctuated = 0;
- final Iterator<Map.Entry<TaskId, Task>> it = running.entrySet().iterator();
- while (it.hasNext()) {
- final Task task = it.next().getValue();
- try {
- if (task.maybePunctuateStreamTime()) {
- punctuated++;
- }
- if (task.maybePunctuateSystemTime()) {
- punctuated++;
- }
- } catch (final TaskMigratedException e) {
- final RuntimeException fatalException = closeZombieTask(task);
- if (fatalException != null) {
- throw fatalException;
- }
- it.remove();
- throw e;
- } catch (final KafkaException e) {
- log.error("Failed to punctuate {} {} due to the following error:", taskTypeName, task.id(), e);
- throw e;
- }
- }
- return punctuated;
- }
-
- private void applyToRunningTasks(final TaskAction action) {
+ void applyToRunningTasks(final TaskAction<T> action) {
RuntimeException firstException = null;
- for (Iterator<Task> it = running().iterator(); it.hasNext(); ) {
- final Task task = it.next();
+ for (Iterator<T> it = running().iterator(); it.hasNext(); ) {
+ final T task = it.next();
try {
action.apply(task);
} catch (final TaskMigratedException e) {
@@ -485,9 +393,9 @@ class AssignedTasks implements RestoringTasks {
}
void closeNonAssignedSuspendedTasks(final Map<TaskId, Set<TopicPartition>> newAssignment) {
- final Iterator<Task> standByTaskIterator = suspended.values().iterator();
+ final Iterator<T> standByTaskIterator = suspended.values().iterator();
while (standByTaskIterator.hasNext()) {
- final Task suspendedTask = standByTaskIterator.next();
+ final T suspendedTask = standByTaskIterator.next();
if (!newAssignment.containsKey(suspendedTask.id()) || !suspendedTask.partitions().equals(newAssignment.get(suspendedTask.id()))) {
log.debug("Closing suspended and not re-assigned {} {}", taskTypeName, suspendedTask.id());
try {
@@ -503,7 +411,7 @@ class AssignedTasks implements RestoringTasks {
void close(final boolean clean) {
final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
- for (final Task task : allTasks()) {
+ for (final T task : allTasks()) {
try {
task.close(clean, false);
} catch (final TaskMigratedException e) {
@@ -531,7 +439,7 @@ class AssignedTasks implements RestoringTasks {
}
}
- private boolean closeUnclean(final Task task) {
+ private boolean closeUnclean(final T task) {
log.info("Try to close {} {} unclean.", task.getClass().getSimpleName(), task.id());
try {
task.close(false, false);
http://git-wip-us.apache.org/repos/asf/kafka/blob/d637ad0d/streams/src/main/java/org/apache/kafka/streams/processor/internals/RestoringTasks.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RestoringTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RestoringTasks.java
index 6ed28fd..3671b49 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RestoringTasks.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RestoringTasks.java
@@ -19,5 +19,5 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.common.TopicPartition;
public interface RestoringTasks {
- Task restoringTaskFor(final TopicPartition partition);
+ StreamTask restoringTaskFor(final TopicPartition partition);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d637ad0d/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
index 98ec810..fbbb357 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
@@ -147,11 +147,6 @@ public class StandbyTask extends AbstractTask {
close(clean, isZombie);
}
- @Override
- public boolean commitNeeded() {
- return false;
- }
-
/**
* Updates a state store using records from one change log partition
*
@@ -163,28 +158,8 @@ public class StandbyTask extends AbstractTask {
return stateMgr.updateStandbyStates(partition, records);
}
- @Override
- public int addRecords(final TopicPartition partition, final Iterable<ConsumerRecord<byte[], byte[]>> records) {
- throw new UnsupportedOperationException("add records not supported by StandbyTasks");
- }
-
public Map<TopicPartition, Long> checkpointedOffsets() {
return checkpointedOffsets;
}
- @Override
- public boolean maybePunctuateStreamTime() {
- throw new UnsupportedOperationException("maybePunctuateStreamTime not supported by StandbyTask");
- }
-
- @Override
- public boolean maybePunctuateSystemTime() {
- throw new UnsupportedOperationException("maybePunctuateSystemTime not supported by StandbyTask");
- }
-
- @Override
- public boolean process() {
- throw new UnsupportedOperationException("process not supported by StandbyTasks");
- }
-
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/d637ad0d/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 06f45ed..4b78e27 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -42,7 +42,6 @@ import org.apache.kafka.streams.state.internals.ThreadCache;
import java.util.Collection;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import static java.lang.String.format;
@@ -493,11 +492,6 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
}
}
- @Override
- public Map<TopicPartition, Long> checkpointedOffsets() {
- throw new UnsupportedOperationException("checkpointedOffsets is not supported by StreamTasks");
- }
-
/**
* <pre>
* - {@link #suspend(boolean) suspend(clean)}
@@ -619,11 +613,6 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
return systemTimePunctuationQueue.mayPunctuate(timestamp, PunctuationType.WALL_CLOCK_TIME, this);
}
- @Override
- public List<ConsumerRecord<byte[], byte[]>> update(final TopicPartition partition, final List<ConsumerRecord<byte[], byte[]>> remaining) {
- throw new UnsupportedOperationException("update is not implemented");
- }
-
/**
* Request committing the current task's state
*/
http://git-wip-us.apache.org/repos/asf/kafka/blob/d637ad0d/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 72b3a52..1982afb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -308,7 +308,7 @@ public class StreamThread extends Thread implements ThreadDataProvider {
}
}
- static abstract class AbstractTaskCreator {
+ static abstract class AbstractTaskCreator<T extends Task> {
final String applicationId;
final InternalTopologyBuilder builder;
final StreamsConfig config;
@@ -342,12 +342,12 @@ public class StreamThread extends Thread implements ThreadDataProvider {
/**
* @throws TaskMigratedException if the task producer got fenced (EOS only)
*/
- Collection<Task> createTasks(final Consumer<byte[], byte[]> consumer, final Map<TaskId, Set<TopicPartition>> tasksToBeCreated) {
- final List<Task> createdTasks = new ArrayList<>();
+ Collection<T> createTasks(final Consumer<byte[], byte[]> consumer, final Map<TaskId, Set<TopicPartition>> tasksToBeCreated) {
+ final List<T> createdTasks = new ArrayList<>();
for (final Map.Entry<TaskId, Set<TopicPartition>> newTaskAndPartitions : tasksToBeCreated.entrySet()) {
final TaskId taskId = newTaskAndPartitions.getKey();
final Set<TopicPartition> partitions = newTaskAndPartitions.getValue();
- Task task = createTask(consumer, taskId, partitions);
+ T task = createTask(consumer, taskId, partitions);
if (task != null) {
log.trace("Created task {} with assigned partitions {}", taskId, partitions);
createdTasks.add(task);
@@ -357,12 +357,12 @@ public class StreamThread extends Thread implements ThreadDataProvider {
return createdTasks;
}
- abstract Task createTask(final Consumer<byte[], byte[]> consumer, final TaskId id, final Set<TopicPartition> partitions);
+ abstract T createTask(final Consumer<byte[], byte[]> consumer, final TaskId id, final Set<TopicPartition> partitions);
public void close() {}
}
- static class TaskCreator extends AbstractTaskCreator {
+ static class TaskCreator extends AbstractTaskCreator<StreamTask> {
private final ThreadCache cache;
private final KafkaClientSupplier clientSupplier;
private final String threadClientId;
@@ -441,7 +441,7 @@ public class StreamThread extends Thread implements ThreadDataProvider {
}
}
- static class StandbyTaskCreator extends AbstractTaskCreator {
+ static class StandbyTaskCreator extends AbstractTaskCreator<StandbyTask> {
StandbyTaskCreator(final InternalTopologyBuilder builder,
final StreamsConfig config,
final StreamsMetrics streamsMetrics,
@@ -706,12 +706,8 @@ public class StreamThread extends Thread implements ThreadDataProvider {
restoreConsumer,
activeTaskCreator,
standbyTaskCreator,
- new AssignedTasks(logContext,
- "stream task"
- ),
- new AssignedTasks(logContext,
- "standby task"
- ));
+ new AssignedStreamsTasks(logContext),
+ new AssignedStandbyTasks(logContext));
return new StreamThread(builder,
clientId,
@@ -916,7 +912,7 @@ public class StreamThread extends Thread implements ThreadDataProvider {
int numAddedRecords = 0;
for (final TopicPartition partition : records.partitions()) {
- final Task task = taskManager.activeTask(partition);
+ final StreamTask task = taskManager.activeTask(partition);
numAddedRecords += task.addRecords(partition, records.records(partition));
}
streamsMetrics.skippedRecordsSensor.record(records.count() - numAddedRecords, timerStartedMs);
@@ -1040,7 +1036,7 @@ public class StreamThread extends Thread implements ThreadDataProvider {
final TopicPartition partition = entry.getKey();
List<ConsumerRecord<byte[], byte[]>> remaining = entry.getValue();
if (remaining != null) {
- final Task task = taskManager.standbyTask(partition);
+ final StandbyTask task = taskManager.standbyTask(partition);
remaining = task.update(partition, remaining);
if (remaining != null) {
remainingStandbyRecords.put(partition, remaining);
@@ -1061,7 +1057,7 @@ public class StreamThread extends Thread implements ThreadDataProvider {
if (!records.isEmpty()) {
for (final TopicPartition partition : records.partitions()) {
- final Task task = taskManager.standbyTask(partition);
+ final StandbyTask task = taskManager.standbyTask(partition);
if (task == null) {
throw new StreamsException(logPrefix + "Missing standby task for partition " + partition);
@@ -1101,7 +1097,7 @@ public class StreamThread extends Thread implements ThreadDataProvider {
setState(State.PENDING_SHUTDOWN);
}
- public Map<TaskId, Task> tasks() {
+ public Map<TaskId, StreamTask> tasks() {
return taskManager.activeTasks();
}
@@ -1248,16 +1244,16 @@ public class StreamThread extends Thread implements ThreadDataProvider {
return threadMetadata;
}
- private void updateThreadMetadata(final Map<TaskId, Task> activeTasks, final Map<TaskId, Task> standbyTasks) {
+ private void updateThreadMetadata(final Map<TaskId, StreamTask> activeTasks, final Map<TaskId, StandbyTask> standbyTasks) {
final Set<TaskMetadata> activeTasksMetadata = new HashSet<>();
if (activeTasks != null) {
- for (Map.Entry<TaskId, Task> task : activeTasks.entrySet()) {
+ for (Map.Entry<TaskId, StreamTask> task : activeTasks.entrySet()) {
activeTasksMetadata.add(new TaskMetadata(task.getKey().toString(), task.getValue().partitions()));
}
}
final Set<TaskMetadata> standbyTasksMetadata = new HashSet<>();
if (standbyTasks != null) {
- for (Map.Entry<TaskId, Task> task : standbyTasks.entrySet()) {
+ for (Map.Entry<TaskId, StandbyTask> task : standbyTasks.entrySet()) {
standbyTasksMetadata.add(new TaskMetadata(task.getKey().toString(), task.getValue().partitions()));
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d637ad0d/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
index 80e5423..f066bff 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams.processor.internals;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.ProcessorContext;
@@ -24,61 +23,49 @@ import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import java.util.Collection;
-import java.util.List;
-import java.util.Map;
import java.util.Set;
public interface Task {
- void resume();
+ /**
+ * Initialize the task and return {@code true} if the task is ready to run, i.e., it has no state stores
+ * @return true if this task has no state stores that may need restoring.
+ * @throws IllegalStateException If store gets registered after initialized is already finished
+ * @throws StreamsException if the store's change log does not contain the partition
+ */
+ boolean initialize();
void commit();
void suspend();
- void close(boolean clean, boolean isZombie);
+ void resume();
- TaskId id();
+ void closeSuspended(final boolean clean,
+ final boolean isZombie,
+ final RuntimeException e);
- String applicationId();
+ void close(final boolean clean,
+ final boolean isZombie);
- Set<TopicPartition> partitions();
+ StateStore getStore(final String name);
+
+ String applicationId();
ProcessorTopology topology();
ProcessorContext context();
- StateStore getStore(String name);
-
- void closeSuspended(boolean clean, boolean isZombie, RuntimeException e);
-
- Map<TopicPartition, Long> checkpointedOffsets();
-
- boolean process();
-
- boolean commitNeeded();
-
- boolean maybePunctuateStreamTime();
-
- boolean maybePunctuateSystemTime();
-
- List<ConsumerRecord<byte[], byte[]>> update(TopicPartition partition, List<ConsumerRecord<byte[], byte[]>> remaining);
-
- String toString(String indent);
-
- int addRecords(TopicPartition partition, final Iterable<ConsumerRecord<byte[], byte[]>> records);
-
- boolean hasStateStores();
+ TaskId id();
- /**
- * initialize the task and return true if the task is ready to run, i.e, it has not state stores
- * @return true if this task has no state stores that may need restoring.
- * @throws IllegalStateException If store gets registered after initialized is already finished
- * @throws StreamsException if the store's change log does not contain the partition
- */
- boolean initialize();
+ Set<TopicPartition> partitions();
/**
* @return any changelog partitions associated with this task
*/
Collection<TopicPartition> changelogPartitions();
+
+ boolean hasStateStores();
+
+ String toString(final String indent);
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d637ad0d/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskAction.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskAction.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskAction.java
index 9112594..da5f325 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskAction.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskAction.java
@@ -16,7 +16,7 @@
*/
package org.apache.kafka.streams.processor.internals;
-interface TaskAction {
+interface TaskAction<T extends Task> {
String name();
- void apply(final Task task);
+ void apply(final T task);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d637ad0d/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 6f4cc51..0238615 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -37,23 +37,23 @@ class TaskManager {
// activeTasks needs to be concurrent as it can be accessed
// by QueryableState
private final Logger log;
- private final AssignedTasks active;
- private final AssignedTasks standby;
+ private final AssignedStreamsTasks active;
+ private final AssignedStandbyTasks standby;
private final ChangelogReader changelogReader;
private final String logPrefix;
private final Consumer<byte[], byte[]> restoreConsumer;
- private final StreamThread.AbstractTaskCreator taskCreator;
- private final StreamThread.AbstractTaskCreator standbyTaskCreator;
+ private final StreamThread.AbstractTaskCreator<StreamTask> taskCreator;
+ private final StreamThread.AbstractTaskCreator<StandbyTask> standbyTaskCreator;
private ThreadMetadataProvider threadMetadataProvider;
private Consumer<byte[], byte[]> consumer;
TaskManager(final ChangelogReader changelogReader,
final String logPrefix,
final Consumer<byte[], byte[]> restoreConsumer,
- final StreamThread.AbstractTaskCreator taskCreator,
- final StreamThread.AbstractTaskCreator standbyTaskCreator,
- final AssignedTasks active,
- final AssignedTasks standby) {
+ final StreamThread.AbstractTaskCreator<StreamTask> taskCreator,
+ final StreamThread.AbstractTaskCreator<StandbyTask> standbyTaskCreator,
+ final AssignedStreamsTasks active,
+ final AssignedStandbyTasks standby) {
this.changelogReader = changelogReader;
this.logPrefix = logPrefix;
this.restoreConsumer = restoreConsumer;
@@ -133,7 +133,7 @@ class TaskManager {
// -> other thread will call removeSuspendedTasks(); eventually
log.trace("New active tasks to be created: {}", newTasks);
- for (final Task task : taskCreator.createTasks(consumer, newTasks)) {
+ for (final StreamTask task : taskCreator.createTasks(consumer, newTasks)) {
active.addNewTask(task);
}
}
@@ -166,7 +166,7 @@ class TaskManager {
// -> other thread will call removeSuspendedStandbyTasks(); eventually
log.trace("New standby tasks to be created: {}", newStandbyTasks);
- for (final Task task : standbyTaskCreator.createTasks(consumer, newStandbyTasks)) {
+ for (final StandbyTask task : standbyTaskCreator.createTasks(consumer, newStandbyTasks)) {
standby.addNewTask(task);
}
}
@@ -240,20 +240,20 @@ class TaskManager {
return standby.previousTaskIds();
}
- Task activeTask(final TopicPartition partition) {
+ StreamTask activeTask(final TopicPartition partition) {
return active.runningTaskFor(partition);
}
- Task standbyTask(final TopicPartition partition) {
+ StandbyTask standbyTask(final TopicPartition partition) {
return standby.runningTaskFor(partition);
}
- Map<TaskId, Task> activeTasks() {
+ Map<TaskId, StreamTask> activeTasks() {
return active.runningTaskMap();
}
- Map<TaskId, Task> standbyTasks() {
+ Map<TaskId, StandbyTask> standbyTasks() {
return standby.runningTaskMap();
}
@@ -293,9 +293,9 @@ class TaskManager {
}
private void assignStandbyPartitions() {
- final Collection<Task> running = standby.running();
+ final Collection<StandbyTask> running = standby.running();
final Map<TopicPartition, Long> checkpointedOffsets = new HashMap<>();
- for (final Task standbyTask : running) {
+ for (final StandbyTask standbyTask : running) {
checkpointedOffsets.putAll(standbyTask.checkpointedOffsets());
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d637ad0d/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
index 02aa0a0..efc6f79 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
@@ -17,7 +17,6 @@
package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
@@ -40,7 +39,6 @@ import org.junit.Test;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
-import java.util.Map;
import java.util.Properties;
import static org.junit.Assert.fail;
@@ -142,41 +140,6 @@ public class AbstractTaskTest {
public void closeSuspended(final boolean clean, final boolean isZombie, final RuntimeException e) {}
@Override
- public Map<TopicPartition, Long> checkpointedOffsets() {
- return null;
- }
-
- @Override
- public boolean process() {
- return false;
- }
-
- @Override
- public boolean commitNeeded() {
- return false;
- }
-
- @Override
- public boolean maybePunctuateStreamTime() {
- return false;
- }
-
- @Override
- public boolean maybePunctuateSystemTime() {
- return false;
- }
-
- @Override
- public List<ConsumerRecord<byte[], byte[]>> update(final TopicPartition partition, final List<ConsumerRecord<byte[], byte[]>> remaining) {
- return null;
- }
-
- @Override
- public int addRecords(final TopicPartition partition, final Iterable<ConsumerRecord<byte[], byte[]>> records) {
- return 0;
- }
-
- @Override
public boolean initialize() {
return false;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d637ad0d/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
new file mode 100644
index 0000000..3d33b0b
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
@@ -0,0 +1,461 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.errors.TaskMigratedException;
+import org.apache.kafka.streams.processor.TaskId;
+import org.easymock.EasyMock;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Set;
+
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.IsEqual.equalTo;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class AssignedStreamsTasksTest {
+
+ private final StreamTask t1 = EasyMock.createMock(StreamTask.class);
+ private final StreamTask t2 = EasyMock.createMock(StreamTask.class);
+ private final TopicPartition tp1 = new TopicPartition("t1", 0);
+ private final TopicPartition tp2 = new TopicPartition("t2", 0);
+ private final TopicPartition changeLog1 = new TopicPartition("cl1", 0);
+ private final TopicPartition changeLog2 = new TopicPartition("cl2", 0);
+ private final TaskId taskId1 = new TaskId(0, 0);
+ private final TaskId taskId2 = new TaskId(1, 0);
+ private AssignedStreamsTasks assignedTasks;
+
+ @Before
+ public void before() {
+ assignedTasks = new AssignedStreamsTasks(new LogContext("log "));
+ EasyMock.expect(t1.id()).andReturn(taskId1).anyTimes();
+ EasyMock.expect(t2.id()).andReturn(taskId2).anyTimes();
+ }
+
+ @Test
+ public void shouldGetPartitionsFromNewTasksThatHaveStateStores() {
+ EasyMock.expect(t1.hasStateStores()).andReturn(true);
+ EasyMock.expect(t2.hasStateStores()).andReturn(true);
+ EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1));
+ EasyMock.expect(t2.partitions()).andReturn(Collections.singleton(tp2));
+ EasyMock.replay(t1, t2);
+
+ assignedTasks.addNewTask(t1);
+ assignedTasks.addNewTask(t2);
+
+ final Set<TopicPartition> partitions = assignedTasks.uninitializedPartitions();
+ assertThat(partitions, equalTo(Utils.mkSet(tp1, tp2)));
+ EasyMock.verify(t1, t2);
+ }
+
+ @Test
+ public void shouldNotGetPartitionsFromNewTasksWithoutStateStores() {
+ EasyMock.expect(t1.hasStateStores()).andReturn(false);
+ EasyMock.expect(t2.hasStateStores()).andReturn(false);
+ EasyMock.replay(t1, t2);
+
+ assignedTasks.addNewTask(t1);
+ assignedTasks.addNewTask(t2);
+
+ final Set<TopicPartition> partitions = assignedTasks.uninitializedPartitions();
+ assertTrue(partitions.isEmpty());
+ EasyMock.verify(t1, t2);
+ }
+
+ @Test
+ public void shouldInitializeNewTasks() {
+ EasyMock.expect(t1.initialize()).andReturn(false);
+ EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1));
+ EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.<TopicPartition>emptySet());
+ EasyMock.replay(t1);
+
+ addAndInitTask();
+
+ EasyMock.verify(t1);
+ }
+
+ @Test
+ public void shouldMoveInitializedTasksNeedingRestoreToRestoring() {
+ EasyMock.expect(t1.initialize()).andReturn(false);
+ EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1));
+ EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.<TopicPartition>emptySet());
+ EasyMock.expect(t2.initialize()).andReturn(true);
+ final Set<TopicPartition> t2partitions = Collections.singleton(tp2);
+ EasyMock.expect(t2.partitions()).andReturn(t2partitions);
+ EasyMock.expect(t2.changelogPartitions()).andReturn(Collections.<TopicPartition>emptyList());
+ EasyMock.expect(t2.hasStateStores()).andReturn(true);
+
+ EasyMock.replay(t1, t2);
+
+ assignedTasks.addNewTask(t1);
+ assignedTasks.addNewTask(t2);
+
+ final Set<TopicPartition> readyPartitions = assignedTasks.initializeNewTasks();
+
+ Collection<StreamTask> restoring = assignedTasks.restoringTasks();
+ assertThat(restoring.size(), equalTo(1));
+ assertSame(restoring.iterator().next(), t1);
+ assertThat(readyPartitions, equalTo(t2partitions));
+ }
+
+ @Test
+ public void shouldMoveInitializedTasksThatDontNeedRestoringToRunning() {
+ EasyMock.expect(t2.initialize()).andReturn(true);
+ EasyMock.expect(t2.partitions()).andReturn(Collections.singleton(tp2));
+ EasyMock.expect(t2.changelogPartitions()).andReturn(Collections.<TopicPartition>emptyList());
+ EasyMock.expect(t2.hasStateStores()).andReturn(false);
+
+ EasyMock.replay(t2);
+
+ assignedTasks.addNewTask(t2);
+ final Set<TopicPartition> toResume = assignedTasks.initializeNewTasks();
+
+ assertThat(assignedTasks.runningTaskIds(), equalTo(Collections.singleton(taskId2)));
+ assertThat(toResume, equalTo(Collections.<TopicPartition>emptySet()));
+ }
+
+ @Test
+ public void shouldTransitionFullyRestoredTasksToRunning() {
+ final Set<TopicPartition> task1Partitions = Utils.mkSet(tp1);
+ EasyMock.expect(t1.initialize()).andReturn(false);
+ EasyMock.expect(t1.partitions()).andReturn(task1Partitions).anyTimes();
+ EasyMock.expect(t1.changelogPartitions()).andReturn(Utils.mkSet(changeLog1, changeLog2)).anyTimes();
+ EasyMock.expect(t1.hasStateStores()).andReturn(true).anyTimes();
+ EasyMock.replay(t1);
+
+ addAndInitTask();
+
+ assertTrue(assignedTasks.updateRestored(Utils.mkSet(changeLog1)).isEmpty());
+ Set<TopicPartition> partitions = assignedTasks.updateRestored(Utils.mkSet(changeLog2));
+ assertThat(partitions, equalTo(task1Partitions));
+ assertThat(assignedTasks.runningTaskIds(), equalTo(Collections.singleton(taskId1)));
+ }
+
+ @Test
+ public void shouldSuspendRunningTasks() {
+ mockRunningTaskSuspension();
+ EasyMock.replay(t1);
+
+ assertThat(suspendTask(), nullValue());
+
+ assertThat(assignedTasks.previousTaskIds(), equalTo(Collections.singleton(taskId1)));
+ EasyMock.verify(t1);
+ }
+
+ @Test
+ public void shouldCloseRestoringTasks() {
+ EasyMock.expect(t1.initialize()).andReturn(false);
+ EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1));
+ EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.<TopicPartition>emptySet());
+ t1.close(false, false);
+ EasyMock.expectLastCall();
+ EasyMock.replay(t1);
+
+ assertThat(suspendTask(), nullValue());
+ EasyMock.verify(t1);
+ }
+
+ @Test
+ public void shouldClosedUnInitializedTasksOnSuspend() {
+ t1.close(false, false);
+ EasyMock.expectLastCall();
+ EasyMock.replay(t1);
+
+ assignedTasks.addNewTask(t1);
+ assertThat(assignedTasks.suspend(), nullValue());
+
+ EasyMock.verify(t1);
+ }
+
+ @Test
+ public void shouldNotSuspendSuspendedTasks() {
+ mockRunningTaskSuspension();
+ EasyMock.replay(t1);
+
+ assertThat(suspendTask(), nullValue());
+ assertThat(assignedTasks.suspend(), nullValue());
+ EasyMock.verify(t1);
+ }
+
+
+ @Test
+ public void shouldCloseTaskOnSuspendWhenRuntimeException() {
+ mockTaskInitialization();
+ t1.suspend();
+ EasyMock.expectLastCall().andThrow(new RuntimeException("KABOOM!"));
+ t1.close(false, false);
+ EasyMock.expectLastCall();
+ EasyMock.replay(t1);
+
+ assertThat(suspendTask(), not(nullValue()));
+ assertThat(assignedTasks.previousTaskIds(), equalTo(Collections.singleton(taskId1)));
+ EasyMock.verify(t1);
+ }
+
+ @Test
+ public void shouldCloseTaskOnSuspendIfTaskMigratedException() {
+ mockTaskInitialization();
+ t1.suspend();
+ EasyMock.expectLastCall().andThrow(new TaskMigratedException(t1));
+ t1.close(false, true);
+ EasyMock.expectLastCall();
+ EasyMock.replay(t1);
+
+ assertThat(suspendTask(), nullValue());
+ assertTrue(assignedTasks.previousTaskIds().isEmpty());
+ EasyMock.verify(t1);
+ }
+
+ @Test
+ public void shouldResumeMatchingSuspendedTasks() {
+ mockRunningTaskSuspension();
+ t1.resume();
+ EasyMock.expectLastCall();
+ EasyMock.replay(t1);
+
+ assertThat(suspendTask(), nullValue());
+
+ assertTrue(assignedTasks.maybeResumeSuspendedTask(taskId1, Collections.singleton(tp1)));
+ assertThat(assignedTasks.runningTaskIds(), equalTo(Collections.singleton(taskId1)));
+ EasyMock.verify(t1);
+ }
+
+ @Test
+ public void shouldCloseTaskOnResumeIfTaskMigratedException() {
+ mockRunningTaskSuspension();
+ t1.resume();
+ EasyMock.expectLastCall().andThrow(new TaskMigratedException(t1));
+ t1.close(false, true);
+ EasyMock.expectLastCall();
+ EasyMock.replay(t1);
+
+ assertThat(suspendTask(), nullValue());
+
+ try {
+ assignedTasks.maybeResumeSuspendedTask(taskId1, Collections.singleton(tp1));
+ fail("Should have thrown TaskMigratedException.");
+ } catch (final TaskMigratedException expected) { /* ignore */ }
+
+ assertThat(assignedTasks.runningTaskIds(), equalTo(Collections.EMPTY_SET));
+ EasyMock.verify(t1);
+ }
+
+ private void mockTaskInitialization() {
+ EasyMock.expect(t1.initialize()).andReturn(true);
+ EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1));
+ EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.<TopicPartition>emptyList());
+ EasyMock.expect(t1.hasStateStores()).andReturn(false);
+ }
+
+ @Test
+ public void shouldCommitRunningTasks() {
+ mockTaskInitialization();
+ t1.commit();
+ EasyMock.expectLastCall();
+ EasyMock.replay(t1);
+
+ addAndInitTask();
+
+ assignedTasks.commit();
+ EasyMock.verify(t1);
+ }
+
+ @Test
+ public void shouldCloseTaskOnCommitIfTaskMigratedException() {
+ mockTaskInitialization();
+ t1.commit();
+ EasyMock.expectLastCall().andThrow(new TaskMigratedException(t1));
+ t1.close(false, true);
+ EasyMock.expectLastCall();
+ EasyMock.replay(t1);
+ addAndInitTask();
+
+ try {
+ assignedTasks.commit();
+ fail("Should have thrown TaskMigratedException.");
+ } catch (final TaskMigratedException expected) { /* ignore */ }
+
+ assertThat(assignedTasks.runningTaskIds(), equalTo(Collections.EMPTY_SET));
+ EasyMock.verify(t1);
+ }
+
+ @Test
+ public void shouldThrowExceptionOnCommitWhenNotCommitFailedOrProducerFenced() {
+ mockTaskInitialization();
+ t1.commit();
+ EasyMock.expectLastCall().andThrow(new RuntimeException(""));
+ EasyMock.replay(t1);
+ addAndInitTask();
+
+ try {
+ assignedTasks.commit();
+ fail("Should have thrown exception");
+ } catch (Exception e) {
+ // ok
+ }
+ assertThat(assignedTasks.runningTaskIds(), equalTo(Collections.singleton(taskId1)));
+ EasyMock.verify(t1);
+ }
+
+ @Test
+ public void shouldCommitRunningTasksIfNeeded() {
+ mockTaskInitialization();
+ EasyMock.expect(t1.commitNeeded()).andReturn(true);
+ t1.commit();
+ EasyMock.expectLastCall();
+ EasyMock.replay(t1);
+
+ addAndInitTask();
+
+ assertThat(assignedTasks.maybeCommit(), equalTo(1));
+ EasyMock.verify(t1);
+ }
+
+ @Test
+ public void shouldCloseTaskOnMaybeCommitIfTaskMigratedException() {
+ mockTaskInitialization();
+ EasyMock.expect(t1.commitNeeded()).andReturn(true);
+ t1.commit();
+ EasyMock.expectLastCall().andThrow(new TaskMigratedException(t1));
+ t1.close(false, true);
+ EasyMock.expectLastCall();
+ EasyMock.replay(t1);
+ addAndInitTask();
+
+ try {
+ assignedTasks.maybeCommit();
+ fail("Should have thrown TaskMigratedException.");
+ } catch (final TaskMigratedException expected) { /* ignore */ }
+
+ assertThat(assignedTasks.runningTaskIds(), equalTo(Collections.EMPTY_SET));
+ EasyMock.verify(t1);
+ }
+
+ @Test
+ public void shouldCloseTaskOnProcessesIfTaskMigratedException() {
+ mockTaskInitialization();
+ t1.process();
+ EasyMock.expectLastCall().andThrow(new TaskMigratedException(t1));
+ t1.close(false, true);
+ EasyMock.expectLastCall();
+ EasyMock.replay(t1);
+ addAndInitTask();
+
+ try {
+ assignedTasks.process();
+ fail("Should have thrown TaskMigratedException.");
+ } catch (final TaskMigratedException expected) { /* ignore */ }
+
+ assertThat(assignedTasks.runningTaskIds(), equalTo(Collections.EMPTY_SET));
+ EasyMock.verify(t1);
+ }
+
+ @Test
+ public void shouldPunctuateRunningTasks() {
+ mockTaskInitialization();
+ EasyMock.expect(t1.maybePunctuateStreamTime()).andReturn(true);
+ EasyMock.expect(t1.maybePunctuateSystemTime()).andReturn(true);
+ EasyMock.replay(t1);
+
+ addAndInitTask();
+
+ assertThat(assignedTasks.punctuate(), equalTo(2));
+ EasyMock.verify(t1);
+ }
+
+ @Test
+ public void shouldCloseTaskOnMaybePunctuateStreamTimeIfTaskMigratedException() {
+ mockTaskInitialization();
+ t1.maybePunctuateStreamTime();
+ EasyMock.expectLastCall().andThrow(new TaskMigratedException(t1));
+ t1.close(false, true);
+ EasyMock.expectLastCall();
+ EasyMock.replay(t1);
+ addAndInitTask();
+
+ try {
+ assignedTasks.punctuate();
+ fail("Should have thrown TaskMigratedException.");
+ } catch (final TaskMigratedException expected) { /* ignore */ }
+
+ assertThat(assignedTasks.runningTaskIds(), equalTo(Collections.EMPTY_SET));
+ EasyMock.verify(t1);
+ }
+
+ @Test
+ public void shouldCloseTaskOnMaybePunctuateSystemTimeIfTaskMigratedException() {
+ mockTaskInitialization();
+ EasyMock.expect(t1.maybePunctuateStreamTime()).andReturn(true);
+ t1.maybePunctuateSystemTime();
+ EasyMock.expectLastCall().andThrow(new TaskMigratedException(t1));
+ t1.close(false, true);
+ EasyMock.expectLastCall();
+ EasyMock.replay(t1);
+ addAndInitTask();
+
+ try {
+ assignedTasks.punctuate();
+ fail("Should have thrown TaskMigratedException.");
+ } catch (final TaskMigratedException expected) { /* ignore */ }
+ EasyMock.verify(t1);
+ }
+
+ @Test
+ public void shouldReturnNumberOfPunctuations() {
+ mockTaskInitialization();
+ EasyMock.expect(t1.maybePunctuateStreamTime()).andReturn(true);
+ EasyMock.expect(t1.maybePunctuateSystemTime()).andReturn(false);
+ EasyMock.replay(t1);
+
+ addAndInitTask();
+
+ assertThat(assignedTasks.punctuate(), equalTo(1));
+ EasyMock.verify(t1);
+ }
+
+ private void addAndInitTask() {
+ assignedTasks.addNewTask(t1);
+ assignedTasks.initializeNewTasks();
+ }
+
+ private RuntimeException suspendTask() {
+ addAndInitTask();
+ return assignedTasks.suspend();
+ }
+
+ private void mockRunningTaskSuspension() {
+ EasyMock.expect(t1.initialize()).andReturn(true);
+ EasyMock.expect(t1.hasStateStores()).andReturn(false).anyTimes();
+ EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1)).anyTimes();
+ EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.<TopicPartition>emptyList()).anyTimes();
+ t1.suspend();
+ EasyMock.expectLastCall();
+ }
+
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/d637ad0d/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedTasksTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedTasksTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedTasksTest.java
deleted file mode 100644
index a721936..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedTasksTest.java
+++ /dev/null
@@ -1,461 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor.internals;
-
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.utils.LogContext;
-import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.errors.TaskMigratedException;
-import org.apache.kafka.streams.processor.TaskId;
-import org.easymock.EasyMock;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Set;
-
-import static org.hamcrest.CoreMatchers.not;
-import static org.hamcrest.CoreMatchers.nullValue;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.core.IsEqual.equalTo;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-public class AssignedTasksTest {
-
- private final Task t1 = EasyMock.createMock(Task.class);
- private final Task t2 = EasyMock.createMock(Task.class);
- private final TopicPartition tp1 = new TopicPartition("t1", 0);
- private final TopicPartition tp2 = new TopicPartition("t2", 0);
- private final TopicPartition changeLog1 = new TopicPartition("cl1", 0);
- private final TopicPartition changeLog2 = new TopicPartition("cl2", 0);
- private final TaskId taskId1 = new TaskId(0, 0);
- private final TaskId taskId2 = new TaskId(1, 0);
- private AssignedTasks assignedTasks;
-
- @Before
- public void before() {
- assignedTasks = new AssignedTasks(new LogContext("log "), "task");
- EasyMock.expect(t1.id()).andReturn(taskId1).anyTimes();
- EasyMock.expect(t2.id()).andReturn(taskId2).anyTimes();
- }
-
- @Test
- public void shouldGetPartitionsFromNewTasksThatHaveStateStores() {
- EasyMock.expect(t1.hasStateStores()).andReturn(true);
- EasyMock.expect(t2.hasStateStores()).andReturn(true);
- EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1));
- EasyMock.expect(t2.partitions()).andReturn(Collections.singleton(tp2));
- EasyMock.replay(t1, t2);
-
- assignedTasks.addNewTask(t1);
- assignedTasks.addNewTask(t2);
-
- final Set<TopicPartition> partitions = assignedTasks.uninitializedPartitions();
- assertThat(partitions, equalTo(Utils.mkSet(tp1, tp2)));
- EasyMock.verify(t1, t2);
- }
-
- @Test
- public void shouldNotGetPartitionsFromNewTasksWithoutStateStores() {
- EasyMock.expect(t1.hasStateStores()).andReturn(false);
- EasyMock.expect(t2.hasStateStores()).andReturn(false);
- EasyMock.replay(t1, t2);
-
- assignedTasks.addNewTask(t1);
- assignedTasks.addNewTask(t2);
-
- final Set<TopicPartition> partitions = assignedTasks.uninitializedPartitions();
- assertTrue(partitions.isEmpty());
- EasyMock.verify(t1, t2);
- }
-
- @Test
- public void shouldInitializeNewTasks() {
- EasyMock.expect(t1.initialize()).andReturn(false);
- EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1));
- EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.<TopicPartition>emptySet());
- EasyMock.replay(t1);
-
- addAndInitTask();
-
- EasyMock.verify(t1);
- }
-
- @Test
- public void shouldMoveInitializedTasksNeedingRestoreToRestoring() {
- EasyMock.expect(t1.initialize()).andReturn(false);
- EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1));
- EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.<TopicPartition>emptySet());
- EasyMock.expect(t2.initialize()).andReturn(true);
- final Set<TopicPartition> t2partitions = Collections.singleton(tp2);
- EasyMock.expect(t2.partitions()).andReturn(t2partitions);
- EasyMock.expect(t2.changelogPartitions()).andReturn(Collections.<TopicPartition>emptyList());
- EasyMock.expect(t2.hasStateStores()).andReturn(true);
-
- EasyMock.replay(t1, t2);
-
- assignedTasks.addNewTask(t1);
- assignedTasks.addNewTask(t2);
-
- final Set<TopicPartition> readyPartitions = assignedTasks.initializeNewTasks();
-
- Collection<Task> restoring = assignedTasks.restoringTasks();
- assertThat(restoring.size(), equalTo(1));
- assertSame(restoring.iterator().next(), t1);
- assertThat(readyPartitions, equalTo(t2partitions));
- }
-
- @Test
- public void shouldMoveInitializedTasksThatDontNeedRestoringToRunning() {
- EasyMock.expect(t2.initialize()).andReturn(true);
- EasyMock.expect(t2.partitions()).andReturn(Collections.singleton(tp2));
- EasyMock.expect(t2.changelogPartitions()).andReturn(Collections.<TopicPartition>emptyList());
- EasyMock.expect(t2.hasStateStores()).andReturn(false);
-
- EasyMock.replay(t2);
-
- assignedTasks.addNewTask(t2);
- final Set<TopicPartition> toResume = assignedTasks.initializeNewTasks();
-
- assertThat(assignedTasks.runningTaskIds(), equalTo(Collections.singleton(taskId2)));
- assertThat(toResume, equalTo(Collections.<TopicPartition>emptySet()));
- }
-
- @Test
- public void shouldTransitionFullyRestoredTasksToRunning() {
- final Set<TopicPartition> task1Partitions = Utils.mkSet(tp1);
- EasyMock.expect(t1.initialize()).andReturn(false);
- EasyMock.expect(t1.partitions()).andReturn(task1Partitions).anyTimes();
- EasyMock.expect(t1.changelogPartitions()).andReturn(Utils.mkSet(changeLog1, changeLog2)).anyTimes();
- EasyMock.expect(t1.hasStateStores()).andReturn(true).anyTimes();
- EasyMock.replay(t1);
-
- addAndInitTask();
-
- assertTrue(assignedTasks.updateRestored(Utils.mkSet(changeLog1)).isEmpty());
- Set<TopicPartition> partitions = assignedTasks.updateRestored(Utils.mkSet(changeLog2));
- assertThat(partitions, equalTo(task1Partitions));
- assertThat(assignedTasks.runningTaskIds(), equalTo(Collections.singleton(taskId1)));
- }
-
- @Test
- public void shouldSuspendRunningTasks() {
- mockRunningTaskSuspension();
- EasyMock.replay(t1);
-
- assertThat(suspendTask(), nullValue());
-
- assertThat(assignedTasks.previousTaskIds(), equalTo(Collections.singleton(taskId1)));
- EasyMock.verify(t1);
- }
-
- @Test
- public void shouldCloseRestoringTasks() {
- EasyMock.expect(t1.initialize()).andReturn(false);
- EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1));
- EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.<TopicPartition>emptySet());
- t1.close(false, false);
- EasyMock.expectLastCall();
- EasyMock.replay(t1);
-
- assertThat(suspendTask(), nullValue());
- EasyMock.verify(t1);
- }
-
- @Test
- public void shouldClosedUnInitializedTasksOnSuspend() {
- t1.close(false, false);
- EasyMock.expectLastCall();
- EasyMock.replay(t1);
-
- assignedTasks.addNewTask(t1);
- assertThat(assignedTasks.suspend(), nullValue());
-
- EasyMock.verify(t1);
- }
-
- @Test
- public void shouldNotSuspendSuspendedTasks() {
- mockRunningTaskSuspension();
- EasyMock.replay(t1);
-
- assertThat(suspendTask(), nullValue());
- assertThat(assignedTasks.suspend(), nullValue());
- EasyMock.verify(t1);
- }
-
-
- @Test
- public void shouldCloseTaskOnSuspendWhenRuntimeException() {
- mockTaskInitialization();
- t1.suspend();
- EasyMock.expectLastCall().andThrow(new RuntimeException("KABOOM!"));
- t1.close(false, false);
- EasyMock.expectLastCall();
- EasyMock.replay(t1);
-
- assertThat(suspendTask(), not(nullValue()));
- assertThat(assignedTasks.previousTaskIds(), equalTo(Collections.singleton(taskId1)));
- EasyMock.verify(t1);
- }
-
- @Test
- public void shouldCloseTaskOnSuspendIfTaskMigratedException() {
- mockTaskInitialization();
- t1.suspend();
- EasyMock.expectLastCall().andThrow(new TaskMigratedException(t1));
- t1.close(false, true);
- EasyMock.expectLastCall();
- EasyMock.replay(t1);
-
- assertThat(suspendTask(), nullValue());
- assertTrue(assignedTasks.previousTaskIds().isEmpty());
- EasyMock.verify(t1);
- }
-
- @Test
- public void shouldResumeMatchingSuspendedTasks() {
- mockRunningTaskSuspension();
- t1.resume();
- EasyMock.expectLastCall();
- EasyMock.replay(t1);
-
- assertThat(suspendTask(), nullValue());
-
- assertTrue(assignedTasks.maybeResumeSuspendedTask(taskId1, Collections.singleton(tp1)));
- assertThat(assignedTasks.runningTaskIds(), equalTo(Collections.singleton(taskId1)));
- EasyMock.verify(t1);
- }
-
- @Test
- public void shouldCloseTaskOnResumeIfTaskMigratedException() {
- mockRunningTaskSuspension();
- t1.resume();
- EasyMock.expectLastCall().andThrow(new TaskMigratedException(t1));
- t1.close(false, true);
- EasyMock.expectLastCall();
- EasyMock.replay(t1);
-
- assertThat(suspendTask(), nullValue());
-
- try {
- assignedTasks.maybeResumeSuspendedTask(taskId1, Collections.singleton(tp1));
- fail("Should have thrown TaskMigratedException.");
- } catch (final TaskMigratedException expected) { /* ignore */ }
-
- assertThat(assignedTasks.runningTaskIds(), equalTo(Collections.EMPTY_SET));
- EasyMock.verify(t1);
- }
-
- private void mockTaskInitialization() {
- EasyMock.expect(t1.initialize()).andReturn(true);
- EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1));
- EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.<TopicPartition>emptyList());
- EasyMock.expect(t1.hasStateStores()).andReturn(false);
- }
-
- @Test
- public void shouldCommitRunningTasks() {
- mockTaskInitialization();
- t1.commit();
- EasyMock.expectLastCall();
- EasyMock.replay(t1);
-
- addAndInitTask();
-
- assignedTasks.commit();
- EasyMock.verify(t1);
- }
-
- @Test
- public void shouldCloseTaskOnCommitIfTaskMigratedException() {
- mockTaskInitialization();
- t1.commit();
- EasyMock.expectLastCall().andThrow(new TaskMigratedException(t1));
- t1.close(false, true);
- EasyMock.expectLastCall();
- EasyMock.replay(t1);
- addAndInitTask();
-
- try {
- assignedTasks.commit();
- fail("Should have thrown TaskMigratedException.");
- } catch (final TaskMigratedException expected) { /* ignore */ }
-
- assertThat(assignedTasks.runningTaskIds(), equalTo(Collections.EMPTY_SET));
- EasyMock.verify(t1);
- }
-
- @Test
- public void shouldThrowExceptionOnCommitWhenNotCommitFailedOrProducerFenced() {
- mockTaskInitialization();
- t1.commit();
- EasyMock.expectLastCall().andThrow(new RuntimeException(""));
- EasyMock.replay(t1);
- addAndInitTask();
-
- try {
- assignedTasks.commit();
- fail("Should have thrown exception");
- } catch (Exception e) {
- // ok
- }
- assertThat(assignedTasks.runningTaskIds(), equalTo(Collections.singleton(taskId1)));
- EasyMock.verify(t1);
- }
-
- @Test
- public void shouldCommitRunningTasksIfNeeded() {
- mockTaskInitialization();
- EasyMock.expect(t1.commitNeeded()).andReturn(true);
- t1.commit();
- EasyMock.expectLastCall();
- EasyMock.replay(t1);
-
- addAndInitTask();
-
- assertThat(assignedTasks.maybeCommit(), equalTo(1));
- EasyMock.verify(t1);
- }
-
- @Test
- public void shouldCloseTaskOnMaybeCommitIfTaskMigratedException() {
- mockTaskInitialization();
- EasyMock.expect(t1.commitNeeded()).andReturn(true);
- t1.commit();
- EasyMock.expectLastCall().andThrow(new TaskMigratedException(t1));
- t1.close(false, true);
- EasyMock.expectLastCall();
- EasyMock.replay(t1);
- addAndInitTask();
-
- try {
- assignedTasks.maybeCommit();
- fail("Should have thrown TaskMigratedException.");
- } catch (final TaskMigratedException expected) { /* ignore */ }
-
- assertThat(assignedTasks.runningTaskIds(), equalTo(Collections.EMPTY_SET));
- EasyMock.verify(t1);
- }
-
- @Test
- public void shouldCloseTaskOnProcessesIfTaskMigratedException() {
- mockTaskInitialization();
- t1.process();
- EasyMock.expectLastCall().andThrow(new TaskMigratedException(t1));
- t1.close(false, true);
- EasyMock.expectLastCall();
- EasyMock.replay(t1);
- addAndInitTask();
-
- try {
- assignedTasks.process();
- fail("Should have thrown TaskMigratedException.");
- } catch (final TaskMigratedException expected) { /* ignore */ }
-
- assertThat(assignedTasks.runningTaskIds(), equalTo(Collections.EMPTY_SET));
- EasyMock.verify(t1);
- }
-
- @Test
- public void shouldPunctuateRunningTasks() {
- mockTaskInitialization();
- EasyMock.expect(t1.maybePunctuateStreamTime()).andReturn(true);
- EasyMock.expect(t1.maybePunctuateSystemTime()).andReturn(true);
- EasyMock.replay(t1);
-
- addAndInitTask();
-
- assertThat(assignedTasks.punctuate(), equalTo(2));
- EasyMock.verify(t1);
- }
-
- @Test
- public void shouldCloseTaskOnMaybePunctuateStreamTimeIfTaskMigratedException() {
- mockTaskInitialization();
- t1.maybePunctuateStreamTime();
- EasyMock.expectLastCall().andThrow(new TaskMigratedException(t1));
- t1.close(false, true);
- EasyMock.expectLastCall();
- EasyMock.replay(t1);
- addAndInitTask();
-
- try {
- assignedTasks.punctuate();
- fail("Should have thrown TaskMigratedException.");
- } catch (final TaskMigratedException expected) { /* ignore */ }
-
- assertThat(assignedTasks.runningTaskIds(), equalTo(Collections.EMPTY_SET));
- EasyMock.verify(t1);
- }
-
- @Test
- public void shouldCloseTaskOnMaybePunctuateSystemTimeIfTaskMigratedException() {
- mockTaskInitialization();
- EasyMock.expect(t1.maybePunctuateStreamTime()).andReturn(true);
- t1.maybePunctuateSystemTime();
- EasyMock.expectLastCall().andThrow(new TaskMigratedException(t1));
- t1.close(false, true);
- EasyMock.expectLastCall();
- EasyMock.replay(t1);
- addAndInitTask();
-
- try {
- assignedTasks.punctuate();
- fail("Should have thrown TaskMigratedException.");
- } catch (final TaskMigratedException expected) { /* ignore */ }
- EasyMock.verify(t1);
- }
-
- @Test
- public void shouldReturnNumberOfPunctuations() {
- mockTaskInitialization();
- EasyMock.expect(t1.maybePunctuateStreamTime()).andReturn(true);
- EasyMock.expect(t1.maybePunctuateSystemTime()).andReturn(false);
- EasyMock.replay(t1);
-
- addAndInitTask();
-
- assertThat(assignedTasks.punctuate(), equalTo(1));
- EasyMock.verify(t1);
- }
-
- private void addAndInitTask() {
- assignedTasks.addNewTask(t1);
- assignedTasks.initializeNewTasks();
- }
-
- private RuntimeException suspendTask() {
- addAndInitTask();
- return assignedTasks.suspend();
- }
-
- private void mockRunningTaskSuspension() {
- EasyMock.expect(t1.initialize()).andReturn(true);
- EasyMock.expect(t1.hasStateStores()).andReturn(false).anyTimes();
- EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1)).anyTimes();
- EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.<TopicPartition>emptyList()).anyTimes();
- t1.suspend();
- EasyMock.expectLastCall();
- }
-
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/d637ad0d/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
index 705bcf9..342354c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
@@ -61,7 +61,7 @@ public class StoreChangelogReaderTest {
@Mock(type = MockType.NICE)
private RestoringTasks active;
@Mock(type = MockType.NICE)
- private Task task;
+ private StreamTask task;
private final MockStateRestoreListener callback = new MockStateRestoreListener();
private final CompositeRestoreListener restoreListener = new CompositeRestoreListener(callback);
http://git-wip-us.apache.org/repos/asf/kafka/blob/d637ad0d/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index b11b8c2..1640f9e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -61,17 +61,19 @@ public class TaskManagerTest {
@Mock(type = MockType.NICE)
private Consumer<byte[], byte[]> consumer;
@Mock(type = MockType.NICE)
- private StreamThread.AbstractTaskCreator activeTaskCreator;
+ private StreamThread.AbstractTaskCreator<StreamTask> activeTaskCreator;
@Mock(type = MockType.NICE)
- private StreamThread.AbstractTaskCreator standbyTaskCreator;
+ private StreamThread.AbstractTaskCreator<StandbyTask> standbyTaskCreator;
@Mock(type = MockType.NICE)
private ThreadMetadataProvider threadMetadataProvider;
@Mock(type = MockType.NICE)
- private Task firstTask;
+ private StreamTask streamTask;
@Mock(type = MockType.NICE)
- private AssignedTasks active;
+ private StandbyTask standbyTask;
@Mock(type = MockType.NICE)
- private AssignedTasks standby;
+ private AssignedStreamsTasks active;
+ @Mock(type = MockType.NICE)
+ private AssignedStandbyTasks standby;
private TaskManager taskManager;
@@ -139,7 +141,7 @@ public class TaskManagerTest {
public void shouldAddNonResumedActiveTasks() {
mockSingleActiveTask();
EasyMock.expect(active.maybeResumeSuspendedTask(taskId0, taskId0Partitions)).andReturn(false);
- active.addNewTask(EasyMock.same(firstTask));
+ active.addNewTask(EasyMock.same(streamTask));
replay();
taskManager.createTasks(taskId0Partitions);
@@ -164,7 +166,7 @@ public class TaskManagerTest {
public void shouldAddNonResumedStandbyTasks() {
mockStandbyTaskExpectations();
EasyMock.expect(standby.maybeResumeSuspendedTask(taskId0, taskId0Partitions)).andReturn(false);
- standby.addNewTask(EasyMock.same(firstTask));
+ standby.addNewTask(EasyMock.same(standbyTask));
replay();
taskManager.createTasks(taskId0Partitions);
@@ -470,7 +472,7 @@ public class TaskManagerTest {
}
private void mockAssignStandbyPartitions(final long offset) {
- final Task task = EasyMock.createNiceMock(Task.class);
+ final StandbyTask task = EasyMock.createNiceMock(StandbyTask.class);
EasyMock.expect(active.initializeNewTasks()).andReturn(new HashSet<TopicPartition>());
EasyMock.expect(active.allTasksRunning()).andReturn(true);
EasyMock.expect(active.updateRestored(EasyMock.<Collection<TopicPartition>>anyObject())).
@@ -487,7 +489,7 @@ public class TaskManagerTest {
mockThreadMetadataProvider(taskId0Assignment, Collections.<TaskId, Set<TopicPartition>>emptyMap());
expect(standbyTaskCreator.createTasks(EasyMock.<Consumer<byte[], byte[]>>anyObject(),
EasyMock.eq(taskId0Assignment)))
- .andReturn(Collections.singletonList(firstTask));
+ .andReturn(Collections.singletonList(standbyTask));
}
@@ -497,7 +499,7 @@ public class TaskManagerTest {
expect(activeTaskCreator.createTasks(EasyMock.anyObject(Consumer.class),
EasyMock.eq(taskId0Assignment)))
- .andReturn(Collections.singletonList(firstTask));
+ .andReturn(Collections.singletonList(streamTask));
}