You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2016/07/20 16:56:49 UTC
[3/3] samza git commit: SAMZA-863: Multithreading support in Samza
SAMZA-863: Multithreading support in Samza
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/e5f31c57
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/e5f31c57
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/e5f31c57
Branch: refs/heads/master
Commit: e5f31c57c957e6a38f566d864d0e5acffba0327d
Parents: 9396ee5
Author: Xinyu Liu <xi...@linkedin.com>
Authored: Tue Jul 19 11:53:44 2016 -0700
Committer: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Committed: Tue Jul 19 11:53:44 2016 -0700
----------------------------------------------------------------------
checkstyle/import-control.xml | 10 +-
.../org/apache/samza/task/AsyncStreamTask.java | 60 ++
.../org/apache/samza/task/TaskCallback.java | 38 ++
.../apache/samza/container/RunLoopFactory.java | 112 ++++
.../org/apache/samza/task/AsyncRunLoop.java | 619 +++++++++++++++++++
.../samza/task/AsyncStreamTaskAdapter.java | 92 +++
.../apache/samza/task/CoordinatorRequests.java | 89 +++
.../apache/samza/task/TaskCallbackFactory.java | 28 +
.../org/apache/samza/task/TaskCallbackImpl.java | 104 ++++
.../apache/samza/task/TaskCallbackListener.java | 30 +
.../apache/samza/task/TaskCallbackManager.java | 141 +++++
.../task/TaskCallbackTimeoutException.java | 42 ++
.../main/java/org/apache/samza/util/Utils.java | 59 ++
.../apache/samza/checkpoint/OffsetManager.scala | 38 +-
.../org/apache/samza/config/JobConfig.scala | 11 +
.../org/apache/samza/config/TaskConfig.scala | 11 +
.../org/apache/samza/container/RunLoop.scala | 92 +--
.../apache/samza/container/SamzaContainer.scala | 195 ++++--
.../samza/container/SamzaContainerMetrics.scala | 2 +
.../apache/samza/container/TaskInstance.scala | 44 +-
.../samza/container/TaskInstanceMetrics.scala | 2 +
.../samza/coordinator/JobCoordinator.scala | 5 +-
.../apache/samza/system/SystemConsumers.scala | 31 +-
.../org/apache/samza/task/TestAsyncRunLoop.java | 333 ++++++++++
.../samza/task/TestAsyncStreamAdapter.java | 141 +++++
.../samza/task/TestCoordinatorRequests.java | 93 +++
.../apache/samza/task/TestTaskCallbackImpl.java | 125 ++++
.../samza/task/TestTaskCallbackManager.java | 141 +++++
.../apache/samza/container/TestRunLoop.scala | 64 +-
.../samza/container/TestSamzaContainer.scala | 6 +-
.../samza/container/TestTaskInstance.scala | 6 +-
.../samza/system/TestSystemConsumers.scala | 36 +-
.../samza/system/hdfs/HdfsSystemProducer.scala | 70 ++-
.../migration/KafkaCheckpointMigration.scala | 1 +
.../system/kafka/KafkaSystemProducer.scala | 215 ++++---
.../kafka/TestKafkaSystemProducerJava.java | 4 +-
.../system/kafka/TestKafkaSystemProducer.scala | 17 +-
.../kv/inmemory/InMemoryKeyValueStore.scala | 6 +-
.../samza/storage/kv/RocksDbKeyValueStore.scala | 3 -
.../storage/kv/TestRocksDbKeyValueStore.scala | 59 +-
.../apache/samza/storage/kv/CachedStore.scala | 194 +++---
.../samza/storage/kv/TestCachedStore.scala | 15 -
.../samza/storage/kv/TestKeyValueStores.scala | 191 +++++-
43 files changed, 3098 insertions(+), 477 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/e5f31c57/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 325c381..c85dc94 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -33,6 +33,7 @@
<allow pkg="org.apache.commons" />
<allow class="scala.collection.JavaConversions" />
<allow class="scala.collection.JavaConverters" />
+ <allow pkg="scala.runtime" />
<subpackage name="config">
<allow class="org.apache.samza.SamzaException" />
@@ -133,6 +134,10 @@
<allow pkg="org.apache.samza.container" />
<allow pkg="org.apache.samza.metrics" />
<allow pkg="org.apache.samza.system" />
+ <allow pkg="org.apache.samza.util" />
+ <allow pkg="org.apache.samza.checkpoint" />
+ <allow class="org.apache.samza.SamzaException" />
+ <allow class="org.apache.samza.Partition" />
</subpackage>
<subpackage name="container">
@@ -142,7 +147,10 @@
<allow pkg="org.apache.samza.util" />
<allow pkg="junit.framework" />
<allow class="org.apache.samza.coordinator.stream.AbstractCoordinatorStreamManager" />
-
+ <allow class="org.apache.samza.SamzaException" />
+ <allow pkg="org.apache.samza.system" />
+ <allow pkg="org.apache.samza.task" />
+ <allow pkg="org.apache.samza.util" />
<subpackage name="grouper">
<subpackage name="stream">
<allow pkg="org.apache.samza.system" />
http://git-wip-us.apache.org/repos/asf/samza/blob/e5f31c57/samza-api/src/main/java/org/apache/samza/task/AsyncStreamTask.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/task/AsyncStreamTask.java b/samza-api/src/main/java/org/apache/samza/task/AsyncStreamTask.java
new file mode 100644
index 0000000..684ba0b
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/task/AsyncStreamTask.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.task;
+
+import org.apache.samza.system.IncomingMessageEnvelope;
+
+/**
+ * An AsyncStreamTask is the basic class to support multithreading execution in Samza container. It's provided for better
+ * parallelism and resource utilization. This class allows task to make asynchronous calls and fire callbacks upon completion.
+ * Similar to {@link StreamTask}, an AsyncStreamTask may be augmented by implementing other interfaces, such as
+ * {@link InitableTask}, {@link WindowableTask}, or {@link ClosableTask}. The following invariants hold with these mix-ins:
+ *
+ * InitableTask.init - always the first method invoked on an AsyncStreamTask. It happens-before every subsequent
+ * invocation on AsyncStreamTask (for happens-before semantics, see https://docs.oracle.com/javase/tutorial/essential/concurrency/memconsist.html).
+ *
+ * ClosableTask.close - always the last method invoked on an AsyncStreamTask and all other AsyncStreamTask invocations are guaranteed
+ * to happen-before it.
+ *
+ * AsyncStreamTask.processAsync - can run in either a serialized or parallel mode. In the serialized mode (task.process.max.inflight.messages=1),
+ * each invocation of processAsync is guaranteed to happen-before the next. In a parallel execution mode (task.process.max.inflight.messages>1),
+ * there is no such happens-before constraint and the AsyncStreamTask is required to coordinate any shared state.
+ *
+ * WindowableTask.window - in either above mode, it is called when no invocations to processAsync are pending and no new
+ * processAsync invocations can be scheduled until it completes. Therefore, all previous processAsync invocations are guaranteed to
+ * happen-before an invocation of WindowableTask.window. An invocation of WindowableTask.window is guaranteed to happen-before
+ * any subsequent processAsync invocations. The Samza engine is responsible for ensuring that window is invoked in a timely manner.
+ *
+ * Similar to WindowableTask.window, commits are guaranteed to happen only when there are no pending processAsync or WindowableTask.window
+ * invocations. All preceding invocations happen-before commit and commit happens-before all subsequent invocations.
+ */
+public interface AsyncStreamTask {
+ /**
+ * Called once for each message that this AsyncStreamTask receives.
+ * @param envelope Contains the received deserialized message and key, and also information regarding the stream and
+ * partition from which the message was received.
+ * @param collector Contains the means of sending message envelopes to the output stream. The collector must only
+ * be used during the current call to the process method; you should not reuse the collector between invocations
+ * of this method.
+ * @param coordinator Manages execution of tasks.
+ * @param callback Triggers the completion of the process.
+ */
+ void processAsync(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator, TaskCallback callback);
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/e5f31c57/samza-api/src/main/java/org/apache/samza/task/TaskCallback.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/task/TaskCallback.java b/samza-api/src/main/java/org/apache/samza/task/TaskCallback.java
new file mode 100644
index 0000000..8ba7a36
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/task/TaskCallback.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.task;
+
+/**
+ * A TaskCallback is fired by an {@link AsyncStreamTask} to notify when an asynchronous
+ * process has completed. If the callback is fired multiple times, it will throw IllegalStateException.
+ */
+public interface TaskCallback {
+
+ /**
+ * Invoke when the asynchronous process completed with success.
+ */
+ void complete();
+
+ /**
+ * Invoke when the asynchronous process failed with an error.
+ * @param t error throwable
+ */
+ void failure(Throwable t);
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/e5f31c57/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java b/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java
new file mode 100644
index 0000000..a789d04
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.container;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.TaskConfig;
+import org.apache.samza.system.SystemConsumers;
+import org.apache.samza.task.AsyncRunLoop;
+import org.apache.samza.task.AsyncStreamTask;
+import org.apache.samza.task.StreamTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.JavaConversions;
+import scala.runtime.AbstractFunction1;
+
+import static org.apache.samza.util.Utils.defaultValue;
+import static org.apache.samza.util.Utils.defaultClock;
+
+/**
+ * Factory class to create runloop for a Samza task, based on the type
+ * of the task
+ */
+public class RunLoopFactory {
+ private static final Logger log = LoggerFactory.getLogger(RunLoopFactory.class);
+
+ private static final long DEFAULT_WINDOW_MS = -1L;
+ private static final long DEFAULT_COMMIT_MS = 60000L;
+ private static final long DEFAULT_CALLBACK_TIMEOUT_MS = -1L;
+
+ public static Runnable createRunLoop(scala.collection.immutable.Map<TaskName, TaskInstance<?>> taskInstances,
+ SystemConsumers consumerMultiplexer,
+ ExecutorService threadPool,
+ Executor executor,
+ SamzaContainerMetrics containerMetrics,
+ TaskConfig config) {
+
+ long taskWindowMs = config.getWindowMs().getOrElse(defaultValue(DEFAULT_WINDOW_MS));
+
+ log.info("Got window milliseconds: " + taskWindowMs);
+
+ long taskCommitMs = config.getCommitMs().getOrElse(defaultValue(DEFAULT_COMMIT_MS));
+
+ log.info("Got commit milliseconds: " + taskCommitMs);
+
+ int asyncTaskCount = taskInstances.values().count(new AbstractFunction1<TaskInstance<?>, Object>() {
+ @Override
+ public Boolean apply(TaskInstance<?> t) {
+ return t.isAsyncTask();
+ }
+ });
+
+ // asyncTaskCount should be either 0 or the number of all taskInstances
+ if (asyncTaskCount > 0 && asyncTaskCount < taskInstances.size()) {
+ throw new SamzaException("Mixing StreamTask and AsyncStreamTask is not supported");
+ }
+
+ if (asyncTaskCount == 0) {
+ log.info("Run loop in single thread mode.");
+
+ scala.collection.immutable.Map<TaskName, TaskInstance<StreamTask>> streamTaskInstances = (scala.collection.immutable.Map) taskInstances;
+ return new RunLoop(
+ streamTaskInstances,
+ consumerMultiplexer,
+ containerMetrics,
+ taskWindowMs,
+ taskCommitMs,
+ defaultClock(),
+ executor);
+ } else {
+ Integer taskMaxConcurrency = config.getMaxConcurrency().getOrElse(defaultValue(1));
+
+ log.info("Got max messages in flight: " + taskMaxConcurrency);
+
+ Long callbackTimeout = config.getCallbackTimeoutMs().getOrElse(defaultValue(DEFAULT_CALLBACK_TIMEOUT_MS));
+
+ log.info("Got callback timeout: " + callbackTimeout);
+
+ scala.collection.immutable.Map<TaskName, TaskInstance<AsyncStreamTask>> asyncStreamTaskInstances = (scala.collection.immutable.Map) taskInstances;
+
+ log.info("Run loop in asynchronous mode.");
+
+ return new AsyncRunLoop(
+ JavaConversions.asJavaMap(asyncStreamTaskInstances),
+ threadPool,
+ consumerMultiplexer,
+ taskMaxConcurrency,
+ taskWindowMs,
+ taskCommitMs,
+ callbackTimeout,
+ containerMetrics);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/e5f31c57/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java b/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java
new file mode 100644
index 0000000..a510bb0
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java
@@ -0,0 +1,619 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.task;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.samza.SamzaException;
+import org.apache.samza.container.SamzaContainerMetrics;
+import org.apache.samza.container.TaskInstance;
+import org.apache.samza.container.TaskInstanceMetrics;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemConsumers;
+import org.apache.samza.system.SystemStreamPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.JavaConversions;
+
+
+/**
+ * The AsyncRunLoop supports multithreading execution of Samza {@link AsyncStreamTask}s.
+ */
+public class AsyncRunLoop implements Runnable {
+ private static final Logger log = LoggerFactory.getLogger(AsyncRunLoop.class);
+
+ private final Map<TaskName, AsyncTaskWorker> taskWorkers;
+ private final SystemConsumers consumerMultiplexer;
+ private final Map<SystemStreamPartition, List<AsyncTaskWorker>> sspToTaskWorkerMapping;
+ private final ExecutorService threadPool;
+ private final CoordinatorRequests coordinatorRequests;
+ private final Object latch;
+ private final int maxConcurrency;
+ private final long windowMs;
+ private final long commitMs;
+ private final long callbackTimeoutMs;
+ private final SamzaContainerMetrics containerMetrics;
+ private final ScheduledExecutorService workerTimer;
+ private final ScheduledExecutorService callbackTimer;
+ private volatile boolean shutdownNow = false;
+ private volatile Throwable throwable = null;
+
+ public AsyncRunLoop(Map<TaskName, TaskInstance<AsyncStreamTask>> taskInstances,
+ ExecutorService threadPool,
+ SystemConsumers consumerMultiplexer,
+ int maxConcurrency,
+ long windowMs,
+ long commitMs,
+ long callbackTimeoutMs,
+ SamzaContainerMetrics containerMetrics) {
+
+ this.threadPool = threadPool;
+ this.consumerMultiplexer = consumerMultiplexer;
+ this.containerMetrics = containerMetrics;
+ this.windowMs = windowMs;
+ this.commitMs = commitMs;
+ this.maxConcurrency = maxConcurrency;
+ this.callbackTimeoutMs = callbackTimeoutMs;
+ this.callbackTimer = (callbackTimeoutMs > 0) ? Executors.newSingleThreadScheduledExecutor() : null;
+ this.coordinatorRequests = new CoordinatorRequests(taskInstances.keySet());
+ this.latch = new Object();
+ this.workerTimer = Executors.newSingleThreadScheduledExecutor();
+ Map<TaskName, AsyncTaskWorker> workers = new HashMap<>();
+ for (TaskInstance<AsyncStreamTask> task : taskInstances.values()) {
+ workers.put(task.taskName(), new AsyncTaskWorker(task));
+ }
+ // Partitions and tasks assigned to the container will not change during the run loop lifetime
+ this.taskWorkers = Collections.unmodifiableMap(workers);
+ this.sspToTaskWorkerMapping = Collections.unmodifiableMap(getSspToAsyncTaskWorkerMap(taskInstances, taskWorkers));
+ }
+
+ /**
+ * Returns mapping of the SystemStreamPartition to the AsyncTaskWorkers to efficiently route the envelopes
+ */
+ private static Map<SystemStreamPartition, List<AsyncTaskWorker>> getSspToAsyncTaskWorkerMap(
+ Map<TaskName, TaskInstance<AsyncStreamTask>> taskInstances, Map<TaskName, AsyncTaskWorker> taskWorkers) {
+ Map<SystemStreamPartition, List<AsyncTaskWorker>> sspToWorkerMap = new HashMap<>();
+ for (TaskInstance<AsyncStreamTask> task : taskInstances.values()) {
+ Set<SystemStreamPartition> ssps = JavaConversions.asJavaSet(task.systemStreamPartitions());
+ for (SystemStreamPartition ssp : ssps) {
+ if (sspToWorkerMap.get(ssp) == null) {
+ sspToWorkerMap.put(ssp, new ArrayList<AsyncTaskWorker>());
+ }
+ sspToWorkerMap.get(ssp).add(taskWorkers.get(task.taskName()));
+ }
+ }
+ return sspToWorkerMap;
+ }
+
+ /**
+ * The run loop chooses messages from the SystemConsumers, and runs the ready tasks asynchronously.
+ * Window and commit are run in a thread pool, and they are mutually exclusive with task process.
+ * The loop thread will block if all tasks are busy, and resume if any task finishes.
+ */
+ @Override
+ public void run() {
+ try {
+ for (AsyncTaskWorker taskWorker : taskWorkers.values()) {
+ taskWorker.init();
+ }
+
+ long prevNs = System.nanoTime();
+
+ while (!shutdownNow) {
+ if (throwable != null) {
+ log.error("Caught throwable and stopping run loop", throwable);
+ throw new SamzaException(throwable);
+ }
+
+ long startNs = System.nanoTime();
+
+ IncomingMessageEnvelope envelope = chooseEnvelope();
+
+ long chooseNs = System.nanoTime();
+
+ containerMetrics.chooseNs().update(chooseNs - startNs);
+
+ runTasks(envelope);
+
+ long blockNs = System.nanoTime();
+
+ blockIfBusy(envelope);
+
+ long currentNs = System.nanoTime();
+ long activeNs = blockNs - chooseNs;
+ long totalNs = currentNs - prevNs;
+ prevNs = currentNs;
+ containerMetrics.blockNs().update(currentNs - blockNs);
+ containerMetrics.utilization().set(((double) activeNs) / totalNs);
+ }
+ } finally {
+ workerTimer.shutdown();
+ if (callbackTimer != null) callbackTimer.shutdown();
+ }
+ }
+
+ public void shutdown() {
+ shutdownNow = true;
+ }
+
+ /**
+ * Chooses an envelope from messageChooser without updating it. This enables flow control
+ * on the SSP level, meaning the task will not get further messages for the SSP if it cannot
+ * process it. The chooser is updated only after the callback to process is invoked, then the task
+ * is able to process more messages. This flow control does not block, so in case of an empty message chooser,
+ * it will return null immediately without blocking, and the chooser will not poll the underlying system
+ * consumer since there are still messages in the SystemConsumers buffer.
+ */
+ private IncomingMessageEnvelope chooseEnvelope() {
+ IncomingMessageEnvelope envelope = consumerMultiplexer.choose(false);
+ if (envelope != null) {
+ log.trace("Choose envelope ssp {} offset {} for processing", envelope.getSystemStreamPartition(), envelope.getOffset());
+ containerMetrics.envelopes().inc();
+ } else {
+ log.trace("No envelope is available");
+ containerMetrics.nullEnvelopes().inc();
+ }
+ return envelope;
+ }
+
+ /**
+ * Insert the envelope into the task pending queues and run all the tasks
+ */
+ private void runTasks(IncomingMessageEnvelope envelope) {
+ if (envelope != null) {
+ PendingEnvelope pendingEnvelope = new PendingEnvelope(envelope);
+ for (AsyncTaskWorker worker : sspToTaskWorkerMapping.get(envelope.getSystemStreamPartition())) {
+ worker.state.insertEnvelope(pendingEnvelope);
+ }
+ }
+
+ for (AsyncTaskWorker worker: taskWorkers.values()) {
+ worker.run();
+ }
+ }
+
+ /**
+ * Block the runloop thread if all tasks are busy. Due to limitation of non-blocking for the flow control,
+ * we block the run loop when there are no runnable tasks, or all tasks are idle (no pending messages) while
+ * chooser is empty too. When a task worker finishes or window/commit completes, it will resume the runloop.
+ */
+ private void blockIfBusy(IncomingMessageEnvelope envelope) {
+ synchronized (latch) {
+ while (!shutdownNow && throwable == null) {
+ for (AsyncTaskWorker worker : taskWorkers.values()) {
+ if (worker.state.isReady() && (envelope != null || worker.state.hasPendingOps())) {
+ // should continue running since the worker state is ready and there is either new message
+ // or some pending operations for the worker
+ return;
+ }
+ }
+
+ try {
+ log.trace("Block loop thread");
+
+ if (envelope == null) {
+ // If the envelope is null then we will wait for a poll interval, otherwise next choose() will
+ // return null immediately and we will have a busy loop
+ latch.wait(consumerMultiplexer.pollIntervalMs());
+ return;
+ } else {
+ latch.wait();
+ }
+ } catch (InterruptedException e) {
+ throw new SamzaException("Run loop is interrupted", e);
+ }
+ }
+ }
+ }
+
+ /**
+ * Resume the runloop thread. It is triggered once a task becomes ready again or has failure.
+ */
+ private void resume() {
+ log.trace("Resume loop thread");
+ if (coordinatorRequests.shouldShutdownNow() && coordinatorRequests.commitRequests().isEmpty()) {
+ shutdownNow = true;
+ }
+ synchronized (latch) {
+ latch.notifyAll();
+ }
+ }
+
+ /**
+ * Set the throwable and abort run loop. The throwable will be thrown from the run loop thread
+ * @param t throwable
+ */
+ private void abort(Throwable t) {
+ throwable = t;
+ }
+
+ /**
+ * PendingEnvelope contains an envelope that is not processed by this task, and
+ * a flag indicating whether it has been processed by any tasks.
+ */
+ private static final class PendingEnvelope {
+ private final IncomingMessageEnvelope envelope;
+ private boolean processed = false;
+
+ PendingEnvelope(IncomingMessageEnvelope envelope) {
+ this.envelope = envelope;
+ }
+
+ /**
+ * Marks the envelope as processed. Returns true if it had not been processed before this call.
+ */
+ private boolean markProcessed() {
+ boolean oldValue = processed;
+ processed = true;
+ return !oldValue;
+ }
+ }
+
+
+ private enum WorkerOp {
+ WINDOW,
+ COMMIT,
+ PROCESS,
+ NO_OP
+ }
+
+ /**
+ * The AsyncTaskWorker encapsulates the states of an {@link AsyncStreamTask}. If the task becomes ready, it
+ * will run the task asynchronously. It runs window and commit in the provided thread pool.
+ */
+ private class AsyncTaskWorker implements TaskCallbackListener {
+ private final TaskInstance<AsyncStreamTask> task;
+ private final TaskCallbackManager callbackManager;
+ private volatile AsyncTaskState state;
+
+ AsyncTaskWorker(TaskInstance<AsyncStreamTask> task) {
+ this.task = task;
+ this.callbackManager = new TaskCallbackManager(this, task.metrics(), callbackTimer, callbackTimeoutMs);
+ this.state = new AsyncTaskState(task.taskName(), task.metrics());
+ }
+
+ private void init() {
+ // schedule the timer for windowing and committing
+ if (task.isWindowableTask() && windowMs > 0L) {
+ workerTimer.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ log.trace("Task {} need window", task.taskName());
+ state.needWindow();
+ resume();
+ }
+ }, windowMs, windowMs, TimeUnit.MILLISECONDS);
+ }
+
+ if (commitMs > 0L) {
+ workerTimer.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ log.trace("Task {} need commit", task.taskName());
+ state.needCommit();
+ resume();
+ }
+ }, commitMs, commitMs, TimeUnit.MILLISECONDS);
+ }
+ }
+
+ /**
+ * Invoke next task operation based on its state
+ */
+ private void run() {
+ switch (state.nextOp()) {
+ case PROCESS:
+ process();
+ break;
+ case WINDOW:
+ window();
+ break;
+ case COMMIT:
+ commit();
+ break;
+ default:
+ //no op
+ break;
+ }
+ }
+
+ /**
+ * Process asynchronously. The callback needs to be fired once the processing is done.
+ */
+ private void process() {
+ final IncomingMessageEnvelope envelope = state.fetchEnvelope();
+ log.trace("Process ssp {} offset {}", envelope.getSystemStreamPartition(), envelope.getOffset());
+
+ final ReadableCoordinator coordinator = new ReadableCoordinator(task.taskName());
+ TaskCallbackFactory callbackFactory = new TaskCallbackFactory() {
+ @Override
+ public TaskCallback createCallback() {
+ state.startProcess();
+ containerMetrics.processes().inc();
+ return callbackManager.createCallback(task.taskName(), envelope, coordinator);
+ }
+ };
+
+ task.process(envelope, coordinator, callbackFactory);
+ }
+
+ /**
+ * Invoke window. Run window in thread pool if not the single thread mode.
+ */
+ private void window() {
+ state.startWindow();
+ Runnable windowWorker = new Runnable() {
+ @Override
+ public void run() {
+ try {
+ containerMetrics.windows().inc();
+
+ ReadableCoordinator coordinator = new ReadableCoordinator(task.taskName());
+ long startTime = System.nanoTime();
+ task.window(coordinator);
+ containerMetrics.windowNs().update(System.nanoTime() - startTime);
+ coordinatorRequests.update(coordinator);
+
+ state.doneWindowOrCommit();
+ } catch (Throwable t) {
+ log.error("Task {} window failed", task.taskName(), t);
+ abort(t);
+ } finally {
+ log.trace("Task {} window completed", task.taskName());
+ resume();
+ }
+ }
+ };
+
+ if (threadPool != null) {
+ log.trace("Task {} window on the thread pool", task.taskName());
+ threadPool.submit(windowWorker);
+ } else {
+ log.trace("Task {} window on the run loop thread", task.taskName());
+ windowWorker.run();
+ }
+ }
+
+ /**
+ * Invoke commit. Run commit in thread pool if not the single thread mode.
+ */
+ private void commit() {
+ state.startCommit();
+ Runnable commitWorker = new Runnable() {
+ @Override
+ public void run() {
+ try {
+ containerMetrics.commits().inc();
+
+ long startTime = System.nanoTime();
+ task.commit();
+ containerMetrics.commitNs().update(System.nanoTime() - startTime);
+
+ state.doneWindowOrCommit();
+ } catch (Throwable t) {
+ log.error("Task {} commit failed", task.taskName(), t);
+ abort(t);
+ } finally {
+ log.trace("Task {} commit completed", task.taskName());
+ resume();
+ }
+ }
+ };
+
+ if (threadPool != null) {
+ log.trace("Task {} commits on the thread pool", task.taskName());
+ threadPool.submit(commitWorker);
+ } else {
+ log.trace("Task {} commits on the run loop thread", task.taskName());
+ commitWorker.run();
+ }
+ }
+
+
+
+ /**
+ * Task process completes successfully, update the offsets based on the high-water mark.
+ * Then it will trigger the listener for task state change.
+ * @param callback AsyncStreamTask.processAsync callback
+ */
+ @Override
+ public void onComplete(TaskCallback callback) {
+ try {
+ state.doneProcess();
+ TaskCallbackImpl callbackImpl = (TaskCallbackImpl) callback;
+ containerMetrics.processNs().update(System.nanoTime() - callbackImpl.timeCreatedNs);
+ log.trace("Got callback complete for task {}, ssp {}", callbackImpl.taskName, callbackImpl.envelope.getSystemStreamPartition());
+
+ TaskCallbackImpl callbackToUpdate = callbackManager.updateCallback(callbackImpl, true);
+ if (callbackToUpdate != null) {
+ IncomingMessageEnvelope envelope = callbackToUpdate.envelope;
+ log.trace("Update offset for ssp {}, offset {}", envelope.getSystemStreamPartition(), envelope.getOffset());
+
+ // update offset
+ task.offsetManager().update(task.taskName(), envelope.getSystemStreamPartition(), envelope.getOffset());
+
+ // update coordinator
+ coordinatorRequests.update(callbackToUpdate.coordinator);
+ }
+ } catch (Throwable t) {
+ log.error(t.getMessage(), t);
+ abort(t);
+ } finally {
+ resume();
+ }
+ }
+
+ /**
+ * Task process fails. Trigger the listener indicating failure.
+ * @param callback AsyncStreamTask.processAsync callback
+ * @param t throwable of the failure
+ */
+ @Override
+ public void onFailure(TaskCallback callback, Throwable t) {
+ try {
+ state.doneProcess();
+ abort(t);
+ // update pending count, but not offset
+ TaskCallbackImpl callbackImpl = (TaskCallbackImpl) callback;
+ callbackManager.updateCallback(callbackImpl, false);
+ log.error("Got callback failure for task {}", callbackImpl.taskName);
+ } catch (Throwable e) {
+ log.error(e.getMessage(), e);
+ } finally {
+ resume();
+ }
+ }
+ }
+
+
+ /**
+ * AsyncTaskState manages the state of the AsyncStreamTask. In summary, a worker has the following states:
+ * ready - ready for window, commit or process next incoming message.
+ * busy - doing window, commit or not able to process next message.
+ * idle - no pending messages, and no window/commit
+ */
+ private final class AsyncTaskState {
+ private volatile boolean needWindow = false;
+ private volatile boolean needCommit = false;
+ private volatile boolean windowOrCommitInFlight = false;
+ private final AtomicInteger messagesInFlight = new AtomicInteger(0);
+ private final ArrayDeque<PendingEnvelope> pendingEnvelopQueue;
+ private final TaskName taskName;
+ private final TaskInstanceMetrics taskMetrics;
+
+ AsyncTaskState(TaskName taskName, TaskInstanceMetrics taskMetrics) {
+ this.taskName = taskName;
+ this.taskMetrics = taskMetrics;
+ this.pendingEnvelopQueue = new ArrayDeque<>();
+ }
+
+ /**
+ * Returns whether the task is ready to do process/window/commit.
+ */
+ private boolean isReady() {
+ needCommit |= coordinatorRequests.commitRequests().remove(taskName);
+ if (needWindow || needCommit) {
+ // ready for window or commit only when no messages are in progress and
+ // no window/commit in flight
+ return messagesInFlight.get() == 0 && !windowOrCommitInFlight;
+ } else {
+ // ready for process only when the inflight message count does not exceed threshold
+ // and no window/commit in flight
+ return messagesInFlight.get() < maxConcurrency && !windowOrCommitInFlight;
+ }
+ }
+
+ private boolean hasPendingOps() {
+ return !pendingEnvelopQueue.isEmpty() || needCommit || needWindow;
+ }
+
+ /**
+ * Returns the next operation by this taskWorker
+ */
+ private WorkerOp nextOp() {
+ if (isReady()) {
+ if (needCommit) return WorkerOp.COMMIT;
+ else if (needWindow) return WorkerOp.WINDOW;
+ else if (!pendingEnvelopQueue.isEmpty()) return WorkerOp.PROCESS;
+ }
+ return WorkerOp.NO_OP;
+ }
+
+ private void needWindow() {
+ needWindow = true;
+ }
+
+ private void needCommit() {
+ needCommit = true;
+ }
+
+ private void startWindow() {
+ needWindow = false;
+ windowOrCommitInFlight = true;
+ }
+
+ private void startCommit() {
+ needCommit = false;
+ windowOrCommitInFlight = true;
+ }
+
+ private void startProcess() {
+ messagesInFlight.incrementAndGet();
+ }
+
+ private void doneWindowOrCommit() {
+ windowOrCommitInFlight = false;
+ }
+
+ private void doneProcess() {
+ messagesInFlight.decrementAndGet();
+ }
+
+ /**
+ * Insert an PendingEnvelope into the pending envelope queue.
+ * The function will be called in the run loop thread so no synchronization.
+ * @param pendingEnvelope
+ */
+ private void insertEnvelope(PendingEnvelope pendingEnvelope) {
+ pendingEnvelopQueue.add(pendingEnvelope);
+ int queueSize = pendingEnvelopQueue.size();
+ taskMetrics.pendingMessages().set(queueSize);
+ log.trace("Insert envelope to task {} queue.", taskName);
+ log.debug("Task {} pending envelope count is {} after insertion.", taskName, queueSize);
+ }
+
+ /**
+ * Fetch the pending envelope in the pending queue for the task to process.
+ * Update the chooser for flow control on the SSP level. Once it's updated, the AsyncRunLoop
+ * will be able to choose new messages from this SSP for the task to process. Note that we
+ * update only when the envelope is first time being processed. This solves the issue in
+ * Broadcast stream where a message need to be processed by multiple tasks. In that case,
+ * the envelope will be in the pendingEnvelopeQueue of each task. Only the first fetch updates
+ * the chooser with the next envelope in the broadcast stream partition.
+ * The function will be called in the run loop thread so no synchronization.
+ * @return
+ */
+ private IncomingMessageEnvelope fetchEnvelope() {
+ PendingEnvelope pendingEnvelope = pendingEnvelopQueue.remove();
+ int queueSize = pendingEnvelopQueue.size();
+ taskMetrics.pendingMessages().set(queueSize);
+ log.trace("fetch envelope ssp {} offset {} to process.", pendingEnvelope.envelope.getSystemStreamPartition(), pendingEnvelope.envelope.getOffset());
+ log.debug("Task {} pending envelopes count is {} after fetching.", taskName, queueSize);
+
+ if (pendingEnvelope.markProcessed()) {
+ SystemStreamPartition partition = pendingEnvelope.envelope.getSystemStreamPartition();
+ consumerMultiplexer.tryUpdate(partition);
+ log.debug("Update chooser for " + partition);
+ }
+ return pendingEnvelope.envelope;
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/e5f31c57/samza-core/src/main/java/org/apache/samza/task/AsyncStreamTaskAdapter.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/task/AsyncStreamTaskAdapter.java b/samza-core/src/main/java/org/apache/samza/task/AsyncStreamTaskAdapter.java
new file mode 100644
index 0000000..1fc6456
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/task/AsyncStreamTaskAdapter.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.task;
+
+import java.util.concurrent.ExecutorService;
+import org.apache.samza.config.Config;
+import org.apache.samza.system.IncomingMessageEnvelope;
+
+
+/**
+ * AsyncStreamTaskAdapter allows a StreamTask to be executed in parallel.The class
+ * uses the build-in thread pool to invoke StreamTask.process and triggers
+ * the callbacks once it's done. If the thread pool is null, it follows the legacy
+ * synchronous model to execute the tasks on the run loop thread.
+ */
+public class AsyncStreamTaskAdapter implements AsyncStreamTask, InitableTask, WindowableTask, ClosableTask {
+ private final StreamTask wrappedTask;
+ private final ExecutorService executor;
+
+ public AsyncStreamTaskAdapter(StreamTask task, ExecutorService executor) {
+ this.wrappedTask = task;
+ this.executor = executor;
+ }
+
+ @Override
+ public void init(Config config, TaskContext context) throws Exception {
+ if (wrappedTask instanceof InitableTask) {
+ ((InitableTask) wrappedTask).init(config, context);
+ }
+ }
+
+ @Override
+ public void processAsync(final IncomingMessageEnvelope envelope,
+ final MessageCollector collector,
+ final TaskCoordinator coordinator,
+ final TaskCallback callback) {
+ if (executor != null) {
+ executor.submit(new Runnable() {
+ @Override
+ public void run() {
+ process(envelope, collector, coordinator, callback);
+ }
+ });
+ } else {
+ // legacy mode: running all tasks in the runloop thread
+ process(envelope, collector, coordinator, callback);
+ }
+ }
+
+ private void process(IncomingMessageEnvelope envelope,
+ MessageCollector collector,
+ TaskCoordinator coordinator,
+ TaskCallback callback) {
+ try {
+ wrappedTask.process(envelope, collector, coordinator);
+ callback.complete();
+ } catch (Throwable t) {
+ callback.failure(t);
+ }
+ }
+
+ @Override
+ public void window(MessageCollector collector, TaskCoordinator coordinator) throws Exception {
+ if (wrappedTask instanceof WindowableTask) {
+ ((WindowableTask) wrappedTask).window(collector, coordinator);
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (wrappedTask instanceof ClosableTask) {
+ ((ClosableTask) wrappedTask).close();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/e5f31c57/samza-core/src/main/java/org/apache/samza/task/CoordinatorRequests.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/task/CoordinatorRequests.java b/samza-core/src/main/java/org/apache/samza/task/CoordinatorRequests.java
new file mode 100644
index 0000000..052b3b9
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/task/CoordinatorRequests.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.task;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.samza.container.TaskName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * TaskCoordinatorRequests is used in run loop to collect the coordinator
+ * requests from tasks, including commit requests and shutdown requests.
+ * It is thread safe so it can be updated from multiple task threads.
+ */
+public class CoordinatorRequests {
+ private static final Logger log = LoggerFactory.getLogger(CoordinatorRequests.class);
+
+ private final Set<TaskName> taskNames;
+ private final Set<TaskName> taskShutdownRequests = Collections.synchronizedSet(new HashSet<TaskName>());
+ private final Set<TaskName> taskCommitRequests = Collections.synchronizedSet(new HashSet<TaskName>());
+ volatile private boolean shutdownNow = false;
+
+ public CoordinatorRequests(Set<TaskName> taskNames) {
+ this.taskNames = taskNames;
+ }
+
+ public void update(ReadableCoordinator coordinator) {
+ if (coordinator.commitRequest().isDefined() || coordinator.shutdownRequest().isDefined()) {
+ checkCoordinator(coordinator);
+ }
+ }
+
+ public Set<TaskName> commitRequests() {
+ return taskCommitRequests;
+ }
+
+ public boolean shouldShutdownNow() {
+ return shutdownNow;
+ }
+
+ /**
+ * A new TaskCoordinator object is passed to a task on every call to StreamTask.process
+ * and WindowableTask.window. This method checks whether the task requested that we
+ * do something that affects the run loop (such as commit or shut down), and updates
+ * run loop state accordingly.
+ */
+ private void checkCoordinator(ReadableCoordinator coordinator) {
+ if (coordinator.requestedCommitTask()) {
+ log.info("Task " + coordinator.taskName() + " requested commit for current task only");
+ taskCommitRequests.add(coordinator.taskName());
+ }
+
+ if (coordinator.requestedCommitAll()) {
+ log.info("Task " + coordinator.taskName() + " requested commit for all tasks in the container");
+ taskCommitRequests.addAll(taskNames);
+ }
+
+ if (coordinator.requestedShutdownOnConsensus()) {
+ taskShutdownRequests.add(coordinator.taskName());
+ log.info("Shutdown has now been requested by tasks " + taskShutdownRequests);
+ }
+
+ if (coordinator.requestedShutdownNow() || taskShutdownRequests.size() == taskNames.size()) {
+ log.info("Shutdown requested.");
+ shutdownNow = true;
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/e5f31c57/samza-core/src/main/java/org/apache/samza/task/TaskCallbackFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/task/TaskCallbackFactory.java b/samza-core/src/main/java/org/apache/samza/task/TaskCallbackFactory.java
new file mode 100644
index 0000000..7dddb67
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/task/TaskCallbackFactory.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.task;
+
+/**
+ * TaskCallbackFactory creates the {@link TaskCallback} for {@link org.apache.samza.container.TaskInstance}
+ * to process asynchronously
+ */
+public interface TaskCallbackFactory {
+ /**
+ * Returns the {@link TaskCallback} to hand to an asynchronous process call.
+ * NOTE(review): presumably a fresh callback per message - confirm against implementations.
+ * @return the task callback
+ */
+ TaskCallback createCallback();
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/e5f31c57/samza-core/src/main/java/org/apache/samza/task/TaskCallbackImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/task/TaskCallbackImpl.java b/samza-core/src/main/java/org/apache/samza/task/TaskCallbackImpl.java
new file mode 100644
index 0000000..9b70099
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/task/TaskCallbackImpl.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.task;
+
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class implements {@link TaskCallback}. It triggers the
+ * {@link TaskCallbackListener} with the callback result. If the
+ * callback is called multiple times, it will throw IllegalStateException
+ * to the listener.
+ */
+class TaskCallbackImpl implements TaskCallback, Comparable<TaskCallbackImpl> {
+ private static final Logger log = LoggerFactory.getLogger(TaskCallbackImpl.class);
+
+ final TaskName taskName;
+ final IncomingMessageEnvelope envelope;
+ final ReadableCoordinator coordinator;
+ final long timeCreatedNs;
+ private final AtomicBoolean isComplete = new AtomicBoolean(false);
+ private final TaskCallbackListener listener;
+ private ScheduledFuture scheduledFuture = null;
+ private final long seqNum;
+
+ public TaskCallbackImpl(TaskCallbackListener listener,
+ TaskName taskName,
+ IncomingMessageEnvelope envelope,
+ ReadableCoordinator coordinator,
+ long seqNum) {
+ this.listener = listener;
+ this.taskName = taskName;
+ this.envelope = envelope;
+ this.coordinator = coordinator;
+ this.seqNum = seqNum;
+ this.timeCreatedNs = System.nanoTime();
+ }
+
+ @Override
+ public void complete() {
+ if (scheduledFuture != null) {
+ scheduledFuture.cancel(true);
+ }
+ log.trace("Callback complete for ssp {} offset {}.", envelope.getSystemStreamPartition(), envelope.getOffset());
+
+ if (isComplete.compareAndSet(false, true)) {
+ listener.onComplete(this);
+ } else {
+ Throwable throwable = new IllegalStateException("TaskCallback complete has been invoked after completion");
+ log.error("Callback for process task {}, envelope {}.", new Object[] {taskName, envelope}, throwable);
+ listener.onFailure(this, throwable);
+ }
+ }
+
+ @Override
+ public void failure(Throwable t) {
+ if (scheduledFuture != null) {
+ scheduledFuture.cancel(true);
+ }
+ log.error("Callback fails for task {} envelope {}.", new Object[] {taskName, envelope}, t);
+
+ if (isComplete.compareAndSet(false, true)) {
+ listener.onFailure(this, t);
+ } else {
+ Throwable throwable = new IllegalStateException("TaskCallback failure has been invoked after completion", t);
+ log.error("Callback for process task {}, envelope {}.", new Object[] {taskName, envelope}, throwable);
+ listener.onFailure(this, throwable);
+ }
+ }
+
+ void setScheduledFuture(ScheduledFuture scheduledFuture) {
+ this.scheduledFuture = scheduledFuture;
+ }
+
+ @Override
+ public int compareTo(TaskCallbackImpl callback) {
+ return Long.compare(this.seqNum, callback.seqNum);
+ }
+
+ boolean matchSeqNum(long seqNum) {
+ return this.seqNum == seqNum;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/e5f31c57/samza-core/src/main/java/org/apache/samza/task/TaskCallbackListener.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/task/TaskCallbackListener.java b/samza-core/src/main/java/org/apache/samza/task/TaskCallbackListener.java
new file mode 100644
index 0000000..de4ee58
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/task/TaskCallbackListener.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.task;
+
+/**
+ * The interface of the listener to the {@link AsyncStreamTask}.processAsync
+ * callback events. If the callback completes with success, onComplete() will be fired.
+ * If the callback fails, onFailure() will be fired.
+ */
+interface TaskCallbackListener {
+ /**
+ * Fired when the callback completes with success.
+ * @param callback the callback that completed
+ */
+ void onComplete(TaskCallback callback);
+
+ /**
+ * Fired when the callback fails.
+ * @param callback the callback that failed
+ * @param t cause of the failure
+ */
+ void onFailure(TaskCallback callback, Throwable t);
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/e5f31c57/samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java b/samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java
new file mode 100644
index 0000000..132cf59
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.task;
+
+import java.util.PriorityQueue;
+import java.util.Queue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.samza.container.TaskInstanceMetrics;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.system.IncomingMessageEnvelope;
+
+
+/**
+ * TaskCallbackManager manages the life cycle of {@link AsyncStreamTask} callbacks,
+ * including creation, update and status. Internally it maintains a PriorityQueue
+ * for the callbacks based on the sequence number, and updates the offsets for checkpointing
+ * by always moving forward to the latest contiguous callback (uses the high watermark).
+ */
+class TaskCallbackManager {
+
+ /**
+ * Queue of successfully completed callbacks, ordered by sequence number, together with
+ * the next sequence number expected at the head of the queue. Access is guarded by a lock.
+ */
+ private static final class TaskCallbacks {
+ private final Queue<TaskCallbackImpl> callbacks = new PriorityQueue<>();
+ private final Object lock = new Object();
+ // sequence number the head of the queue must carry before it can be drained
+ private long nextSeqNum = 0L;
+
+ /**
+ * Adding the newly complete callback to the callback queue
+ * Move the queue to the last contiguous callback to commit offset
+ * @param cb new callback completed
+ * @return callback of highest watermark needed to be committed, or null when the head
+ * of the queue does not yet match the next expected sequence number
+ */
+ TaskCallbackImpl update(TaskCallbackImpl cb) {
+ synchronized (lock) {
+ callbacks.add(cb);
+
+ TaskCallbackImpl callback = null;
+ TaskCallbackImpl callbackToCommit = null;
+ TaskCoordinator.RequestScope shutdownRequest = null;
+ // look for the last contiguous callback
+ while (!callbacks.isEmpty() && callbacks.peek().matchSeqNum(nextSeqNum)) {
+ ++nextSeqNum;
+ callback = callbacks.poll();
+
+ // remember the latest drained callback that carries a commit request
+ if (callback.coordinator.commitRequest().isDefined()) {
+ callbackToCommit = callback;
+ }
+
+ // remember the latest shutdown request seen while draining
+ if (callback.coordinator.shutdownRequest().isDefined()) {
+ shutdownRequest = callback.coordinator.shutdownRequest().get();
+ }
+ }
+
+ // if there is no manual commit, use the highest contiguous callback message offset
+ if (callbackToCommit == null) {
+ callbackToCommit = callback;
+ }
+
+ // if there is a shutdown request, merge it into the coordinator to commit
+ // (shutdownRequest is only set inside the loop, so callbackToCommit is non-null here)
+ if (shutdownRequest != null) {
+ callbackToCommit.coordinator.shutdown(shutdownRequest);
+ }
+
+ return callbackToCommit;
+ }
+ }
+ }
+
+ // sequence number assigned to the next created callback
+ private long seqNum = 0L;
+ // callbacks created but not yet reported through updateCallback
+ private final AtomicInteger pendingCount = new AtomicInteger(0);
+ private final TaskCallbacks completeCallbacks = new TaskCallbacks();
+ private final TaskInstanceMetrics metrics;
+ // used to schedule callback timeouts; no timeout is scheduled when null
+ private final ScheduledExecutorService timer;
+ private final TaskCallbackListener listener;
+ // callback timeout, in milliseconds
+ private long timeout;
+
+ /**
+ * @param listener listener passed to every created callback
+ * @param metrics metrics object that receives the in-flight message count
+ * @param timer executor used to schedule callback timeouts, or null for no timeout
+ * @param timeout callback timeout in milliseconds
+ */
+ public TaskCallbackManager(TaskCallbackListener listener, TaskInstanceMetrics metrics, ScheduledExecutorService timer, long timeout) {
+ this.listener = listener;
+ this.metrics = metrics;
+ this.timer = timer;
+ this.timeout = timeout;
+ }
+
+ /**
+ * Creates a callback with the next sequence number, increments the pending count and,
+ * when a timer is configured, schedules a task that fails the callback with a
+ * {@link TaskCallbackTimeoutException} after the timeout.
+ * @param taskName name of the task processing the envelope
+ * @param envelope message the callback belongs to
+ * @param coordinator coordinator handed to the task for this message
+ * @return the new callback
+ */
+ public TaskCallbackImpl createCallback(TaskName taskName,
+ IncomingMessageEnvelope envelope,
+ ReadableCoordinator coordinator) {
+ final TaskCallbackImpl callback = new TaskCallbackImpl(listener, taskName, envelope, coordinator, seqNum++);
+ int count = pendingCount.incrementAndGet();
+ metrics.messagesInFlight().set(count);
+
+ if (timer != null) {
+ Runnable timerTask = new Runnable() {
+ @Override
+ public void run() {
+ String msg = "Task " + callback.taskName + " callback times out";
+ callback.failure(new TaskCallbackTimeoutException(msg));
+ }
+ };
+ ScheduledFuture scheduledFuture = timer.schedule(timerTask, timeout, TimeUnit.MILLISECONDS);
+ // handed to the callback so that complete()/failure() can cancel the timeout
+ callback.setScheduledFuture(scheduledFuture);
+ }
+
+ return callback;
+ }
+
+ /**
+ * Update the task callbacks with the new callback completed.
+ * It uses a high-watermark model to roll the callbacks for checkpointing.
+ * The pending count is decremented for both successful and failed callbacks.
+ * @param callback new completed callback
+ * @param success callback result status
+ * @return the callback for checkpointing; null when success is false or when no
+ * contiguous callback is available yet
+ */
+ public TaskCallbackImpl updateCallback(TaskCallbackImpl callback, boolean success) {
+ TaskCallbackImpl callbackToCommit = null;
+ if (success) {
+ callbackToCommit = completeCallbacks.update(callback);
+ }
+ int count = pendingCount.decrementAndGet();
+ metrics.messagesInFlight().set(count);
+ return callbackToCommit;
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/e5f31c57/samza-core/src/main/java/org/apache/samza/task/TaskCallbackTimeoutException.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/task/TaskCallbackTimeoutException.java b/samza-core/src/main/java/org/apache/samza/task/TaskCallbackTimeoutException.java
new file mode 100644
index 0000000..bf7f13c
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/task/TaskCallbackTimeoutException.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.task;
+
+import org.apache.samza.SamzaException;
+
+
+/**
+ * Specific {@link SamzaException}s thrown when a task callback times out
+ */
+public class TaskCallbackTimeoutException extends SamzaException {
+ private static final long serialVersionUID = -2342134146355610665L;
+
+ /**
+ * @param e underlying cause
+ */
+ public TaskCallbackTimeoutException(Throwable e) {
+ super(e);
+ }
+
+ /**
+ * @param msg description of the timeout
+ */
+ public TaskCallbackTimeoutException(String msg) {
+ super(msg);
+ }
+
+ /**
+ * @param msg description of the timeout
+ * @param e underlying cause
+ */
+ public TaskCallbackTimeoutException(String msg, Throwable e) {
+ super(msg, e);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/e5f31c57/samza-core/src/main/java/org/apache/samza/util/Utils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/util/Utils.java b/samza-core/src/main/java/org/apache/samza/util/Utils.java
new file mode 100644
index 0000000..472e0a5
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/util/Utils.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.util;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.runtime.AbstractFunction0;
+
+
+public class Utils {
+  private static final Logger log = LoggerFactory.getLogger(Utils.class);
+
+  // static helpers only; not meant to be instantiated
+  private Utils() {}
+
+  /**
+   * Wraps a fixed value in a Scala zero-argument function, for Scala APIs that take
+   * their default as a function (e.g. Option.getOrElse).
+   * @param value default value
+   * @param <T> value type
+   * @return function whose apply() always returns {@code value}
+   */
+  public static <T> AbstractFunction0<T> defaultValue(final T value) {
+    final AbstractFunction0<T> constant = new AbstractFunction0<T>() {
+      @Override
+      public T apply() {
+        return value;
+      }
+    };
+    return constant;
+  }
+
+  /**
+   * Creates a nanosecond clock backed by System.nanoTime().
+   * @return function whose apply() returns the current System.nanoTime() reading
+   */
+  public static AbstractFunction0<Object> defaultClock() {
+    final AbstractFunction0<Object> nanoClock = new AbstractFunction0<Object>() {
+      @Override
+      public Object apply() {
+        final long nowNs = System.nanoTime();
+        return nowNs;
+      }
+    };
+    return nanoClock;
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/e5f31c57/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
index 00648e4..7245902 100644
--- a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
@@ -19,6 +19,9 @@
package org.apache.samza.checkpoint
+
+import java.util.concurrent.ConcurrentHashMap
+
import org.apache.samza.system.SystemStream
import org.apache.samza.system.SystemStreamPartition
import org.apache.samza.system.SystemStreamMetadata
@@ -146,7 +149,7 @@ class OffsetManager(
/**
* Last offsets processed for each SystemStreamPartition.
*/
- var lastProcessedOffsets = Map[TaskName, Map[SystemStreamPartition, String]]()
+ val lastProcessedOffsets = new ConcurrentHashMap[TaskName, ConcurrentHashMap[SystemStreamPartition, String]]()
/**
* Offsets to start reading from for each SystemStreamPartition. This
@@ -182,20 +185,15 @@ class OffsetManager(
* Set the last processed offset for a given SystemStreamPartition.
*/
def update(taskName: TaskName, systemStreamPartition: SystemStreamPartition, offset: String) {
- lastProcessedOffsets.get(taskName) match {
- case Some(sspToOffsets) => lastProcessedOffsets += taskName -> (sspToOffsets + (systemStreamPartition -> offset))
- case None => lastProcessedOffsets += (taskName -> Map(systemStreamPartition -> offset))
- }
+ lastProcessedOffsets.putIfAbsent(taskName, new ConcurrentHashMap[SystemStreamPartition, String]())
+ lastProcessedOffsets.get(taskName).put(systemStreamPartition, offset)
}
/**
* Get the last processed offset for a SystemStreamPartition.
*/
- def getLastProcessedOffset(taskName: TaskName, systemStreamPartition: SystemStreamPartition) = {
- lastProcessedOffsets.get(taskName) match {
- case Some(sspToOffsets) => sspToOffsets.get(systemStreamPartition)
- case None => None
- }
+ def getLastProcessedOffset(taskName: TaskName, systemStreamPartition: SystemStreamPartition): Option[String] = {
+   // ConcurrentHashMap.get returns null for a missing key, so the inner lookup must be wrapped
+   // in Option as well; a plain map(...) would yield Some(null) instead of None.
+   Option(lastProcessedOffsets.get(taskName)).flatMap(sspToOffsets => Option(sspToOffsets.get(systemStreamPartition)))
 }
/**
@@ -217,7 +215,7 @@ class OffsetManager(
debug("Checkpointing offsets for taskName %s." format taskName)
val sspsForTaskName = systemStreamPartitions.getOrElse(taskName, throw new SamzaException("No such SystemStreamPartition set " + taskName + " registered for this checkpointmanager")).toSet
- val partitionOffsets = lastProcessedOffsets.get(taskName) match {
+ val partitionOffsets = Option(lastProcessedOffsets.get(taskName)) match {
case Some(sspToOffsets) => sspToOffsets.filterKeys(sspsForTaskName.contains(_))
case None => {
warn(taskName + " is not found... ")
@@ -225,8 +223,9 @@ class OffsetManager(
}
}
+ partitionOffsets.foreach(p => info("task " + taskName + " checkpoint " + p._1 + ", " + p._2))
checkpointManager.writeCheckpoint(taskName, new Checkpoint(partitionOffsets))
- lastProcessedOffsets.get(taskName) match {
+ Option(lastProcessedOffsets.get(taskName)) match {
case Some(sspToOffsets) => sspToOffsets.foreach { case (ssp, checkpoint) => offsetManagerMetrics.checkpointedOffsets(ssp).set(checkpoint) }
case None =>
}
@@ -270,9 +269,8 @@ class OffsetManager(
.keys
.flatMap(restoreOffsetsFromCheckpoint(_))
.toMap
- lastProcessedOffsets ++= result.map {
- case (taskName, sspToOffset) => {
- taskName -> sspToOffset.filter {
+ result.map { case (taskName, sspToOffset) => {
+ lastProcessedOffsets.put(taskName, new ConcurrentHashMap[SystemStreamPartition, String](sspToOffset.filter {
case (systemStreamPartition, offset) =>
val shouldKeep = offsetSettings.contains(systemStreamPartition.getSystemStream)
if (!shouldKeep) {
@@ -280,7 +278,7 @@ class OffsetManager(
}
info("Checkpointed offset is currently %s for %s" format (offset, systemStreamPartition))
shouldKeep
- }
+ }))
}
}
} else {
@@ -324,17 +322,15 @@ class OffsetManager(
}
}
- lastProcessedOffsets = lastProcessedOffsets.map {
- case (taskName, sspToOffsets) => {
- taskName -> (sspToOffsets -- systemStreamPartitionsToReset(taskName))
- }
+ lastProcessedOffsets.keys().foreach { taskName =>
+ lastProcessedOffsets.get(taskName).keySet().removeAll(systemStreamPartitionsToReset(taskName))
}
}
/**
* Returns a map of all SystemStreamPartitions in lastProcessedOffsets that need to be reset
*/
- private def getSystemStreamPartitionsToReset(taskNameTosystemStreamPartitions: Map[TaskName, Map[SystemStreamPartition, String]]): Map[TaskName, Set[SystemStreamPartition]] = {
+ private def getSystemStreamPartitionsToReset(taskNameTosystemStreamPartitions: ConcurrentHashMap[TaskName, ConcurrentHashMap[SystemStreamPartition, String]]): Map[TaskName, Set[SystemStreamPartition]] = {
taskNameTosystemStreamPartitions.map {
case (taskName, sspToOffsets) => {
taskName -> (sspToOffsets.filter {
http://git-wip-us.apache.org/repos/asf/samza/blob/e5f31c57/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
index 49b08f6..13b72fa 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
@@ -44,6 +44,8 @@ object JobConfig {
val SAMZA_FWK_VERSION = "samza.fwk.version"
val JOB_COORDINATOR_SYSTEM = "job.coordinator.system"
val JOB_CONTAINER_COUNT = "job.container.count"
+ val jOB_CONTAINER_THREAD_POOL_SIZE = "job.container.thread.pool.size"
+ val JOB_CONTAINER_SINGLE_THREAD_MODE = "job.container.single.thread.mode"
val JOB_REPLICATION_FACTOR = "job.coordinator.replication.factor"
val JOB_SEGMENT_BYTES = "job.coordinator.segment.bytes"
val SSP_GROUPER_FACTORY = "job.systemstreampartition.grouper.factory"
@@ -167,4 +169,13 @@ class JobConfig(config: Config) extends ScalaMapConfig(config) with Logging {
def getSSPMatcherConfigJobFactoryRegex = getOrElse(JobConfig.SSP_MATCHER_CONFIG_JOB_FACTORY_REGEX, JobConfig.DEFAULT_SSP_MATCHER_CONFIG_JOB_FACTORY_REGEX)
+ def getThreadPoolSize = getOption(JobConfig.jOB_CONTAINER_THREAD_POOL_SIZE) match {
+ case Some(size) => size.toInt
+ case _ => 0
+ }
+
+ def getSingleThreadMode = getOption(JobConfig.JOB_CONTAINER_SINGLE_THREAD_MODE) match {
+ case Some(mode) => mode.toBoolean
+ case _ => false
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/e5f31c57/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
index 08a4deb..90c1904 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
@@ -38,6 +38,8 @@ object TaskConfig {
val DROP_SERIALIZATION_ERROR = "task.drop.serialization.errors" // defines whether or not to drop messages when serialization fails
val IGNORED_EXCEPTIONS = "task.ignored.exceptions" // exceptions to ignore in process and window
val GROUPER_FACTORY = "task.name.grouper.factory" // class name for task grouper
+ val MAX_CONCURRENCY = "task.max.concurrency" // max number of concurrent process calls for an AsyncStreamTask
+ val CALLBACK_TIMEOUT_MS = "task.callback.timeout.ms" // timeout period for triggering a callback
/**
* Samza's container polls for more messages under two conditions. The first
@@ -117,4 +119,13 @@ class TaskConfig(config: Config) extends ScalaMapConfig(config) with Logging {
}
}
+ def getMaxConcurrency: Option[Int] = getOption(TaskConfig.MAX_CONCURRENCY) match {
+ case Some(count) => Some(count.toInt)
+ case _ => None
+ }
+
+ def getCallbackTimeoutMs: Option[Long] = getOption(TaskConfig.CALLBACK_TIMEOUT_MS) match {
+ case Some(ms) => Some(ms.toLong)
+ case _ => None
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/e5f31c57/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala b/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
index cf05c15..bb2c376 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
@@ -21,12 +21,18 @@ package org.apache.samza.container
import java.util.concurrent.Executor
-import org.apache.samza.system.{SystemConsumers, SystemStreamPartition}
+import org.apache.samza.system.SystemConsumers
+import org.apache.samza.system.SystemStreamPartition
+import org.apache.samza.task.CoordinatorRequests
import org.apache.samza.task.ReadableCoordinator
-import org.apache.samza.util.{Logging, TimerUtils}
+import org.apache.samza.task.StreamTask
+import org.apache.samza.util.Logging
+import org.apache.samza.util.TimerUtils
+
+import scala.collection.JavaConversions._
/**
- * Each {@link SamzaContainer} uses a single-threaded execution model: activities for
+ * The run loop uses a single-threaded execution model: activities for
* all {@link TaskInstance}s within a container are multiplexed onto one execution
* thread. Those activities include task callbacks (such as StreamTask.process and
* WindowableTask.window), committing checkpoints, etc.
@@ -34,31 +40,29 @@ import org.apache.samza.util.{Logging, TimerUtils}
* <p>This class manages the execution of that run loop, determining what needs to
* be done when.
*/
-class RunLoop(
- val taskInstances: Map[TaskName, TaskInstance],
+class RunLoop (
+ val taskInstances: Map[TaskName, TaskInstance[StreamTask]],
val consumerMultiplexer: SystemConsumers,
val metrics: SamzaContainerMetrics,
val windowMs: Long = -1,
val commitMs: Long = 60000,
val clock: () => Long = { System.nanoTime },
- val shutdownMs: Long = 5000,
val executor: Executor = new SameThreadExecutor()) extends Runnable with TimerUtils with Logging {
private val metricsMsOffset = 1000000L
private var lastWindowNs = clock()
private var lastCommitNs = clock()
private var activeNs = 0L
- private var taskShutdownRequests: Set[TaskName] = Set()
- private var taskCommitRequests: Set[TaskName] = Set()
@volatile private var shutdownNow = false
+ private val coordinatorRequests: CoordinatorRequests = new CoordinatorRequests(taskInstances.keySet)
// Messages come from the chooser with no connection to the TaskInstance they're bound for.
// Keep a mapping of SystemStreamPartition to TaskInstance to efficiently route them.
val systemStreamPartitionToTaskInstances = getSystemStreamPartitionToTaskInstancesMapping
- def getSystemStreamPartitionToTaskInstancesMapping: Map[SystemStreamPartition, List[TaskInstance]] = {
+ def getSystemStreamPartitionToTaskInstancesMapping: Map[SystemStreamPartition, List[TaskInstance[StreamTask]]] = {
// We could just pass in the SystemStreamPartitionMap during construction, but it's safer and cleaner to derive the information directly
- def getSystemStreamPartitionToTaskInstance(taskInstance: TaskInstance) = taskInstance.systemStreamPartitions.map(_ -> taskInstance).toMap
+ def getSystemStreamPartitionToTaskInstance(taskInstance: TaskInstance[StreamTask]) = taskInstance.systemStreamPartitions.map(_ -> taskInstance).toMap
taskInstances.values.map { getSystemStreamPartitionToTaskInstance }.flatten.groupBy(_._1).map {
case (ssp, ssp2taskInstance) => ssp -> ssp2taskInstance.map(_._2).toList
@@ -70,8 +74,6 @@ class RunLoop(
* unhandled exception is thrown.
*/
def run {
- addShutdownHook(Thread.currentThread())
-
val runTask = new Runnable() {
override def run(): Unit = {
val loopStartTime = clock()
@@ -89,19 +91,8 @@ class RunLoop(
}
}
- private def addShutdownHook(runLoopThread: Thread) {
- Runtime.getRuntime().addShutdownHook(new Thread() {
- override def run() = {
- info("Shutting down, will wait up to %s ms" format shutdownMs)
- shutdownNow = true
- runLoopThread.join(shutdownMs)
- if (runLoopThread.isAlive) {
- warn("Did not shut down within %s ms, exiting" format shutdownMs)
- } else {
- info("Shutdown complete")
- }
- }
- })
+ def shutdown = {
+ shutdownNow = true
}
/**
@@ -115,7 +106,7 @@ class RunLoop(
// Exclude choose time from activeNs. Although it includes deserialization time,
// it most closely captures idle time.
val envelope = updateTimer(metrics.chooseNs) {
- consumerMultiplexer.choose
+ consumerMultiplexer.choose()
}
activeNs += updateTimerAndGetDuration(metrics.processNs) ((currentTimeNs: Long) => {
@@ -128,11 +119,11 @@ class RunLoop(
val taskInstances = systemStreamPartitionToTaskInstances(ssp)
taskInstances.foreach {
taskInstance =>
- {
- val coordinator = new ReadableCoordinator(taskInstance.taskName)
- taskInstance.process(envelope, coordinator)
- checkCoordinator(coordinator)
- }
+ {
+ val coordinator = new ReadableCoordinator(taskInstance.taskName)
+ taskInstance.process(envelope, coordinator)
+ coordinatorRequests.update(coordinator)
+ }
}
} else {
trace("No incoming message envelope was available.")
@@ -155,7 +146,7 @@ class RunLoop(
case (taskName, task) =>
val coordinator = new ReadableCoordinator(taskName)
task.window(coordinator)
- checkCoordinator(coordinator)
+ coordinatorRequests.update(coordinator)
}
}
})
@@ -167,47 +158,20 @@ class RunLoop(
private def commit {
activeNs += updateTimerAndGetDuration(metrics.commitNs) ((currentTimeNs: Long) => {
if (commitMs >= 0 && lastCommitNs + commitMs * metricsMsOffset < currentTimeNs) {
- trace("Committing task instances because the commit interval has elapsed.")
+ info("Committing task instances because the commit interval has elapsed.")
lastCommitNs = currentTimeNs
metrics.commits.inc
taskInstances.values.foreach(_.commit)
- } else if (!taskCommitRequests.isEmpty) {
+ } else if (!coordinatorRequests.commitRequests.isEmpty){
trace("Committing due to explicit commit request.")
metrics.commits.inc
- taskCommitRequests.foreach(taskName => {
+ coordinatorRequests.commitRequests.foreach(taskName => {
taskInstances(taskName).commit
})
}
- taskCommitRequests = Set()
+ shutdownNow |= coordinatorRequests.shouldShutdownNow
+ coordinatorRequests.commitRequests.clear()
})
}
-
- /**
- * A new TaskCoordinator object is passed to a task on every call to StreamTask.process
- * and WindowableTask.window. This method checks whether the task requested that we
- * do something that affects the run loop (such as commit or shut down), and updates
- * run loop state accordingly.
- */
- private def checkCoordinator(coordinator: ReadableCoordinator) {
- if (coordinator.requestedCommitTask) {
- debug("Task %s requested commit for current task only" format coordinator.taskName)
- taskCommitRequests += coordinator.taskName
- }
-
- if (coordinator.requestedCommitAll) {
- debug("Task %s requested commit for all tasks in the container" format coordinator.taskName)
- taskCommitRequests ++= taskInstances.keys
- }
-
- if (coordinator.requestedShutdownOnConsensus) {
- taskShutdownRequests += coordinator.taskName
- info("Shutdown has now been requested by tasks: %s" format taskShutdownRequests)
- }
-
- if (coordinator.requestedShutdownNow || taskShutdownRequests.size == taskInstances.size) {
- info("Shutdown requested.")
- shutdownNow = true
- }
- }
}