You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2016/07/20 16:56:49 UTC

[3/3] samza git commit: SAMZA-863: Multithreading support in Samza

SAMZA-863: Multithreading support in Samza


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/e5f31c57
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/e5f31c57
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/e5f31c57

Branch: refs/heads/master
Commit: e5f31c57c957e6a38f566d864d0e5acffba0327d
Parents: 9396ee5
Author: Xinyu Liu <xi...@linkedin.com>
Authored: Tue Jul 19 11:53:44 2016 -0700
Committer: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Committed: Tue Jul 19 11:53:44 2016 -0700

----------------------------------------------------------------------
 checkstyle/import-control.xml                   |  10 +-
 .../org/apache/samza/task/AsyncStreamTask.java  |  60 ++
 .../org/apache/samza/task/TaskCallback.java     |  38 ++
 .../apache/samza/container/RunLoopFactory.java  | 112 ++++
 .../org/apache/samza/task/AsyncRunLoop.java     | 619 +++++++++++++++++++
 .../samza/task/AsyncStreamTaskAdapter.java      |  92 +++
 .../apache/samza/task/CoordinatorRequests.java  |  89 +++
 .../apache/samza/task/TaskCallbackFactory.java  |  28 +
 .../org/apache/samza/task/TaskCallbackImpl.java | 104 ++++
 .../apache/samza/task/TaskCallbackListener.java |  30 +
 .../apache/samza/task/TaskCallbackManager.java  | 141 +++++
 .../task/TaskCallbackTimeoutException.java      |  42 ++
 .../main/java/org/apache/samza/util/Utils.java  |  59 ++
 .../apache/samza/checkpoint/OffsetManager.scala |  38 +-
 .../org/apache/samza/config/JobConfig.scala     |  11 +
 .../org/apache/samza/config/TaskConfig.scala    |  11 +
 .../org/apache/samza/container/RunLoop.scala    |  92 +--
 .../apache/samza/container/SamzaContainer.scala | 195 ++++--
 .../samza/container/SamzaContainerMetrics.scala |   2 +
 .../apache/samza/container/TaskInstance.scala   |  44 +-
 .../samza/container/TaskInstanceMetrics.scala   |   2 +
 .../samza/coordinator/JobCoordinator.scala      |   5 +-
 .../apache/samza/system/SystemConsumers.scala   |  31 +-
 .../org/apache/samza/task/TestAsyncRunLoop.java | 333 ++++++++++
 .../samza/task/TestAsyncStreamAdapter.java      | 141 +++++
 .../samza/task/TestCoordinatorRequests.java     |  93 +++
 .../apache/samza/task/TestTaskCallbackImpl.java | 125 ++++
 .../samza/task/TestTaskCallbackManager.java     | 141 +++++
 .../apache/samza/container/TestRunLoop.scala    |  64 +-
 .../samza/container/TestSamzaContainer.scala    |   6 +-
 .../samza/container/TestTaskInstance.scala      |   6 +-
 .../samza/system/TestSystemConsumers.scala      |  36 +-
 .../samza/system/hdfs/HdfsSystemProducer.scala  |  70 ++-
 .../migration/KafkaCheckpointMigration.scala    |   1 +
 .../system/kafka/KafkaSystemProducer.scala      | 215 ++++---
 .../kafka/TestKafkaSystemProducerJava.java      |   4 +-
 .../system/kafka/TestKafkaSystemProducer.scala  |  17 +-
 .../kv/inmemory/InMemoryKeyValueStore.scala     |   6 +-
 .../samza/storage/kv/RocksDbKeyValueStore.scala |   3 -
 .../storage/kv/TestRocksDbKeyValueStore.scala   |  59 +-
 .../apache/samza/storage/kv/CachedStore.scala   | 194 +++---
 .../samza/storage/kv/TestCachedStore.scala      |  15 -
 .../samza/storage/kv/TestKeyValueStores.scala   | 191 +++++-
 43 files changed, 3098 insertions(+), 477 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/e5f31c57/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 325c381..c85dc94 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -33,6 +33,7 @@
     <allow pkg="org.apache.commons" />
     <allow class="scala.collection.JavaConversions" />
     <allow class="scala.collection.JavaConverters" />
+    <allow pkg="scala.runtime" />
 
     <subpackage name="config">
         <allow class="org.apache.samza.SamzaException" />
@@ -133,6 +134,10 @@
         <allow pkg="org.apache.samza.container" />
         <allow pkg="org.apache.samza.metrics" />
         <allow pkg="org.apache.samza.system" />
+        <allow pkg="org.apache.samza.util" />
+        <allow pkg="org.apache.samza.checkpoint" />
+        <allow class="org.apache.samza.SamzaException" />
+        <allow class="org.apache.samza.Partition" />
     </subpackage>
 
     <subpackage name="container">
@@ -142,7 +147,10 @@
         <allow pkg="org.apache.samza.util" />
         <allow pkg="junit.framework" />
         <allow class="org.apache.samza.coordinator.stream.AbstractCoordinatorStreamManager" />
-
+        <allow class="org.apache.samza.SamzaException" />
+        <allow pkg="org.apache.samza.system" />
+        <allow pkg="org.apache.samza.task" />
+        <allow pkg="org.apache.samza.util" />
         <subpackage name="grouper">
             <subpackage name="stream">
                 <allow pkg="org.apache.samza.system" />

http://git-wip-us.apache.org/repos/asf/samza/blob/e5f31c57/samza-api/src/main/java/org/apache/samza/task/AsyncStreamTask.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/task/AsyncStreamTask.java b/samza-api/src/main/java/org/apache/samza/task/AsyncStreamTask.java
new file mode 100644
index 0000000..684ba0b
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/task/AsyncStreamTask.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.task;
+
+import org.apache.samza.system.IncomingMessageEnvelope;
+
+/**
+ * An AsyncStreamTask is the basic class to support multithreading execution in Samza container. It's provided for better
+ * parallelism and resource utilization. This class allows task to make asynchronous calls and fire callbacks upon completion.
+ * Similar to {@link StreamTask}, an AsyncStreamTask may be augmented by implementing other interfaces, such as
+ * {@link InitableTask}, {@link WindowableTask}, or {@link ClosableTask}. The following invariants hold with these mix-ins:
+ *
+ * InitableTask.init - always the first method invoked on an AsyncStreamTask. It happens-before every subsequent
+ * invocation on AsyncStreamTask (for happens-before semantics, see https://docs.oracle.com/javase/tutorial/essential/concurrency/memconsist.html).
+ *
+ * ClosableTask.close - always the last method invoked on an AsyncStreamTask; all other AsyncStreamTask invocations are guaranteed
+ * to happen-before it.
+ *
+ * AsyncStreamTask.processAsync - can run in either a serialized or parallel mode. In the serialized mode (task.process.max.inflight.messages=1),
+ * each invocation of processAsync is guaranteed to happen-before the next. In a parallel execution mode (task.process.max.inflight.messages&gt;1),
+ * there is no such happens-before constraint and the AsyncStreamTask is required to coordinate any shared state.
+ *
+ * WindowableTask.window - in either above mode, it is called when no invocations to processAsync are pending and no new
+ * processAsync invocations can be scheduled until it completes. Therefore, all previous processAsync invocations are guaranteed
+ * to happen-before an invocation of WindowableTask.window. An invocation to WindowableTask.window is guaranteed to happen-before
+ * any subsequent processAsync invocations. The Samza engine is responsible for ensuring that window is invoked in a timely manner.
+ *
+ * Similar to WindowableTask.window, commits are guaranteed to happen only when there are no pending processAsync or WindowableTask.window
+ * invocations. All preceding invocations happen-before commit and commit happens-before all subsequent invocations.
+ */
+public interface AsyncStreamTask {
+  /**
+   * Called once for each message that this AsyncStreamTask receives.
+   * @param envelope Contains the received deserialized message and key, and also information regarding the stream and
+   * partition from which the message was received.
+   * @param collector Contains the means of sending message envelopes to the output stream. The collector must only
+   * be used during the current call to the process method; you should not reuse the collector between invocations
+   * of this method.
+   * @param coordinator Manages execution of tasks.
+   * @param callback Triggers the completion of the process.
+   */
+  void processAsync(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator, TaskCallback callback);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/e5f31c57/samza-api/src/main/java/org/apache/samza/task/TaskCallback.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/task/TaskCallback.java b/samza-api/src/main/java/org/apache/samza/task/TaskCallback.java
new file mode 100644
index 0000000..8ba7a36
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/task/TaskCallback.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.task;
+
+/**
+ * A TaskCallback is fired by an {@link AsyncStreamTask} to notify when an asynchronous
+ * process has completed. If the callback is fired multiple times, it will throw IllegalStateException.
+ */
+public interface TaskCallback {
+
+  /**
+   * Invoke when the asynchronous process completed with success.
+   */
+  void complete();
+
+  /**
+   * Invoke when the asynchronous process failed with an error.
+   * @param t  error throwable
+   */
+  void failure(Throwable t);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/e5f31c57/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java b/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java
new file mode 100644
index 0000000..a789d04
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.container;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.TaskConfig;
+import org.apache.samza.system.SystemConsumers;
+import org.apache.samza.task.AsyncRunLoop;
+import org.apache.samza.task.AsyncStreamTask;
+import org.apache.samza.task.StreamTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.JavaConversions;
+import scala.runtime.AbstractFunction1;
+
+import static org.apache.samza.util.Utils.defaultValue;
+import static org.apache.samza.util.Utils.defaultClock;
+
+/**
+ * Factory class to create runloop for a Samza task, based on the type
+ * of the task
+ */
+public class RunLoopFactory {
+  private static final Logger log = LoggerFactory.getLogger(RunLoopFactory.class);
+
+  private static final long DEFAULT_WINDOW_MS = -1L;
+  private static final long DEFAULT_COMMIT_MS = 60000L;
+  private static final long DEFAULT_CALLBACK_TIMEOUT_MS = -1L;
+
+  public static Runnable createRunLoop(scala.collection.immutable.Map<TaskName, TaskInstance<?>> taskInstances,
+      SystemConsumers consumerMultiplexer,
+      ExecutorService threadPool,
+      Executor executor,
+      SamzaContainerMetrics containerMetrics,
+      TaskConfig config) {
+
+    long taskWindowMs = config.getWindowMs().getOrElse(defaultValue(DEFAULT_WINDOW_MS));
+
+    log.info("Got window milliseconds: " + taskWindowMs);
+
+    long taskCommitMs = config.getCommitMs().getOrElse(defaultValue(DEFAULT_COMMIT_MS));
+
+    log.info("Got commit milliseconds: " + taskCommitMs);
+
+    int asyncTaskCount = taskInstances.values().count(new AbstractFunction1<TaskInstance<?>, Object>() {
+      @Override
+      public Boolean apply(TaskInstance<?> t) {
+        return t.isAsyncTask();
+      }
+    });
+
+    // asyncTaskCount should be either 0 or the number of all taskInstances
+    if (asyncTaskCount > 0 && asyncTaskCount < taskInstances.size()) {
+      throw new SamzaException("Mixing StreamTask and AsyncStreamTask is not supported");
+    }
+
+    if (asyncTaskCount == 0) {
+      log.info("Run loop in single thread mode.");
+
+      scala.collection.immutable.Map<TaskName, TaskInstance<StreamTask>> streamTaskInstances = (scala.collection.immutable.Map) taskInstances;
+      return new RunLoop(
+        streamTaskInstances,
+        consumerMultiplexer,
+        containerMetrics,
+        taskWindowMs,
+        taskCommitMs,
+        defaultClock(),
+        executor);
+    } else {
+      Integer taskMaxConcurrency = config.getMaxConcurrency().getOrElse(defaultValue(1));
+
+      log.info("Got max messages in flight: " + taskMaxConcurrency);
+
+      Long callbackTimeout = config.getCallbackTimeoutMs().getOrElse(defaultValue(DEFAULT_CALLBACK_TIMEOUT_MS));
+
+      log.info("Got callback timeout: " + callbackTimeout);
+
+      scala.collection.immutable.Map<TaskName, TaskInstance<AsyncStreamTask>> asyncStreamTaskInstances = (scala.collection.immutable.Map) taskInstances;
+
+      log.info("Run loop in asynchronous mode.");
+
+      return new AsyncRunLoop(
+        JavaConversions.asJavaMap(asyncStreamTaskInstances),
+        threadPool,
+        consumerMultiplexer,
+        taskMaxConcurrency,
+        taskWindowMs,
+        taskCommitMs,
+        callbackTimeout,
+        containerMetrics);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/e5f31c57/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java b/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java
new file mode 100644
index 0000000..a510bb0
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java
@@ -0,0 +1,619 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.task;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.samza.SamzaException;
+import org.apache.samza.container.SamzaContainerMetrics;
+import org.apache.samza.container.TaskInstance;
+import org.apache.samza.container.TaskInstanceMetrics;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemConsumers;
+import org.apache.samza.system.SystemStreamPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.JavaConversions;
+
+
+/**
+ * The AsyncRunLoop supports multithreading execution of Samza {@link AsyncStreamTask}s.
+ */
+public class AsyncRunLoop implements Runnable {
+  private static final Logger log = LoggerFactory.getLogger(AsyncRunLoop.class);
+
+  private final Map<TaskName, AsyncTaskWorker> taskWorkers;
+  private final SystemConsumers consumerMultiplexer;
+  private final Map<SystemStreamPartition, List<AsyncTaskWorker>> sspToTaskWorkerMapping;
+  private final ExecutorService threadPool;
+  private final CoordinatorRequests coordinatorRequests;
+  private final Object latch;
+  private final int maxConcurrency;
+  private final long windowMs;
+  private final long commitMs;
+  private final long callbackTimeoutMs;
+  private final SamzaContainerMetrics containerMetrics;
+  private final ScheduledExecutorService workerTimer;
+  private final ScheduledExecutorService callbackTimer;
+  private volatile boolean shutdownNow = false;
+  private volatile Throwable throwable = null;
+
+  public AsyncRunLoop(Map<TaskName, TaskInstance<AsyncStreamTask>> taskInstances,
+      ExecutorService threadPool,
+      SystemConsumers consumerMultiplexer,
+      int maxConcurrency,
+      long windowMs,
+      long commitMs,
+      long callbackTimeoutMs,
+      SamzaContainerMetrics containerMetrics) {
+
+    this.threadPool = threadPool;
+    this.consumerMultiplexer = consumerMultiplexer;
+    this.containerMetrics = containerMetrics;
+    this.windowMs = windowMs;
+    this.commitMs = commitMs;
+    this.maxConcurrency = maxConcurrency;
+    this.callbackTimeoutMs = callbackTimeoutMs;
+    this.callbackTimer = (callbackTimeoutMs > 0) ? Executors.newSingleThreadScheduledExecutor() : null;
+    this.coordinatorRequests = new CoordinatorRequests(taskInstances.keySet());
+    this.latch = new Object();
+    this.workerTimer = Executors.newSingleThreadScheduledExecutor();
+    Map<TaskName, AsyncTaskWorker> workers = new HashMap<>();
+    for (TaskInstance<AsyncStreamTask> task : taskInstances.values()) {
+      workers.put(task.taskName(), new AsyncTaskWorker(task));
+    }
+    // Partitions and tasks assigned to the container will not change during the run loop lifetime
+    this.taskWorkers = Collections.unmodifiableMap(workers);
+    this.sspToTaskWorkerMapping = Collections.unmodifiableMap(getSspToAsyncTaskWorkerMap(taskInstances, taskWorkers));
+  }
+
+  /**
+   * Returns mapping of the SystemStreamPartition to the AsyncTaskWorkers to efficiently route the envelopes
+   */
+  private static Map<SystemStreamPartition, List<AsyncTaskWorker>> getSspToAsyncTaskWorkerMap(
+      Map<TaskName, TaskInstance<AsyncStreamTask>> taskInstances, Map<TaskName, AsyncTaskWorker> taskWorkers) {
+    Map<SystemStreamPartition, List<AsyncTaskWorker>> sspToWorkerMap = new HashMap<>();
+    for (TaskInstance<AsyncStreamTask> task : taskInstances.values()) {
+      Set<SystemStreamPartition> ssps = JavaConversions.asJavaSet(task.systemStreamPartitions());
+      for (SystemStreamPartition ssp : ssps) {
+        if (sspToWorkerMap.get(ssp) == null) {
+          sspToWorkerMap.put(ssp, new ArrayList<AsyncTaskWorker>());
+        }
+        sspToWorkerMap.get(ssp).add(taskWorkers.get(task.taskName()));
+      }
+    }
+    return sspToWorkerMap;
+  }
+
+  /**
+   * The run loop chooses messages from the SystemConsumers, and runs the ready tasks asynchronously.
+   * Window and commit are run in a thread pool, and they are mutually exclusive with task processing.
+   * The loop thread will block if all tasks are busy, and resume if any task finishes.
+   */
+  @Override
+  public void run() {
+    try {
+      for (AsyncTaskWorker taskWorker : taskWorkers.values()) {
+        taskWorker.init();
+      }
+
+      long prevNs = System.nanoTime();
+
+      while (!shutdownNow) {
+        if (throwable != null) {
+          log.error("Caught throwable and stopping run loop", throwable);
+          throw new SamzaException(throwable);
+        }
+
+        long startNs = System.nanoTime();
+
+        IncomingMessageEnvelope envelope = chooseEnvelope();
+
+        long chooseNs = System.nanoTime();
+
+        containerMetrics.chooseNs().update(chooseNs - startNs);
+
+        runTasks(envelope);
+
+        long blockNs = System.nanoTime();
+
+        blockIfBusy(envelope);
+
+        long currentNs = System.nanoTime();
+        long activeNs = blockNs - chooseNs;
+        long totalNs = currentNs - prevNs;
+        prevNs = currentNs;
+        containerMetrics.blockNs().update(currentNs - blockNs);
+        containerMetrics.utilization().set(((double) activeNs) / totalNs);
+      }
+    } finally {
+      workerTimer.shutdown();
+      if (callbackTimer != null) callbackTimer.shutdown();
+    }
+  }
+
+  public void shutdown() {
+    shutdownNow = true;
+  }
+
+  /**
+   * Chooses an envelope from messageChooser without updating it. This enables flow control
+   * on the SSP level, meaning the task will not get further messages for the SSP if it cannot
+   * process it. The chooser is updated only after the callback to process is invoked, then the task
+   * is able to process more messages. This flow control does not block, so in case of an empty message chooser,
+   * it will return null immediately without blocking, and the chooser will not poll the underlying system
+   * consumer since there are still messages in the SystemConsumers buffer.
+   */
+  private IncomingMessageEnvelope chooseEnvelope() {
+    IncomingMessageEnvelope envelope = consumerMultiplexer.choose(false);
+    if (envelope != null) {
+      log.trace("Choose envelope ssp {} offset {} for processing", envelope.getSystemStreamPartition(), envelope.getOffset());
+      containerMetrics.envelopes().inc();
+    } else {
+      log.trace("No envelope is available");
+      containerMetrics.nullEnvelopes().inc();
+    }
+    return envelope;
+  }
+
+  /**
+   * Insert the envelope into the task pending queues and run all the tasks
+   */
+  private void runTasks(IncomingMessageEnvelope envelope) {
+    if (envelope != null) {
+      PendingEnvelope pendingEnvelope = new PendingEnvelope(envelope);
+      for (AsyncTaskWorker worker : sspToTaskWorkerMapping.get(envelope.getSystemStreamPartition())) {
+        worker.state.insertEnvelope(pendingEnvelope);
+      }
+    }
+
+    for (AsyncTaskWorker worker: taskWorkers.values()) {
+      worker.run();
+    }
+  }
+
+  /**
+   * Block the runloop thread if all tasks are busy. Due to limitation of non-blocking for the flow control,
+   * we block the run loop when there are no runnable tasks, or all tasks are idle (no pending messages) while
+   * chooser is empty too. When a task worker finishes or window/commit completes, it will resume the runloop.
+   */
+  private void blockIfBusy(IncomingMessageEnvelope envelope) {
+    synchronized (latch) {
+      while (!shutdownNow && throwable == null) {
+        for (AsyncTaskWorker worker : taskWorkers.values()) {
+          if (worker.state.isReady() && (envelope != null || worker.state.hasPendingOps())) {
+            // should continue running since the worker state is ready and there is either new message
+            // or some pending operations for the worker
+            return;
+          }
+        }
+
+        try {
+          log.trace("Block loop thread");
+
+          if (envelope == null) {
+            // If the envelope is null then we will wait for a poll interval, otherwise next choose() will
+            // return null immediately and we will have a busy loop
+            latch.wait(consumerMultiplexer.pollIntervalMs());
+            return;
+          } else {
+            latch.wait();
+          }
+        } catch (InterruptedException e) {
+          throw new SamzaException("Run loop is interrupted", e);
+        }
+      }
+    }
+  }
+
+  /**
+   * Resume the runloop thread. It is triggered once a task becomes ready again or has failure.
+   */
+  private void resume() {
+    log.trace("Resume loop thread");
+    if (coordinatorRequests.shouldShutdownNow() && coordinatorRequests.commitRequests().isEmpty()) {
+      shutdownNow = true;
+    }
+    synchronized (latch) {
+      latch.notifyAll();
+    }
+  }
+
+  /**
+   * Set the throwable and abort run loop. The throwable will be thrown from the run loop thread
+   * @param t throwable
+   */
+  private void abort(Throwable t) {
+    throwable = t;
+  }
+
+  /**
+   * PendingEnvelope contains an envelope that is not processed by this task, and
+   * a flag indicating whether it has been processed by any tasks.
+   */
+  private static final class PendingEnvelope {
+    private final IncomingMessageEnvelope envelope;
+    private boolean processed = false;
+
+    PendingEnvelope(IncomingMessageEnvelope envelope) {
+      this.envelope = envelope;
+    }
+
+    /**
+     * Returns true if the envelope has not been processed.
+     */
+    private boolean markProcessed() {
+      boolean oldValue = processed;
+      processed = true;
+      return !oldValue;
+    }
+  }
+
+
+  private enum WorkerOp {
+    WINDOW,
+    COMMIT,
+    PROCESS,
+    NO_OP
+  }
+
+  /**
+   * The AsyncTaskWorker encapsulates the states of an {@link AsyncStreamTask}. If the task becomes ready, it
+   * will run the task asynchronously. It runs window and commit in the provided thread pool.
+   */
+  private class AsyncTaskWorker implements TaskCallbackListener {
+    private final TaskInstance<AsyncStreamTask> task;
+    private final TaskCallbackManager callbackManager;
+    private volatile AsyncTaskState state;
+
+    AsyncTaskWorker(TaskInstance<AsyncStreamTask> task) {
+      this.task = task;
+      this.callbackManager = new TaskCallbackManager(this, task.metrics(), callbackTimer, callbackTimeoutMs);
+      this.state = new AsyncTaskState(task.taskName(), task.metrics());
+    }
+
+    private void init() {
+      // schedule the timer for windowing and committing
+      if (task.isWindowableTask() && windowMs > 0L) {
+        workerTimer.scheduleAtFixedRate(new Runnable() {
+          @Override
+          public void run() {
+            log.trace("Task {} need window", task.taskName());
+            state.needWindow();
+            resume();
+          }
+        }, windowMs, windowMs, TimeUnit.MILLISECONDS);
+      }
+
+      if (commitMs > 0L) {
+        workerTimer.scheduleAtFixedRate(new Runnable() {
+          @Override
+          public void run() {
+            log.trace("Task {} need commit", task.taskName());
+            state.needCommit();
+            resume();
+          }
+        }, commitMs, commitMs, TimeUnit.MILLISECONDS);
+      }
+    }
+
+    /**
+     * Invoke next task operation based on its state
+     */
+    private void run() {
+      switch (state.nextOp()) {
+        case PROCESS:
+          process();
+          break;
+        case WINDOW:
+          window();
+          break;
+        case COMMIT:
+          commit();
+          break;
+        default:
+          //no op
+          break;
+      }
+    }
+
+    /**
+     * Process asynchronously. The callback needs to be fired once the processing is done.
+     */
+    private void process() {
+      final IncomingMessageEnvelope envelope = state.fetchEnvelope();
+      log.trace("Process ssp {} offset {}", envelope.getSystemStreamPartition(), envelope.getOffset());
+
+      final ReadableCoordinator coordinator = new ReadableCoordinator(task.taskName());
+      TaskCallbackFactory callbackFactory = new TaskCallbackFactory() {
+        @Override
+        public TaskCallback createCallback() {
+          state.startProcess();
+          containerMetrics.processes().inc();
+          return callbackManager.createCallback(task.taskName(), envelope, coordinator);
+        }
+      };
+
+      task.process(envelope, coordinator, callbackFactory);
+    }
+
+    /**
+     * Invoke window. Run window in thread pool if not the single thread mode.
+     */
+    private void window() {
+      state.startWindow();
+      Runnable windowWorker = new Runnable() {
+        @Override
+        public void run() {
+          try {
+            containerMetrics.windows().inc();
+
+            ReadableCoordinator coordinator = new ReadableCoordinator(task.taskName());
+            long startTime = System.nanoTime();
+            task.window(coordinator);
+            containerMetrics.windowNs().update(System.nanoTime() - startTime);
+            coordinatorRequests.update(coordinator);
+
+            state.doneWindowOrCommit();
+          } catch (Throwable t) {
+            log.error("Task {} window failed", task.taskName(), t);
+            abort(t);
+          } finally {
+            log.trace("Task {} window completed", task.taskName());
+            resume();
+          }
+        }
+      };
+
+      if (threadPool != null) {
+        log.trace("Task {} window on the thread pool", task.taskName());
+        threadPool.submit(windowWorker);
+      } else {
+        log.trace("Task {} window on the run loop thread", task.taskName());
+        windowWorker.run();
+      }
+    }
+
+    /**
+     * Invoke commit. Run commit in thread pool if not the single thread mode.
+     */
+    private void commit() {
+      state.startCommit();
+      Runnable commitWorker = new Runnable() {
+        @Override
+        public void run() {
+          try {
+            containerMetrics.commits().inc();
+
+            long startTime = System.nanoTime();
+            task.commit();
+            containerMetrics.commitNs().update(System.nanoTime() - startTime);
+
+            state.doneWindowOrCommit();
+          } catch (Throwable t) {
+            log.error("Task {} commit failed", task.taskName(), t);
+            abort(t);
+          } finally {
+            log.trace("Task {} commit completed", task.taskName());
+            resume();
+          }
+        }
+      };
+
+      if (threadPool != null) {
+        log.trace("Task {} commits on the thread pool", task.taskName());
+        threadPool.submit(commitWorker);
+      } else {
+        log.trace("Task {} commits on the run loop thread", task.taskName());
+        commitWorker.run();
+      }
+    }
+
+
+
+    /**
+     * Task process completes successfully, update the offsets based on the high-water mark.
+     * Then it will trigger the listener for task state change.
+     * @param callback AsyncStreamTask.processAsync callback
+     */
+    @Override
+    public void onComplete(TaskCallback callback) {
+      try {
+        // One fewer message in flight for this task.
+        state.doneProcess();
+        TaskCallbackImpl callbackImpl = (TaskCallbackImpl) callback;
+        // Processing time is measured from the moment the callback object was created.
+        containerMetrics.processNs().update(System.nanoTime() - callbackImpl.timeCreatedNs);
+        log.trace("Got callback complete for task {}, ssp {}", callbackImpl.taskName, callbackImpl.envelope.getSystemStreamPartition());
+
+        // The manager returns the callback whose offset should be recorded, or null when the
+        // completed callbacks do not yet form a contiguous run from the next expected
+        // sequence number (an earlier message is still outstanding).
+        TaskCallbackImpl callbackToUpdate = callbackManager.updateCallback(callbackImpl, true);
+        if (callbackToUpdate != null) {
+          IncomingMessageEnvelope envelope = callbackToUpdate.envelope;
+          log.trace("Update offset for ssp {}, offset {}", envelope.getSystemStreamPartition(), envelope.getOffset());
+
+          // update offset
+          task.offsetManager().update(task.taskName(), envelope.getSystemStreamPartition(), envelope.getOffset());
+
+          // update coordinator
+          coordinatorRequests.update(callbackToUpdate.coordinator);
+        }
+      } catch (Throwable t) {
+        // Any failure while recording the completion aborts via abort(t).
+        log.error(t.getMessage(), t);
+        abort(t);
+      } finally {
+        // Always invoked, whether the completion was recorded or aborted.
+        resume();
+      }
+    }
+
+    /**
+     * Task process fails. Trigger the listener indicating failure.
+     * @param callback AsyncStreamTask.processAsync callback
+     * @param t throwable of the failure
+     */
+    @Override
+    public void onFailure(TaskCallback callback, Throwable t) {
+      try {
+        // One fewer message in flight for this task.
+        state.doneProcess();
+        // Report the failure before any bookkeeping below can itself throw.
+        abort(t);
+        // update pending count, but not offset
+        TaskCallbackImpl callbackImpl = (TaskCallbackImpl) callback;
+        callbackManager.updateCallback(callbackImpl, false);
+        log.error("Got callback failure for task {}", callbackImpl.taskName);
+      } catch (Throwable e) {
+        // Errors raised while handling the failure are only logged here.
+        log.error(e.getMessage(), e);
+      } finally {
+        // Always invoked, even if the failure handling itself failed.
+        resume();
+      }
+    }
+  }
+
+
+  /**
+   * AsyncTaskState manages the state of the AsyncStreamTask. In summary, a worker has the following states:
+   * ready - ready for window, commit or process next incoming message.
+   * busy - doing window, commit or not able to process next message.
+   * idle - no pending messages, and no window/commit
+   */
+  private final class AsyncTaskState {
+    // Flags are volatile because they are written and read without any lock.
+    private volatile boolean needWindow = false;
+    private volatile boolean needCommit = false;
+    private volatile boolean windowOrCommitInFlight = false;
+    // Number of messages handed to the task whose callbacks have not fired yet.
+    private final AtomicInteger messagesInFlight = new AtomicInteger(0);
+    // Envelopes chosen for this task but not yet dispatched; see insertEnvelope/fetchEnvelope.
+    private final ArrayDeque<PendingEnvelope> pendingEnvelopQueue;
+    private final TaskName taskName;
+    private final TaskInstanceMetrics taskMetrics;
+
+    /**
+     * @param taskName name of the task whose state is tracked
+     * @param taskMetrics metrics of that task; used to publish the pending-message gauge
+     */
+    AsyncTaskState(TaskName taskName, TaskInstanceMetrics taskMetrics) {
+      this.taskName = taskName;
+      this.taskMetrics = taskMetrics;
+      this.pendingEnvelopQueue = new ArrayDeque<>();
+    }
+
+    /**
+     * Returns whether the task is ready to do process/window/commit.
+     * As a side effect, consumes any commit request registered for this task in the
+     * coordinator requests and turns it into a pending commit.
+     * @return true if the next operation may be started now
+     */
+    private boolean isReady() {
+      // Use a conditional write rather than `needCommit |= ...`: the compound assignment is a
+      // non-atomic read-modify-write on a volatile field, so a needCommit() call from another
+      // thread landing between the read and the write-back would be overwritten with false.
+      if (coordinatorRequests.commitRequests().remove(taskName)) {
+        needCommit = true;
+      }
+      if (needWindow || needCommit) {
+        // ready for window or commit only when no messages are in progress and
+        // no window/commit in flight
+        return messagesInFlight.get() == 0 && !windowOrCommitInFlight;
+      } else {
+        // ready for process only when the inflight message count does not exceed threshold
+        // and no window/commit in flight
+        return messagesInFlight.get() < maxConcurrency && !windowOrCommitInFlight;
+      }
+    }
+
+    /**
+     * @return true if there is a queued envelope, or a window or commit waiting to run
+     */
+    private boolean hasPendingOps() {
+      return !pendingEnvelopQueue.isEmpty() || needCommit || needWindow;
+    }
+
+    /**
+     * Returns the next operation by this taskWorker.
+     * Commit takes precedence over window, which takes precedence over process.
+     * @return the operation to run, or NO_OP when the task is not ready or has nothing to do
+     */
+    private WorkerOp nextOp() {
+      if (isReady()) {
+        if (needCommit) return WorkerOp.COMMIT;
+        else if (needWindow) return WorkerOp.WINDOW;
+        else if (!pendingEnvelopQueue.isEmpty()) return WorkerOp.PROCESS;
+      }
+      return WorkerOp.NO_OP;
+    }
+
+    /** Requests a window; it starts once isReady() allows it. */
+    private void needWindow() {
+      needWindow = true;
+    }
+
+    /** Requests a commit; it starts once isReady() allows it. */
+    private void needCommit() {
+      needCommit = true;
+    }
+
+    /** Clears the window request and marks a window/commit as in flight. */
+    private void startWindow() {
+      needWindow = false;
+      windowOrCommitInFlight = true;
+    }
+
+    /** Clears the commit request and marks a window/commit as in flight. */
+    private void startCommit() {
+      needCommit = false;
+      windowOrCommitInFlight = true;
+    }
+
+    /** Records that one more message has been handed to the task. */
+    private void startProcess() {
+      messagesInFlight.incrementAndGet();
+    }
+
+    /** Marks the in-flight window/commit as finished. */
+    private void doneWindowOrCommit() {
+      windowOrCommitInFlight = false;
+    }
+
+    /** Records that one in-flight message has finished (successfully or not). */
+    private void doneProcess() {
+      messagesInFlight.decrementAndGet();
+    }
+
+    /**
+     * Insert a PendingEnvelope into the pending envelope queue and publish the new queue size.
+     * The function will be called in the run loop thread so no synchronization.
+     * @param pendingEnvelope envelope to append to the tail of the queue
+     */
+    private void insertEnvelope(PendingEnvelope pendingEnvelope) {
+      pendingEnvelopQueue.add(pendingEnvelope);
+      int queueSize = pendingEnvelopQueue.size();
+      taskMetrics.pendingMessages().set(queueSize);
+      log.trace("Insert envelope to task {} queue.", taskName);
+      log.debug("Task {} pending envelope count is {} after insertion.", taskName, queueSize);
+    }
+
+    /**
+     * Fetch the pending envelope in the pending queue for the task to process.
+     * Update the chooser for flow control on the SSP level. Once it's updated, the AsyncRunLoop
+     * will be able to choose new messages from this SSP for the task to process. Note that we
+     * update only when the envelope is first time being processed. This solves the issue in
+     * Broadcast stream where a message need to be processed by multiple tasks. In that case,
+     * the envelope will be in the pendingEnvelopeQueue of each task. Only the first fetch updates
+     * the chooser with the next envelope in the broadcast stream partition.
+     * The function will be called in the run loop thread so no synchronization.
+     * The queue must not be empty: ArrayDeque.remove() throws NoSuchElementException otherwise.
+     * @return the envelope at the head of the pending queue
+     */
+    private IncomingMessageEnvelope fetchEnvelope() {
+      PendingEnvelope pendingEnvelope = pendingEnvelopQueue.remove();
+      int queueSize = pendingEnvelopQueue.size();
+      taskMetrics.pendingMessages().set(queueSize);
+      log.trace("fetch envelope ssp {} offset {} to process.", pendingEnvelope.envelope.getSystemStreamPartition(), pendingEnvelope.envelope.getOffset());
+      log.debug("Task {} pending envelopes count is {} after fetching.", taskName, queueSize);
+
+      // markProcessed() gates the chooser update so it happens only on the first fetch.
+      if (pendingEnvelope.markProcessed()) {
+        SystemStreamPartition partition = pendingEnvelope.envelope.getSystemStreamPartition();
+        consumerMultiplexer.tryUpdate(partition);
+        log.debug("Update chooser for " + partition);
+      }
+      return pendingEnvelope.envelope;
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/e5f31c57/samza-core/src/main/java/org/apache/samza/task/AsyncStreamTaskAdapter.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/task/AsyncStreamTaskAdapter.java b/samza-core/src/main/java/org/apache/samza/task/AsyncStreamTaskAdapter.java
new file mode 100644
index 0000000..1fc6456
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/task/AsyncStreamTaskAdapter.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.task;
+
+import java.util.concurrent.ExecutorService;
+import org.apache.samza.config.Config;
+import org.apache.samza.system.IncomingMessageEnvelope;
+
+
+/**
+ * AsyncStreamTaskAdapter allows a StreamTask to be executed in parallel. The class
+ * uses the built-in thread pool to invoke StreamTask.process and triggers
+ * the callbacks once it's done. If the thread pool is null, it follows the legacy
+ * synchronous model to execute the tasks on the run loop thread.
+ */
+public class AsyncStreamTaskAdapter implements AsyncStreamTask, InitableTask, WindowableTask, ClosableTask {
+  private final StreamTask wrappedTask;
+  private final ExecutorService executor;
+
+  /**
+   * @param task the synchronous task to adapt
+   * @param executor pool on which process() is invoked; null runs it on the caller's thread
+   */
+  public AsyncStreamTaskAdapter(StreamTask task, ExecutorService executor) {
+    this.wrappedTask = task;
+    this.executor = executor;
+  }
+
+  /**
+   * Forwards init to the wrapped task when it is an InitableTask; otherwise does nothing.
+   */
+  @Override
+  public void init(Config config, TaskContext context) throws Exception {
+    if (!(wrappedTask instanceof InitableTask)) {
+      return;
+    }
+    final InitableTask initable = (InitableTask) wrappedTask;
+    initable.init(config, context);
+  }
+
+  /**
+   * Runs the wrapped task's process() and reports the outcome through the callback,
+   * either on the executor or, when there is none, directly on the calling thread.
+   */
+  @Override
+  public void processAsync(final IncomingMessageEnvelope envelope,
+      final MessageCollector collector,
+      final TaskCoordinator coordinator,
+      final TaskCallback callback) {
+    if (executor == null) {
+      // legacy mode: running all tasks in the runloop thread
+      process(envelope, collector, coordinator, callback);
+      return;
+    }
+
+    final Runnable job = new Runnable() {
+      @Override
+      public void run() {
+        process(envelope, collector, coordinator, callback);
+      }
+    };
+    executor.submit(job);
+  }
+
+  /**
+   * Invokes the wrapped task and signals complete() on normal return, or failure(t)
+   * for anything thrown inside the try block.
+   */
+  private void process(IncomingMessageEnvelope envelope,
+      MessageCollector collector,
+      TaskCoordinator coordinator,
+      TaskCallback callback) {
+    try {
+      wrappedTask.process(envelope, collector, coordinator);
+      callback.complete();
+    } catch (Throwable t) {
+      callback.failure(t);
+    }
+  }
+
+  /**
+   * Forwards window to the wrapped task when it is a WindowableTask; otherwise does nothing.
+   */
+  @Override
+  public void window(MessageCollector collector, TaskCoordinator coordinator) throws Exception {
+    if (!(wrappedTask instanceof WindowableTask)) {
+      return;
+    }
+    final WindowableTask windowable = (WindowableTask) wrappedTask;
+    windowable.window(collector, coordinator);
+  }
+
+  /**
+   * Forwards close to the wrapped task when it is a ClosableTask; otherwise does nothing.
+   */
+  @Override
+  public void close() throws Exception {
+    if (!(wrappedTask instanceof ClosableTask)) {
+      return;
+    }
+    final ClosableTask closable = (ClosableTask) wrappedTask;
+    closable.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/e5f31c57/samza-core/src/main/java/org/apache/samza/task/CoordinatorRequests.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/task/CoordinatorRequests.java b/samza-core/src/main/java/org/apache/samza/task/CoordinatorRequests.java
new file mode 100644
index 0000000..052b3b9
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/task/CoordinatorRequests.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.task;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.samza.container.TaskName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * CoordinatorRequests is used in run loop to collect the coordinator
+ * requests from tasks, including commit requests and shutdown requests.
+ * It is thread safe so it can be updated from multiple task threads.
+ */
+public class CoordinatorRequests {
+  private static final Logger log = LoggerFactory.getLogger(CoordinatorRequests.class);
+
+  // All task names known to this instance; used for commit-all and shutdown consensus.
+  private final Set<TaskName> taskNames;
+  private final Set<TaskName> taskShutdownRequests = Collections.synchronizedSet(new HashSet<TaskName>());
+  private final Set<TaskName> taskCommitRequests = Collections.synchronizedSet(new HashSet<TaskName>());
+  private volatile boolean shutdownNow = false;
+
+  public CoordinatorRequests(Set<TaskName> taskNames) {
+    this.taskNames = taskNames;
+  }
+
+  /**
+   * Records the requests carried by the given coordinator, if it carries any.
+   * @param coordinator coordinator handed to a task for one process/window call
+   */
+  public void update(ReadableCoordinator coordinator) {
+    final boolean carriesRequest =
+        coordinator.commitRequest().isDefined() || coordinator.shutdownRequest().isDefined();
+    if (!carriesRequest) {
+      return;
+    }
+    checkCoordinator(coordinator);
+  }
+
+  /**
+   * @return the live (synchronized) set of tasks with an outstanding commit request
+   */
+  public Set<TaskName> commitRequests() {
+    return taskCommitRequests;
+  }
+
+  /**
+   * @return true once a shutdown has been requested
+   */
+  public boolean shouldShutdownNow() {
+    return shutdownNow;
+  }
+
+  /**
+   * Translates the requests of one coordinator into run loop state: commit for the
+   * requesting task, commit for every task, shutdown once every task has asked for it,
+   * or immediate shutdown.
+   */
+  private void checkCoordinator(ReadableCoordinator coordinator) {
+    if (coordinator.requestedCommitTask()) {
+      final TaskName requester = coordinator.taskName();
+      log.info("Task " + requester + " requested commit for current task only");
+      taskCommitRequests.add(requester);
+    }
+
+    if (coordinator.requestedCommitAll()) {
+      log.info("Task " + coordinator.taskName() + " requested commit for all tasks in the container");
+      taskCommitRequests.addAll(taskNames);
+    }
+
+    if (coordinator.requestedShutdownOnConsensus()) {
+      taskShutdownRequests.add(coordinator.taskName());
+      log.info("Shutdown has now been requested by tasks " + taskShutdownRequests);
+    }
+
+    // Shut down on an explicit request, or when every task has voted for shutdown.
+    final boolean mustShutdown =
+        coordinator.requestedShutdownNow() || taskShutdownRequests.size() == taskNames.size();
+    if (mustShutdown) {
+      log.info("Shutdown requested.");
+      shutdownNow = true;
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/e5f31c57/samza-core/src/main/java/org/apache/samza/task/TaskCallbackFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/task/TaskCallbackFactory.java b/samza-core/src/main/java/org/apache/samza/task/TaskCallbackFactory.java
new file mode 100644
index 0000000..7dddb67
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/task/TaskCallbackFactory.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.task;
+
+/**
+ * TaskCallbackFactory creates the {@link TaskCallback} for {@link org.apache.samza.container.TaskInstance}
+ * to process asynchronously
+ */
+public interface TaskCallbackFactory {
+  /**
+   * Creates a callback for one asynchronous process invocation.
+   * @return the newly created {@link TaskCallback}
+   */
+  TaskCallback createCallback();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/e5f31c57/samza-core/src/main/java/org/apache/samza/task/TaskCallbackImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/task/TaskCallbackImpl.java b/samza-core/src/main/java/org/apache/samza/task/TaskCallbackImpl.java
new file mode 100644
index 0000000..9b70099
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/task/TaskCallbackImpl.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.task;
+
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class implements {@link TaskCallback}. It triggers the
+ * {@link TaskCallbackListener} with the callback result. If the
+ * callback is called multiple times, it will throw IllegalStateException
+ * to the listener.
+ */
+class TaskCallbackImpl implements TaskCallback, Comparable<TaskCallbackImpl> {
+  private static final Logger log = LoggerFactory.getLogger(TaskCallbackImpl.class);
+
+  final TaskName taskName;
+  final IncomingMessageEnvelope envelope;
+  final ReadableCoordinator coordinator;
+  final long timeCreatedNs;
+  private final AtomicBoolean isComplete = new AtomicBoolean(false);
+  private final TaskCallbackListener listener;
+  private ScheduledFuture scheduledFuture = null;
+  private final long seqNum;
+
+  public TaskCallbackImpl(TaskCallbackListener listener,
+      TaskName taskName,
+      IncomingMessageEnvelope envelope,
+      ReadableCoordinator coordinator,
+      long seqNum) {
+    this.listener = listener;
+    this.taskName = taskName;
+    this.envelope = envelope;
+    this.coordinator = coordinator;
+    this.seqNum = seqNum;
+    this.timeCreatedNs = System.nanoTime();
+  }
+
+  @Override
+  public void complete() {
+    if (scheduledFuture != null) {
+      scheduledFuture.cancel(true);
+    }
+    log.trace("Callback complete for ssp {} offset {}.", envelope.getSystemStreamPartition(), envelope.getOffset());
+
+    if (isComplete.compareAndSet(false, true)) {
+      listener.onComplete(this);
+    } else {
+      Throwable throwable = new IllegalStateException("TaskCallback complete has been invoked after completion");
+      log.error("Callback for process task {}, envelope {}.", new Object[] {taskName, envelope}, throwable);
+      listener.onFailure(this, throwable);
+    }
+  }
+
+  @Override
+  public void failure(Throwable t) {
+    if (scheduledFuture != null) {
+      scheduledFuture.cancel(true);
+    }
+    log.error("Callback fails for task {} envelope {}.", new Object[] {taskName, envelope}, t);
+
+    if (isComplete.compareAndSet(false, true)) {
+      listener.onFailure(this, t);
+    } else {
+      Throwable throwable = new IllegalStateException("TaskCallback failure has been invoked after completion", t);
+      log.error("Callback for process task {}, envelope {}.", new Object[] {taskName, envelope}, throwable);
+      listener.onFailure(this, throwable);
+    }
+  }
+
+  void setScheduledFuture(ScheduledFuture scheduledFuture) {
+    this.scheduledFuture = scheduledFuture;
+  }
+
+  @Override
+  public int compareTo(TaskCallbackImpl callback) {
+    return Long.compare(this.seqNum, callback.seqNum);
+  }
+
+  boolean matchSeqNum(long seqNum) {
+    return this.seqNum == seqNum;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/e5f31c57/samza-core/src/main/java/org/apache/samza/task/TaskCallbackListener.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/task/TaskCallbackListener.java b/samza-core/src/main/java/org/apache/samza/task/TaskCallbackListener.java
new file mode 100644
index 0000000..de4ee58
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/task/TaskCallbackListener.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.task;
+
+/**
+ * The interface of the listener to the {@link AsyncStreamTask}.processAsync
+ * callback events. If the callback completes with success, onComplete() will be fired.
+ * If the callback fails, onFailure() will be fired.
+ */
+interface TaskCallbackListener {
+  /**
+   * Fired when a callback completes successfully.
+   * @param callback the callback that completed
+   */
+  void onComplete(TaskCallback callback);
+
+  /**
+   * Fired when a callback fails.
+   * @param callback the callback that failed
+   * @param t the cause of the failure
+   */
+  void onFailure(TaskCallback callback, Throwable t);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/e5f31c57/samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java b/samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java
new file mode 100644
index 0000000..132cf59
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.task;
+
+import java.util.PriorityQueue;
+import java.util.Queue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.samza.container.TaskInstanceMetrics;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.system.IncomingMessageEnvelope;
+
+
+/**
+ * TaskCallbackManager manages the life cycle of {@link AsyncStreamTask} callbacks,
+ * including creation, update and status. Internally it maintains a PriorityQueue
+ * for the callbacks based on the sequence number, and updates the offsets for checkpointing
+ * by always moving forward to the latest contiguous callback (uses the high watermark).
+ */
+class TaskCallbackManager {
+
+  /**
+   * Successfully completed callbacks that have not yet been rolled into the contiguous
+   * high watermark, ordered by sequence number.
+   */
+  private static final class TaskCallbacks {
+    private final Queue<TaskCallbackImpl> callbacks = new PriorityQueue<>();
+    private final Object lock = new Object();
+    // Sequence number the next contiguous callback must carry.
+    private long nextSeqNum = 0L;
+
+    /**
+     * Queues a newly completed callback, then drains every callback that continues the
+     * contiguous run starting at the next expected sequence number.
+     * @param cb new callback completed
+     * @return the last drained callback that requested a commit, else the last drained
+     *         callback, else null when nothing could be drained
+     */
+    TaskCallbackImpl update(TaskCallbackImpl cb) {
+      synchronized (lock) {
+        callbacks.add(cb);
+
+        TaskCallbackImpl lastDrained = null;
+        TaskCallbackImpl lastCommitRequester = null;
+        TaskCoordinator.RequestScope pendingShutdown = null;
+
+        // Drain while the head of the queue is the next callback in sequence.
+        for (TaskCallbackImpl head = callbacks.peek();
+            head != null && head.matchSeqNum(nextSeqNum);
+            head = callbacks.peek()) {
+          ++nextSeqNum;
+          lastDrained = callbacks.poll();
+
+          if (lastDrained.coordinator.commitRequest().isDefined()) {
+            lastCommitRequester = lastDrained;
+          }
+
+          if (lastDrained.coordinator.shutdownRequest().isDefined()) {
+            pendingShutdown = lastDrained.coordinator.shutdownRequest().get();
+          }
+        }
+
+        // A manual commit request wins; otherwise fall back to the highest contiguous callback.
+        final TaskCallbackImpl result = lastCommitRequester != null ? lastCommitRequester : lastDrained;
+
+        // Carry a drained shutdown request over to the coordinator that is returned.
+        if (pendingShutdown != null) {
+          result.coordinator.shutdown(pendingShutdown);
+        }
+
+        return result;
+      }
+    }
+  }
+
+  private long seqNum = 0L;
+  private final AtomicInteger pendingCount = new AtomicInteger(0);
+  private final TaskCallbacks completeCallbacks = new TaskCallbacks();
+  private final TaskInstanceMetrics metrics;
+  private final ScheduledExecutorService timer;
+  private final TaskCallbackListener listener;
+  private long timeout;
+
+  /**
+   * @param listener listener notified by the callbacks this manager creates
+   * @param metrics task metrics; receives the messages-in-flight gauge
+   * @param timer scheduler for callback timeouts; null disables timeouts
+   * @param timeout callback timeout in milliseconds
+   */
+  public TaskCallbackManager(TaskCallbackListener listener, TaskInstanceMetrics metrics, ScheduledExecutorService timer, long timeout) {
+    this.listener = listener;
+    this.metrics = metrics;
+    this.timer = timer;
+    this.timeout = timeout;
+  }
+
+  /**
+   * Creates the callback for one message, assigns it the next sequence number and, when a
+   * timer is configured, schedules a task that fails the callback after the timeout.
+   * @return the new callback
+   */
+  public TaskCallbackImpl createCallback(TaskName taskName,
+      IncomingMessageEnvelope envelope,
+      ReadableCoordinator coordinator) {
+    final TaskCallbackImpl callback = new TaskCallbackImpl(listener, taskName, envelope, coordinator, seqNum++);
+    final int inFlight = pendingCount.incrementAndGet();
+    metrics.messagesInFlight().set(inFlight);
+
+    if (timer == null) {
+      // No timer: the callback never times out.
+      return callback;
+    }
+
+    final Runnable onTimeout = new Runnable() {
+      @Override
+      public void run() {
+        String msg = "Task " + callback.taskName + " callback times out";
+        callback.failure(new TaskCallbackTimeoutException(msg));
+      }
+    };
+    final ScheduledFuture timeoutHandle = timer.schedule(onTimeout, timeout, TimeUnit.MILLISECONDS);
+    callback.setScheduledFuture(timeoutHandle);
+
+    return callback;
+  }
+
+  /**
+   * Records the outcome of a callback and refreshes the messages-in-flight gauge. Only a
+   * successful callback takes part in the high-watermark roll used for checkpointing.
+   * @param callback new completed callback
+   * @param success callback result status
+   * @return the callback for checkpointing, or null if there is none
+   */
+  public TaskCallbackImpl updateCallback(TaskCallbackImpl callback, boolean success) {
+    final TaskCallbackImpl toCheckpoint = success ? completeCallbacks.update(callback) : null;
+    final int inFlight = pendingCount.decrementAndGet();
+    metrics.messagesInFlight().set(inFlight);
+    return toCheckpoint;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/e5f31c57/samza-core/src/main/java/org/apache/samza/task/TaskCallbackTimeoutException.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/task/TaskCallbackTimeoutException.java b/samza-core/src/main/java/org/apache/samza/task/TaskCallbackTimeoutException.java
new file mode 100644
index 0000000..bf7f13c
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/task/TaskCallbackTimeoutException.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.task;
+
+import org.apache.samza.SamzaException;
+
+
+/**
+ * Specific {@link SamzaException}s thrown when a task callback times out
+ */
+public class TaskCallbackTimeoutException extends SamzaException {
+  private static final long serialVersionUID = -2342134146355610665L;
+
+  /**
+   * @param e the underlying cause, passed to the SamzaException constructor
+   */
+  public TaskCallbackTimeoutException(Throwable e) {
+    super(e);
+  }
+
+  /**
+   * @param msg description of the timeout, passed to the SamzaException constructor
+   */
+  public TaskCallbackTimeoutException(String msg) {
+    super(msg);
+  }
+
+  /**
+   * @param msg description of the timeout
+   * @param e the underlying cause
+   */
+  public TaskCallbackTimeoutException(String msg, Throwable e) {
+    super(msg, e);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/e5f31c57/samza-core/src/main/java/org/apache/samza/util/Utils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/util/Utils.java b/samza-core/src/main/java/org/apache/samza/util/Utils.java
new file mode 100644
index 0000000..472e0a5
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/util/Utils.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.util;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.runtime.AbstractFunction0;
+
+
+public class Utils {
+  private static final Logger log = LoggerFactory.getLogger(Utils.class); // NOTE(review): not referenced anywhere in this class
+
+  private Utils() {} // static helpers only; not instantiable
+
+  /**
+   * Wraps a fixed value in a Scala Function0, e.g. as the default argument of Option.getOrElse() from Java
+   * @param value value that the returned function yields on every apply()
+   * @param <T> value type
+   * @return function object whose apply() returns {@code value}
+   */
+  public static <T> AbstractFunction0<T> defaultValue(final T value) {
+    return new AbstractFunction0<T>() {
+      @Override
+      public T apply() {
+        return value;
+      }
+    };
+  }
+
+  /**
+   * Creates a nanosecond clock backed by {@link System#nanoTime()}
+   * @return function object whose apply() returns the current System.nanoTime() value
+   */
+  public static AbstractFunction0<Object> defaultClock() {
+    return new AbstractFunction0<Object>() {
+      @Override
+      public Object apply() {
+        return System.nanoTime();
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/e5f31c57/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
index 00648e4..7245902 100644
--- a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
@@ -19,6 +19,9 @@
 
 package org.apache.samza.checkpoint
 
+
+import java.util.concurrent.ConcurrentHashMap
+
 import org.apache.samza.system.SystemStream
 import org.apache.samza.system.SystemStreamPartition
 import org.apache.samza.system.SystemStreamMetadata
@@ -146,7 +149,7 @@ class OffsetManager(
   /**
    * Last offsets processed for each SystemStreamPartition.
    */
-  var lastProcessedOffsets = Map[TaskName, Map[SystemStreamPartition, String]]()
+  val lastProcessedOffsets = new ConcurrentHashMap[TaskName, ConcurrentHashMap[SystemStreamPartition, String]]()
 
   /**
    * Offsets to start reading from for each SystemStreamPartition. This
@@ -182,20 +185,15 @@ class OffsetManager(
    * Set the last processed offset for a given SystemStreamPartition.
    */
   def update(taskName: TaskName, systemStreamPartition: SystemStreamPartition, offset: String) {
-    lastProcessedOffsets.get(taskName) match {
-      case Some(sspToOffsets) => lastProcessedOffsets += taskName -> (sspToOffsets + (systemStreamPartition -> offset))
-      case None => lastProcessedOffsets += (taskName -> Map(systemStreamPartition -> offset))
-    }
+    lastProcessedOffsets.putIfAbsent(taskName, new ConcurrentHashMap[SystemStreamPartition, String]())
+    if (offset != null) lastProcessedOffsets.get(taskName).put(systemStreamPartition, offset) // ConcurrentHashMap.put throws NullPointerException on a null value, so null offsets are skipped
   }
 
   /**
    * Get the last processed offset for a SystemStreamPartition.
    */
-  def getLastProcessedOffset(taskName: TaskName, systemStreamPartition: SystemStreamPartition) = {
-    lastProcessedOffsets.get(taskName) match {
-      case Some(sspToOffsets) => sspToOffsets.get(systemStreamPartition)
-      case None => None
-    }
+  def getLastProcessedOffset(taskName: TaskName, systemStreamPartition: SystemStreamPartition): Option[String] = {
+    Option(lastProcessedOffsets.get(taskName)).flatMap(offsets => Option(offsets.get(systemStreamPartition))) // inner get() returns null for an unknown SSP; wrap it so the result is None, not Some(null)
   }
 
   /**
@@ -217,7 +215,7 @@ class OffsetManager(
       debug("Checkpointing offsets for taskName %s." format taskName)
 
       val sspsForTaskName = systemStreamPartitions.getOrElse(taskName, throw new SamzaException("No such SystemStreamPartition set " + taskName + " registered for this checkpointmanager")).toSet
-      val partitionOffsets = lastProcessedOffsets.get(taskName) match {
+      val partitionOffsets = Option(lastProcessedOffsets.get(taskName)) match {
         case Some(sspToOffsets) => sspToOffsets.filterKeys(sspsForTaskName.contains(_))
         case None => {
           warn(taskName + " is not found... ")
@@ -225,8 +223,9 @@ class OffsetManager(
         }
       }
 
+      partitionOffsets.foreach(p => info("task " + taskName + " checkpoint " + p._1 + ", " + p._2))
       checkpointManager.writeCheckpoint(taskName, new Checkpoint(partitionOffsets))
-      lastProcessedOffsets.get(taskName) match {
+      Option(lastProcessedOffsets.get(taskName)) match {
         case Some(sspToOffsets) => sspToOffsets.foreach { case (ssp, checkpoint) => offsetManagerMetrics.checkpointedOffsets(ssp).set(checkpoint) }
         case None =>
       }
@@ -270,9 +269,8 @@ class OffsetManager(
         .keys
         .flatMap(restoreOffsetsFromCheckpoint(_))
         .toMap
-      lastProcessedOffsets ++= result.map {
-        case (taskName, sspToOffset) => {
-          taskName -> sspToOffset.filter {
+      result.map { case (taskName, sspToOffset) => {
+          lastProcessedOffsets.put(taskName, new ConcurrentHashMap[SystemStreamPartition, String](sspToOffset.filter {
             case (systemStreamPartition, offset) =>
               val shouldKeep = offsetSettings.contains(systemStreamPartition.getSystemStream)
               if (!shouldKeep) {
@@ -280,7 +278,7 @@ class OffsetManager(
               }
               info("Checkpointed offset is currently %s for %s" format (offset, systemStreamPartition))
               shouldKeep
-          }
+          }))
         }
       }
     } else {
@@ -324,17 +322,15 @@ class OffsetManager(
       }
     }
 
-    lastProcessedOffsets = lastProcessedOffsets.map {
-      case (taskName, sspToOffsets) => {
-        taskName -> (sspToOffsets -- systemStreamPartitionsToReset(taskName))
-      }
+    lastProcessedOffsets.keys().foreach { taskName =>
+      lastProcessedOffsets.get(taskName).keySet().removeAll(systemStreamPartitionsToReset(taskName))
     }
   }
 
   /**
    * Returns a map of all SystemStreamPartitions in lastProcessedOffsets that need to be reset
    */
-  private def getSystemStreamPartitionsToReset(taskNameTosystemStreamPartitions: Map[TaskName, Map[SystemStreamPartition, String]]): Map[TaskName, Set[SystemStreamPartition]] = {
+  private def getSystemStreamPartitionsToReset(taskNameTosystemStreamPartitions: ConcurrentHashMap[TaskName, ConcurrentHashMap[SystemStreamPartition, String]]): Map[TaskName, Set[SystemStreamPartition]] = {
     taskNameTosystemStreamPartitions.map {
       case (taskName, sspToOffsets) => {
         taskName -> (sspToOffsets.filter {

http://git-wip-us.apache.org/repos/asf/samza/blob/e5f31c57/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
index 49b08f6..13b72fa 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
@@ -44,6 +44,8 @@ object JobConfig {
   val SAMZA_FWK_VERSION = "samza.fwk.version"
   val JOB_COORDINATOR_SYSTEM = "job.coordinator.system"
   val JOB_CONTAINER_COUNT = "job.container.count"
+  val jOB_CONTAINER_THREAD_POOL_SIZE = "job.container.thread.pool.size"
+  val JOB_CONTAINER_SINGLE_THREAD_MODE = "job.container.single.thread.mode"
   val JOB_REPLICATION_FACTOR = "job.coordinator.replication.factor"
   val JOB_SEGMENT_BYTES = "job.coordinator.segment.bytes"
   val SSP_GROUPER_FACTORY = "job.systemstreampartition.grouper.factory"
@@ -167,4 +169,13 @@ class JobConfig(config: Config) extends ScalaMapConfig(config) with Logging {
 
   def getSSPMatcherConfigJobFactoryRegex = getOrElse(JobConfig.SSP_MATCHER_CONFIG_JOB_FACTORY_REGEX, JobConfig.DEFAULT_SSP_MATCHER_CONFIG_JOB_FACTORY_REGEX)
 
+  /** Value of job.container.thread.pool.size as an Int; 0 when the key is not set. */
+  def getThreadPoolSize: Int =
+    getOption(JobConfig.jOB_CONTAINER_THREAD_POOL_SIZE)
+      .map(_.toInt).getOrElse(0)
+
+  /** Value of job.container.single.thread.mode as a Boolean; false when the key is not set. */
+  def getSingleThreadMode: Boolean =
+    getOption(JobConfig.JOB_CONTAINER_SINGLE_THREAD_MODE)
+      .map(_.toBoolean).getOrElse(false)
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/e5f31c57/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
index 08a4deb..90c1904 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
@@ -38,6 +38,8 @@ object TaskConfig {
   val DROP_SERIALIZATION_ERROR = "task.drop.serialization.errors" // define whether drop the messages or not when serialization fails
   val IGNORED_EXCEPTIONS = "task.ignored.exceptions" // exceptions to ignore in process and window
   val GROUPER_FACTORY = "task.name.grouper.factory" // class name for task grouper
+  val MAX_CONCURRENCY = "task.max.concurrency" // max number of concurrent process for a AsyncStreamTask
+  val CALLBACK_TIMEOUT_MS = "task.callback.timeout.ms"  // timeout period for triggering a callback
 
   /**
    * Samza's container polls for more messages under two conditions. The first
@@ -117,4 +119,13 @@ class TaskConfig(config: Config) extends ScalaMapConfig(config) with Logging {
     }
   }
 
+  /** Value of task.max.concurrency as an Int, or None when the key is not set. */
+  def getMaxConcurrency: Option[Int] =
+    getOption(TaskConfig.MAX_CONCURRENCY)
+      .map(_.toInt)
+
+  /** Value of task.callback.timeout.ms as a Long, or None when the key is not set. */
+  def getCallbackTimeoutMs: Option[Long] =
+    getOption(TaskConfig.CALLBACK_TIMEOUT_MS)
+      .map(_.toLong)
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/e5f31c57/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala b/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
index cf05c15..bb2c376 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
@@ -21,12 +21,18 @@ package org.apache.samza.container
 
 import java.util.concurrent.Executor
 
-import org.apache.samza.system.{SystemConsumers, SystemStreamPartition}
+import org.apache.samza.system.SystemConsumers
+import org.apache.samza.system.SystemStreamPartition
+import org.apache.samza.task.CoordinatorRequests
 import org.apache.samza.task.ReadableCoordinator
-import org.apache.samza.util.{Logging, TimerUtils}
+import org.apache.samza.task.StreamTask
+import org.apache.samza.util.Logging
+import org.apache.samza.util.TimerUtils
+
+import scala.collection.JavaConversions._
 
 /**
- * Each {@link SamzaContainer} uses a single-threaded execution model: activities for
+ * The run loop uses a single-threaded execution model: activities for
  * all {@link TaskInstance}s within a container are multiplexed onto one execution
  * thread. Those activities include task callbacks (such as StreamTask.process and
  * WindowableTask.window), committing checkpoints, etc.
@@ -34,31 +40,29 @@ import org.apache.samza.util.{Logging, TimerUtils}
  * <p>This class manages the execution of that run loop, determining what needs to
  * be done when.
  */
-class RunLoop(
-  val taskInstances: Map[TaskName, TaskInstance],
+class RunLoop (
+  val taskInstances: Map[TaskName, TaskInstance[StreamTask]],
   val consumerMultiplexer: SystemConsumers,
   val metrics: SamzaContainerMetrics,
   val windowMs: Long = -1,
   val commitMs: Long = 60000,
   val clock: () => Long = { System.nanoTime },
-  val shutdownMs: Long = 5000,
   val executor: Executor = new SameThreadExecutor()) extends Runnable with TimerUtils with Logging {
 
   private val metricsMsOffset = 1000000L
   private var lastWindowNs = clock()
   private var lastCommitNs = clock()
   private var activeNs = 0L
-  private var taskShutdownRequests: Set[TaskName] = Set()
-  private var taskCommitRequests: Set[TaskName] = Set()
   @volatile private var shutdownNow = false
+  private val coordinatorRequests: CoordinatorRequests = new CoordinatorRequests(taskInstances.keySet)
 
   // Messages come from the chooser with no connection to the TaskInstance they're bound for.
   // Keep a mapping of SystemStreamPartition to TaskInstance to efficiently route them.
   val systemStreamPartitionToTaskInstances = getSystemStreamPartitionToTaskInstancesMapping
 
-  def getSystemStreamPartitionToTaskInstancesMapping: Map[SystemStreamPartition, List[TaskInstance]] = {
+  def getSystemStreamPartitionToTaskInstancesMapping: Map[SystemStreamPartition, List[TaskInstance[StreamTask]]] = {
     // We could just pass in the SystemStreamPartitionMap during construction, but it's safer and cleaner to derive the information directly
-    def getSystemStreamPartitionToTaskInstance(taskInstance: TaskInstance) = taskInstance.systemStreamPartitions.map(_ -> taskInstance).toMap
+    def getSystemStreamPartitionToTaskInstance(taskInstance: TaskInstance[StreamTask]) = taskInstance.systemStreamPartitions.map(_ -> taskInstance).toMap
 
     taskInstances.values.map { getSystemStreamPartitionToTaskInstance }.flatten.groupBy(_._1).map {
       case (ssp, ssp2taskInstance) => ssp -> ssp2taskInstance.map(_._2).toList
@@ -70,8 +74,6 @@ class RunLoop(
    * unhandled exception is thrown.
    */
   def run {
-    addShutdownHook(Thread.currentThread())
-
     val runTask = new Runnable() {
       override def run(): Unit = {
         val loopStartTime = clock()
@@ -89,19 +91,8 @@ class RunLoop(
     }
   }
 
-  private def addShutdownHook(runLoopThread: Thread) {
-    Runtime.getRuntime().addShutdownHook(new Thread() {
-      override def run() = {
-        info("Shutting down, will wait up to %s ms" format shutdownMs)
-        shutdownNow = true
-        runLoopThread.join(shutdownMs)
-        if (runLoopThread.isAlive) {
-          warn("Did not shut down within %s ms, exiting" format shutdownMs)
-        } else {
-          info("Shutdown complete")
-        }
-      }
-    })
+  def shutdown = {
+    shutdownNow = true // sets the @volatile flag; NOTE(review): presumably polled by the run loop to stop iterating — confirm
   }
 
   /**
@@ -115,7 +106,7 @@ class RunLoop(
     // Exclude choose time from activeNs. Although it includes deserialization time,
     // it most closely captures idle time.
     val envelope = updateTimer(metrics.chooseNs) {
-     consumerMultiplexer.choose
+     consumerMultiplexer.choose()
     }
 
     activeNs += updateTimerAndGetDuration(metrics.processNs) ((currentTimeNs: Long) => {
@@ -128,11 +119,11 @@ class RunLoop(
         val taskInstances = systemStreamPartitionToTaskInstances(ssp)
         taskInstances.foreach {
           taskInstance =>
-            {
-              val coordinator = new ReadableCoordinator(taskInstance.taskName)
-              taskInstance.process(envelope, coordinator)
-              checkCoordinator(coordinator)
-            }
+          {
+            val coordinator = new ReadableCoordinator(taskInstance.taskName)
+            taskInstance.process(envelope, coordinator)
+            coordinatorRequests.update(coordinator)
+          }
         }
       } else {
         trace("No incoming message envelope was available.")
@@ -155,7 +146,7 @@ class RunLoop(
           case (taskName, task) =>
             val coordinator = new ReadableCoordinator(taskName)
             task.window(coordinator)
-            checkCoordinator(coordinator)
+            coordinatorRequests.update(coordinator)
         }
       }
     })
@@ -167,47 +158,20 @@ class RunLoop(
   private def commit {
     activeNs += updateTimerAndGetDuration(metrics.commitNs) ((currentTimeNs: Long) => {
       if (commitMs >= 0 && lastCommitNs + commitMs * metricsMsOffset < currentTimeNs) {
-        trace("Committing task instances because the commit interval has elapsed.")
+        info("Committing task instances because the commit interval has elapsed.")
         lastCommitNs = currentTimeNs
         metrics.commits.inc
         taskInstances.values.foreach(_.commit)
-      } else if (!taskCommitRequests.isEmpty) {
+      } else if (!coordinatorRequests.commitRequests.isEmpty){
         trace("Committing due to explicit commit request.")
         metrics.commits.inc
-        taskCommitRequests.foreach(taskName => {
+        coordinatorRequests.commitRequests.foreach(taskName => {
           taskInstances(taskName).commit
         })
       }
 
-      taskCommitRequests = Set()
+      shutdownNow |= coordinatorRequests.shouldShutdownNow // |= can only set the flag, never reset it
+      coordinatorRequests.commitRequests.clear() // runs on every pass, whichever branch (or none) was taken above
     })
   }
-
-  /**
-   * A new TaskCoordinator object is passed to a task on every call to StreamTask.process
-   * and WindowableTask.window. This method checks whether the task requested that we
-   * do something that affects the run loop (such as commit or shut down), and updates
-   * run loop state accordingly.
-   */
-  private def checkCoordinator(coordinator: ReadableCoordinator) {
-    if (coordinator.requestedCommitTask) {
-      debug("Task %s requested commit for current task only" format coordinator.taskName)
-      taskCommitRequests += coordinator.taskName
-    }
-
-    if (coordinator.requestedCommitAll) {
-      debug("Task %s requested commit for all tasks in the container" format coordinator.taskName)
-      taskCommitRequests ++= taskInstances.keys
-    }
-
-    if (coordinator.requestedShutdownOnConsensus) {
-      taskShutdownRequests += coordinator.taskName
-      info("Shutdown has now been requested by tasks: %s" format taskShutdownRequests)
-    }
-
-    if (coordinator.requestedShutdownNow || taskShutdownRequests.size == taskInstances.size) {
-      info("Shutdown requested.")
-      shutdownNow = true
-    }
-  }
 }