You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2016/11/11 18:20:53 UTC

samza git commit: SAMZA-1043: Samza performance improvements

Repository: samza
Updated Branches:
  refs/heads/master 4660d4ddd -> 715b67d08


SAMZA-1043: Samza performance improvements


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/715b67d0
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/715b67d0
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/715b67d0

Branch: refs/heads/master
Commit: 715b67d08165418acdfa4ade3cd5f97a0b62e098
Parents: 4660d4d
Author: Xinyu Liu <xi...@xiliu-ld.linkedin.biz>
Authored: Wed Nov 9 11:09:32 2016 -0800
Committer: Xinyu Liu <xi...@xiliu-ld.linkedin.biz>
Committed: Wed Nov 9 11:09:32 2016 -0800

----------------------------------------------------------------------
 .../apache/samza/container/RunLoopFactory.java  | 28 ++++--
 .../org/apache/samza/task/AsyncRunLoop.java     | 89 +++++++++++---------
 .../apache/samza/task/CoordinatorRequests.java  | 16 ++--
 .../org/apache/samza/task/TaskCallbackImpl.java |  7 +-
 .../apache/samza/task/TaskCallbackManager.java  | 41 +++++----
 .../apache/samza/util/HighResolutionClock.java  | 12 +--
 .../samza/util/SystemHighResolutionClock.java   | 16 +---
 .../apache/samza/util/ThrottlingExecutor.java   | 21 ++++-
 .../main/java/org/apache/samza/util/Utils.java  | 59 -------------
 .../org/apache/samza/config/MetricsConfig.scala | 14 ++-
 .../apache/samza/container/SamzaContainer.scala | 17 +++-
 .../apache/samza/system/SystemConsumers.scala   | 28 ++----
 .../main/scala/org/apache/samza/util/Util.scala |  7 ++
 .../org/apache/samza/task/TestAsyncRunLoop.java |  8 +-
 .../samza/task/TestAsyncStreamAdapter.java      |  6 +-
 .../apache/samza/task/TestTaskCallbackImpl.java |  2 +-
 .../samza/task/TestTaskCallbackManager.java     | 35 ++++----
 .../samza/util/TestThrottlingExecutor.java      | 32 +++----
 .../kv/BaseKeyValueStorageEngineFactory.scala   | 15 +++-
 19 files changed, 223 insertions(+), 230 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/715b67d0/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java b/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java
index 609a956..1c66c82 100644
--- a/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java
@@ -22,6 +22,7 @@ package org.apache.samza.container;
 import java.util.concurrent.ExecutorService;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.TaskConfig;
+import org.apache.samza.util.HighResolutionClock;
 import org.apache.samza.system.SystemConsumers;
 import org.apache.samza.task.AsyncRunLoop;
 import org.apache.samza.task.AsyncStreamTask;
@@ -29,10 +30,10 @@ import org.apache.samza.task.StreamTask;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.collection.JavaConversions;
+import scala.runtime.AbstractFunction0;
 import scala.runtime.AbstractFunction1;
 
-import static org.apache.samza.util.Utils.defaultClock;
-import static org.apache.samza.util.Utils.defaultValue;
+import static org.apache.samza.util.Util.asScalaClock;
 
 /**
  * Factory class to create runloop for a Samza task, based on the type
@@ -50,7 +51,8 @@ public class RunLoopFactory {
       ExecutorService threadPool,
       long maxThrottlingDelayMs,
       SamzaContainerMetrics containerMetrics,
-      TaskConfig config) {
+      TaskConfig config,
+      HighResolutionClock clock) {
 
     long taskWindowMs = config.getWindowMs().getOrElse(defaultValue(DEFAULT_WINDOW_MS));
 
@@ -83,7 +85,7 @@ public class RunLoopFactory {
         maxThrottlingDelayMs,
         taskWindowMs,
         taskCommitMs,
-        defaultClock());
+        asScalaClock(() -> System.nanoTime()));
     } else {
       Integer taskMaxConcurrency = config.getMaxConcurrency().getOrElse(defaultValue(1));
 
@@ -106,7 +108,23 @@ public class RunLoopFactory {
         taskCommitMs,
         callbackTimeout,
         maxThrottlingDelayMs,
-        containerMetrics);
+        containerMetrics,
+        clock);
     }
   }
+
+  /**
+   * Returns a default value object for scala option.getOrDefault() to use
+   * @param value default value
+   * @param <T> value type
+   * @return object containing default value
+   */
+  public static <T> AbstractFunction0<T> defaultValue(final T value) {
+    return new AbstractFunction0<T>() {
+      @Override
+      public T apply() {
+        return value;
+      }
+    };
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/715b67d0/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java b/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java
index 8fac815..ba1e1d9 100644
--- a/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java
+++ b/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java
@@ -39,6 +39,7 @@ import org.apache.samza.container.SamzaContainerMetrics;
 import org.apache.samza.container.TaskInstance;
 import org.apache.samza.container.TaskInstanceMetrics;
 import org.apache.samza.container.TaskName;
+import org.apache.samza.util.HighResolutionClock;
 import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.system.SystemConsumers;
 import org.apache.samza.system.SystemStreamPartition;
@@ -55,7 +56,7 @@ import scala.collection.JavaConversions;
 public class AsyncRunLoop implements Runnable, Throttleable {
   private static final Logger log = LoggerFactory.getLogger(AsyncRunLoop.class);
 
-  private final Map<TaskName, AsyncTaskWorker> taskWorkers;
+  private final List<AsyncTaskWorker> taskWorkers;
   private final SystemConsumers consumerMultiplexer;
   private final Map<SystemStreamPartition, List<AsyncTaskWorker>> sspToTaskWorkerMapping;
 
@@ -72,6 +73,7 @@ public class AsyncRunLoop implements Runnable, Throttleable {
   private final ThrottlingScheduler callbackExecutor;
   private volatile boolean shutdownNow = false;
   private volatile Throwable throwable = null;
+  private final HighResolutionClock clock;
 
   public AsyncRunLoop(Map<TaskName, TaskInstance<AsyncStreamTask>> taskInstances,
       ExecutorService threadPool,
@@ -81,7 +83,8 @@ public class AsyncRunLoop implements Runnable, Throttleable {
       long commitMs,
       long callbackTimeoutMs,
       long maxThrottlingDelayMs,
-      SamzaContainerMetrics containerMetrics) {
+      SamzaContainerMetrics containerMetrics,
+      HighResolutionClock clock) {
 
     this.threadPool = threadPool;
     this.consumerMultiplexer = consumerMultiplexer;
@@ -95,13 +98,14 @@ public class AsyncRunLoop implements Runnable, Throttleable {
     this.coordinatorRequests = new CoordinatorRequests(taskInstances.keySet());
     this.latch = new Object();
     this.workerTimer = Executors.newSingleThreadScheduledExecutor();
+    this.clock = clock;
     Map<TaskName, AsyncTaskWorker> workers = new HashMap<>();
     for (TaskInstance<AsyncStreamTask> task : taskInstances.values()) {
       workers.put(task.taskName(), new AsyncTaskWorker(task));
     }
     // Partions and tasks assigned to the container will not change during the run loop life time
-    this.taskWorkers = Collections.unmodifiableMap(workers);
-    this.sspToTaskWorkerMapping = Collections.unmodifiableMap(getSspToAsyncTaskWorkerMap(taskInstances, taskWorkers));
+    this.sspToTaskWorkerMapping = Collections.unmodifiableMap(getSspToAsyncTaskWorkerMap(taskInstances, workers));
+    this.taskWorkers = Collections.unmodifiableList(new ArrayList<>(workers.values()));
   }
 
   /**
@@ -130,11 +134,11 @@ public class AsyncRunLoop implements Runnable, Throttleable {
   @Override
   public void run() {
     try {
-      for (AsyncTaskWorker taskWorker : taskWorkers.values()) {
+      for (AsyncTaskWorker taskWorker : taskWorkers) {
         taskWorker.init();
       }
 
-      long prevNs = System.nanoTime();
+      long prevNs = clock.nanoTime();
 
       while (!shutdownNow) {
         if (throwable != null) {
@@ -142,25 +146,29 @@ public class AsyncRunLoop implements Runnable, Throttleable {
           throw new SamzaException(throwable);
         }
 
-        long startNs = System.nanoTime();
+        long startNs = clock.nanoTime();
 
         IncomingMessageEnvelope envelope = chooseEnvelope();
-        long chooseNs = System.nanoTime();
+        long chooseNs = clock.nanoTime();
 
         containerMetrics.chooseNs().update(chooseNs - startNs);
 
         runTasks(envelope);
 
-        long blockNs = System.nanoTime();
+        long blockNs = clock.nanoTime();
 
         blockIfBusy(envelope);
 
-        long currentNs = System.nanoTime();
+        long currentNs = clock.nanoTime();
         long activeNs = blockNs - chooseNs;
         long totalNs = currentNs - prevNs;
         prevNs = currentNs;
         containerMetrics.blockNs().update(currentNs - blockNs);
-        containerMetrics.utilization().set(((double) activeNs) / totalNs);
+
+        if (totalNs != 0) {
+          // totalNs is not 0 if timer metrics are enabled
+          containerMetrics.utilization().set(((double) activeNs) / totalNs);
+        }
       }
     } finally {
       workerTimer.shutdown();
@@ -214,7 +222,7 @@ public class AsyncRunLoop implements Runnable, Throttleable {
       }
     }
 
-    for (AsyncTaskWorker worker: taskWorkers.values()) {
+    for (AsyncTaskWorker worker: taskWorkers) {
       worker.run();
     }
   }
@@ -227,7 +235,7 @@ public class AsyncRunLoop implements Runnable, Throttleable {
   private void blockIfBusy(IncomingMessageEnvelope envelope) {
     synchronized (latch) {
       while (!shutdownNow && throwable == null) {
-        for (AsyncTaskWorker worker : taskWorkers.values()) {
+        for (AsyncTaskWorker worker : taskWorkers) {
           if (worker.state.isReady()) {
             // should continue running if any worker state is ready
             // consumerMultiplexer will block on polling for empty partitions so it won't cause busy loop
@@ -310,7 +318,7 @@ public class AsyncRunLoop implements Runnable, Throttleable {
 
     AsyncTaskWorker(TaskInstance<AsyncStreamTask> task) {
       this.task = task;
-      this.callbackManager = new TaskCallbackManager(this, task.metrics(), callbackTimer, callbackTimeoutMs);
+      this.callbackManager = new TaskCallbackManager(this, callbackTimer, callbackTimeoutMs, maxConcurrency, clock);
       Set<SystemStreamPartition> sspSet = getWorkingSSPSet(task);
       this.state = new AsyncTaskState(task.taskName(), task.metrics(), sspSet);
     }
@@ -430,9 +438,9 @@ public class AsyncRunLoop implements Runnable, Throttleable {
             containerMetrics.windows().inc();
 
             ReadableCoordinator coordinator = new ReadableCoordinator(task.taskName());
-            long startTime = System.nanoTime();
+            long startTime = clock.nanoTime();
             task.window(coordinator);
-            containerMetrics.windowNs().update(System.nanoTime() - startTime);
+            containerMetrics.windowNs().update(clock.nanoTime() - startTime);
             coordinatorRequests.update(coordinator);
 
             state.doneWindowOrCommit();
@@ -466,9 +474,9 @@ public class AsyncRunLoop implements Runnable, Throttleable {
           try {
             containerMetrics.commits().inc();
 
-            long startTime = System.nanoTime();
+            long startTime = clock.nanoTime();
             task.commit();
-            containerMetrics.commitNs().update(System.nanoTime() - startTime);
+            containerMetrics.commitNs().update(clock.nanoTime() - startTime);
 
             state.doneWindowOrCommit();
           } catch (Throwable t) {
@@ -497,17 +505,17 @@ public class AsyncRunLoop implements Runnable, Throttleable {
      */
     @Override
     public void onComplete(final TaskCallback callback) {
-      long workNanos = System.nanoTime() - ((TaskCallbackImpl) callback).timeCreatedNs;
+      long workNanos = clock.nanoTime() - ((TaskCallbackImpl) callback).timeCreatedNs;
       callbackExecutor.schedule(new Runnable() {
         @Override
         public void run() {
           try {
             state.doneProcess();
             TaskCallbackImpl callbackImpl = (TaskCallbackImpl) callback;
-            containerMetrics.processNs().update(System.nanoTime() - callbackImpl.timeCreatedNs);
+            containerMetrics.processNs().update(clock.nanoTime() - callbackImpl.timeCreatedNs);
             log.trace("Got callback complete for task {}, ssp {}", callbackImpl.taskName, callbackImpl.envelope.getSystemStreamPartition());
 
-            TaskCallbackImpl callbackToUpdate = callbackManager.updateCallback(callbackImpl, true);
+            TaskCallbackImpl callbackToUpdate = callbackManager.updateCallback(callbackImpl);
             if (callbackToUpdate != null) {
               IncomingMessageEnvelope envelope = callbackToUpdate.envelope;
               log.trace("Update offset for ssp {}, offset {}", envelope.getSystemStreamPartition(), envelope.getOffset());
@@ -540,7 +548,6 @@ public class AsyncRunLoop implements Runnable, Throttleable {
         abort(t);
         // update pending count, but not offset
         TaskCallbackImpl callbackImpl = (TaskCallbackImpl) callback;
-        callbackManager.updateCallback(callbackImpl, false);
         log.error("Got callback failure for task {}", callbackImpl.taskName);
       } catch (Throwable e) {
         log.error(e.getMessage(), e);
@@ -564,7 +571,7 @@ public class AsyncRunLoop implements Runnable, Throttleable {
     private volatile boolean endOfStream = false;
     private volatile boolean windowOrCommitInFlight = false;
     private final AtomicInteger messagesInFlight = new AtomicInteger(0);
-    private final ArrayDeque<PendingEnvelope> pendingEnvelopQueue;
+    private final ArrayDeque<PendingEnvelope> pendingEnvelopeQueue;
     //Set of SSPs that we are currently processing for this task instance
     private final Set<SystemStreamPartition> processingSspSet;
     private final TaskName taskName;
@@ -573,21 +580,20 @@ public class AsyncRunLoop implements Runnable, Throttleable {
     AsyncTaskState(TaskName taskName, TaskInstanceMetrics taskMetrics, Set<SystemStreamPartition> sspSet) {
       this.taskName = taskName;
       this.taskMetrics = taskMetrics;
-      this.pendingEnvelopQueue = new ArrayDeque<>();
+      this.pendingEnvelopeQueue = new ArrayDeque<>();
       this.processingSspSet = sspSet;
     }
 
 
     private boolean checkEndOfStream() {
-      PendingEnvelope pendingEnvelope = pendingEnvelopQueue.peek();
-
-      if (pendingEnvelope != null) {
+      if (pendingEnvelopeQueue.size() == 1) {
+        PendingEnvelope pendingEnvelope = pendingEnvelopeQueue.peek();
         IncomingMessageEnvelope envelope = pendingEnvelope.envelope;
 
         if (envelope.isEndOfStream()) {
           SystemStreamPartition ssp = envelope.getSystemStreamPartition();
           processingSspSet.remove(ssp);
-          pendingEnvelopQueue.remove();
+          pendingEnvelopeQueue.remove();
         }
       }
       return processingSspSet.isEmpty();
@@ -597,8 +603,13 @@ public class AsyncRunLoop implements Runnable, Throttleable {
      * Returns whether the task is ready to do process/window/commit.
      */
     private boolean isReady() {
-      endOfStream |= checkEndOfStream();
-      needCommit |= coordinatorRequests.commitRequests().remove(taskName);
+      if (checkEndOfStream()) {
+        endOfStream = true;
+      }
+      if (coordinatorRequests.commitRequests().remove(taskName)) {
+        needCommit = true;
+      }
+
       if (needWindow || needCommit || endOfStream) {
         // ready for window or commit only when no messages are in progress and
         // no window/commit in flight
@@ -621,7 +632,7 @@ public class AsyncRunLoop implements Runnable, Throttleable {
         if (needCommit) return WorkerOp.COMMIT;
         else if (needWindow) return WorkerOp.WINDOW;
         else if (endOfStream) return WorkerOp.END_OF_STREAM;
-        else if (!pendingEnvelopQueue.isEmpty()) return WorkerOp.PROCESS;
+        else if (!pendingEnvelopeQueue.isEmpty()) return WorkerOp.PROCESS;
       }
       return WorkerOp.NO_OP;
     }
@@ -645,7 +656,8 @@ public class AsyncRunLoop implements Runnable, Throttleable {
     }
 
     private void startProcess() {
-      messagesInFlight.incrementAndGet();
+      int count = messagesInFlight.incrementAndGet();
+      taskMetrics.messagesInFlight().set(count);
     }
 
     private void doneWindowOrCommit() {
@@ -653,7 +665,8 @@ public class AsyncRunLoop implements Runnable, Throttleable {
     }
 
     private void doneProcess() {
-      messagesInFlight.decrementAndGet();
+      int count = messagesInFlight.decrementAndGet();
+      taskMetrics.messagesInFlight().set(count);
     }
 
     /**
@@ -662,8 +675,8 @@ public class AsyncRunLoop implements Runnable, Throttleable {
      * @param pendingEnvelope
      */
     private void insertEnvelope(PendingEnvelope pendingEnvelope) {
-      pendingEnvelopQueue.add(pendingEnvelope);
-      int queueSize = pendingEnvelopQueue.size();
+      pendingEnvelopeQueue.add(pendingEnvelope);
+      int queueSize = pendingEnvelopeQueue.size();
       taskMetrics.pendingMessages().set(queueSize);
       log.trace("Insert envelope to task {} queue.", taskName);
       log.debug("Task {} pending envelope count is {} after insertion.", taskName, queueSize);
@@ -682,8 +695,8 @@ public class AsyncRunLoop implements Runnable, Throttleable {
      * @return
      */
     private IncomingMessageEnvelope fetchEnvelope() {
-      PendingEnvelope pendingEnvelope = pendingEnvelopQueue.remove();
-      int queueSize = pendingEnvelopQueue.size();
+      PendingEnvelope pendingEnvelope = pendingEnvelopeQueue.remove();
+      int queueSize = pendingEnvelopeQueue.size();
       taskMetrics.pendingMessages().set(queueSize);
       log.trace("fetch envelope ssp {} offset {} to process.", pendingEnvelope.envelope.getSystemStreamPartition(), pendingEnvelope.envelope.getOffset());
       log.debug("Task {} pending envelopes count is {} after fetching.", taskName, queueSize);
@@ -691,7 +704,7 @@ public class AsyncRunLoop implements Runnable, Throttleable {
       if (pendingEnvelope.markProcessed()) {
         SystemStreamPartition partition = pendingEnvelope.envelope.getSystemStreamPartition();
         consumerMultiplexer.tryUpdate(partition);
-        log.debug("Update chooser for " + partition);
+        log.debug("Update chooser for {}", partition);
       }
       return pendingEnvelope.envelope;
     }

http://git-wip-us.apache.org/repos/asf/samza/blob/715b67d0/samza-core/src/main/java/org/apache/samza/task/CoordinatorRequests.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/task/CoordinatorRequests.java b/samza-core/src/main/java/org/apache/samza/task/CoordinatorRequests.java
index 052b3b9..0283d67 100644
--- a/samza-core/src/main/java/org/apache/samza/task/CoordinatorRequests.java
+++ b/samza-core/src/main/java/org/apache/samza/task/CoordinatorRequests.java
@@ -19,10 +19,8 @@
 
 package org.apache.samza.task;
 
-import java.util.Collections;
-import java.util.HashSet;
 import java.util.Set;
-
+import java.util.concurrent.CopyOnWriteArraySet;
 import org.apache.samza.container.TaskName;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -37,8 +35,8 @@ public class CoordinatorRequests {
   private static final Logger log = LoggerFactory.getLogger(CoordinatorRequests.class);
 
   private final Set<TaskName> taskNames;
-  private final Set<TaskName> taskShutdownRequests = Collections.synchronizedSet(new HashSet<TaskName>());
-  private final Set<TaskName> taskCommitRequests = Collections.synchronizedSet(new HashSet<TaskName>());
+  private final Set<TaskName> taskShutdownRequests = new CopyOnWriteArraySet<>();
+  private final Set<TaskName> taskCommitRequests = new CopyOnWriteArraySet<>();
   volatile private boolean shutdownNow = false;
 
   public CoordinatorRequests(Set<TaskName> taskNames) {
@@ -67,18 +65,18 @@ public class CoordinatorRequests {
    */
   private void checkCoordinator(ReadableCoordinator coordinator) {
     if (coordinator.requestedCommitTask()) {
-      log.info("Task "  + coordinator.taskName() + " requested commit for current task only");
+      log.debug("Task {} requested commit for current task only", coordinator.taskName());
       taskCommitRequests.add(coordinator.taskName());
     }
 
     if (coordinator.requestedCommitAll()) {
-      log.info("Task " + coordinator.taskName() + " requested commit for all tasks in the container");
+      log.debug("Task {} requested commit for all tasks in the container", coordinator.taskName());
       taskCommitRequests.addAll(taskNames);
     }
 
     if (coordinator.requestedShutdownOnConsensus()) {
       taskShutdownRequests.add(coordinator.taskName());
-      log.info("Shutdown has now been requested by tasks " + taskShutdownRequests);
+      log.info("Shutdown has now been requested by tasks {}", taskShutdownRequests);
     }
 
     if (coordinator.requestedShutdownNow() || taskShutdownRequests.size() == taskNames.size()) {
@@ -86,4 +84,4 @@ public class CoordinatorRequests {
       shutdownNow = true;
     }
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/715b67d0/samza-core/src/main/java/org/apache/samza/task/TaskCallbackImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/task/TaskCallbackImpl.java b/samza-core/src/main/java/org/apache/samza/task/TaskCallbackImpl.java
index 9b70099..19b9f1c 100644
--- a/samza-core/src/main/java/org/apache/samza/task/TaskCallbackImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/task/TaskCallbackImpl.java
@@ -48,13 +48,14 @@ class TaskCallbackImpl implements TaskCallback, Comparable<TaskCallbackImpl> {
       TaskName taskName,
       IncomingMessageEnvelope envelope,
       ReadableCoordinator coordinator,
-      long seqNum) {
+      long seqNum,
+      long timeCreatedNs) {
     this.listener = listener;
     this.taskName = taskName;
     this.envelope = envelope;
     this.coordinator = coordinator;
     this.seqNum = seqNum;
-    this.timeCreatedNs = System.nanoTime();
+    this.timeCreatedNs = timeCreatedNs;
   }
 
   @Override
@@ -101,4 +102,4 @@ class TaskCallbackImpl implements TaskCallback, Comparable<TaskCallbackImpl> {
   boolean matchSeqNum(long seqNum) {
     return this.seqNum == seqNum;
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/715b67d0/samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java b/samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java
index 132cf59..5bce778 100644
--- a/samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java
+++ b/samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java
@@ -24,10 +24,9 @@ import java.util.Queue;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.samza.container.TaskInstanceMetrics;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.util.HighResolutionClock;
 
 
 /**
@@ -86,27 +85,29 @@ class TaskCallbackManager {
   }
 
   private long seqNum = 0L;
-  private final AtomicInteger pendingCount = new AtomicInteger(0);
-  private final TaskCallbacks completeCallbacks = new TaskCallbacks();
-  private final TaskInstanceMetrics metrics;
+  private final TaskCallbacks completedCallbacks = new TaskCallbacks();
   private final ScheduledExecutorService timer;
   private final TaskCallbackListener listener;
-  private long timeout;
-
-  public TaskCallbackManager(TaskCallbackListener listener, TaskInstanceMetrics metrics, ScheduledExecutorService timer, long timeout) {
+  private final long timeout;
+  private final int maxConcurrency;
+  private final HighResolutionClock clock;
+
+  public TaskCallbackManager(TaskCallbackListener listener,
+      ScheduledExecutorService timer,
+      long timeout,
+      int maxConcurrency,
+      HighResolutionClock clock) {
     this.listener = listener;
-    this.metrics = metrics;
     this.timer = timer;
     this.timeout = timeout;
+    this.maxConcurrency = maxConcurrency;
+    this.clock = clock;
   }
 
   public TaskCallbackImpl createCallback(TaskName taskName,
       IncomingMessageEnvelope envelope,
       ReadableCoordinator coordinator) {
-    final TaskCallbackImpl callback = new TaskCallbackImpl(listener, taskName, envelope, coordinator, seqNum++);
-    int count = pendingCount.incrementAndGet();
-    metrics.messagesInFlight().set(count);
-
+    final TaskCallbackImpl callback = new TaskCallbackImpl(listener, taskName, envelope, coordinator, seqNum++, clock.nanoTime());
     if (timer != null) {
       Runnable timerTask = new Runnable() {
         @Override
@@ -126,16 +127,14 @@ class TaskCallbackManager {
    * Update the task callbacks with the new callback completed.
    * It uses a high-watermark model to roll the callbacks for checkpointing.
    * @param callback new completed callback
-   * @param success callback result status
    * @return the callback for checkpointing
    */
-  public TaskCallbackImpl updateCallback(TaskCallbackImpl callback, boolean success) {
-    TaskCallbackImpl callbackToCommit = null;
-    if (success) {
-      callbackToCommit = completeCallbacks.update(callback);
+  public TaskCallbackImpl updateCallback(TaskCallbackImpl callback) {
+    if (maxConcurrency > 1) {
+      // Use the completedCallbacks queue to handle the out-of-order case when max concurrency is larger than 1
+      return completedCallbacks.update(callback);
+    } else {
+      return callback;
     }
-    int count = pendingCount.decrementAndGet();
-    metrics.messagesInFlight().set(count);
-    return callbackToCommit;
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/715b67d0/samza-core/src/main/java/org/apache/samza/util/HighResolutionClock.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/util/HighResolutionClock.java b/samza-core/src/main/java/org/apache/samza/util/HighResolutionClock.java
index 69ba441..6d40149 100644
--- a/samza-core/src/main/java/org/apache/samza/util/HighResolutionClock.java
+++ b/samza-core/src/main/java/org/apache/samza/util/HighResolutionClock.java
@@ -25,7 +25,7 @@ package org.apache.samza.util;
  * <p>
  * Instances of this interface must be thread-safe.
  */
-interface HighResolutionClock {
+public interface HighResolutionClock {
   /**
    * Returns a time point that can be used to calculate the difference in nanoseconds with another
    * time point. Resolution of the timer is platform dependent and not guaranteed to actually
@@ -34,14 +34,4 @@ interface HighResolutionClock {
    * @return current time point in nanoseconds
    */
   long nanoTime();
-
-  /**
-   * Sleeps for a period of time that approximates the requested number of nanoseconds. Actual sleep
-   * time can vary significantly based on the JVM implementation and platform. This function returns
-   * the measured error between expected and actual sleep time.
-   *
-   * @param nanos the number of nanoseconds to sleep.
-   * @throws InterruptedException if the current thread is interrupted while blocked in this method.
-   */
-  long sleep(long nanos) throws InterruptedException;
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/715b67d0/samza-core/src/main/java/org/apache/samza/util/SystemHighResolutionClock.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/util/SystemHighResolutionClock.java b/samza-core/src/main/java/org/apache/samza/util/SystemHighResolutionClock.java
index 2e65b60..6bfe7c6 100644
--- a/samza-core/src/main/java/org/apache/samza/util/SystemHighResolutionClock.java
+++ b/samza-core/src/main/java/org/apache/samza/util/SystemHighResolutionClock.java
@@ -19,23 +19,9 @@
 
 package org.apache.samza.util;
 
-import java.util.concurrent.TimeUnit;
-
 class SystemHighResolutionClock implements HighResolutionClock {
   @Override
   public long nanoTime() {
     return System.nanoTime();
   }
-
-  @Override
-  public long sleep(long nanos) throws InterruptedException {
-    if (nanos <= 0) {
-      return nanos;
-    }
-
-    final long start = System.nanoTime();
-    TimeUnit.NANOSECONDS.sleep(nanos);
-
-    return Util.clampAdd(nanos, -(System.nanoTime() - start));
-  }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/715b67d0/samza-core/src/main/java/org/apache/samza/util/ThrottlingExecutor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/util/ThrottlingExecutor.java b/samza-core/src/main/java/org/apache/samza/util/ThrottlingExecutor.java
index d1298fc..eb956f2 100644
--- a/samza-core/src/main/java/org/apache/samza/util/ThrottlingExecutor.java
+++ b/samza-core/src/main/java/org/apache/samza/util/ThrottlingExecutor.java
@@ -74,7 +74,7 @@ public class ThrottlingExecutor implements Throttleable, Executor {
           Util.clampAdd(pendingNanos, (long) (workNanos * currentWorkToIdleFactor)));
       if (pendingNanos > 0) {
         try {
-          pendingNanos = clock.sleep(pendingNanos);
+          pendingNanos = sleep(pendingNanos);
         } catch (InterruptedException e) {
           Thread.currentThread().interrupt();
         }
@@ -122,4 +122,23 @@ public class ThrottlingExecutor implements Throttleable, Executor {
   void setPendingNanos(long pendingNanos) {
     this.pendingNanos = pendingNanos;
   }
+
+  /**
+   * Sleeps for a period of time that approximates the requested number of nanoseconds. Actual sleep
+   * time can vary significantly based on the JVM implementation and platform. This function returns
+   * the measured error between expected and actual sleep time.
+   *
+   * @param nanos the number of nanoseconds to sleep.
+   * @throws InterruptedException if the current thread is interrupted while blocked in this method.
+   */
+  long sleep(long nanos) throws InterruptedException {
+    if (nanos <= 0) {
+      return nanos;
+    }
+
+    final long start = System.nanoTime();
+    TimeUnit.NANOSECONDS.sleep(nanos);
+
+    return Util.clampAdd(nanos, -(System.nanoTime() - start));
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/715b67d0/samza-core/src/main/java/org/apache/samza/util/Utils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/util/Utils.java b/samza-core/src/main/java/org/apache/samza/util/Utils.java
deleted file mode 100644
index 472e0a5..0000000
--- a/samza-core/src/main/java/org/apache/samza/util/Utils.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.util;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.runtime.AbstractFunction0;
-
-
-public class Utils {
-  private static final Logger log = LoggerFactory.getLogger(Utils.class);
-
-  private Utils() {}
-
-  /**
-   * Returns a default value object for scala option.getOrDefault() to use
-   * @param value default value
-   * @param <T> value type
-   * @return object containing default value
-   */
-  public static <T> AbstractFunction0<T> defaultValue(final T value) {
-    return new AbstractFunction0<T>() {
-      @Override
-      public T apply() {
-        return value;
-      }
-    };
-  }
-
-  /**
-   * Creates a nanosecond clock using default system nanotime
-   * @return object invokes the system clock
-   */
-  public static AbstractFunction0<Object> defaultClock() {
-    return new AbstractFunction0<Object>() {
-      @Override
-      public Object apply() {
-        return System.nanoTime();
-      }
-    };
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/715b67d0/samza-core/src/main/scala/org/apache/samza/config/MetricsConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/MetricsConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/MetricsConfig.scala
index c3fd8bf..e9b6b76 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/MetricsConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/MetricsConfig.scala
@@ -18,7 +18,10 @@
  */
 
 package org.apache.samza.config
-import scala.collection.JavaConversions._
+
+
+import org.apache.samza.util.HighResolutionClock
+
 
 object MetricsConfig {
   // metrics config constants
@@ -26,6 +29,7 @@ object MetricsConfig {
   val METRICS_REPORTER_FACTORY = "metrics.reporter.%s.class"
   val METRICS_SNAPSHOT_REPORTER_STREAM = "metrics.reporter.%s.stream"
   val METRICS_SNAPSHOT_REPORTER_INTERVAL= "metrics.reporter.%s.interval"
+  val METRICS_TIMER_ENABLED= "metrics.timer.enabled"
 
   implicit def Config2Metrics(config: Config) = new MetricsConfig(config)
 }
@@ -53,4 +57,10 @@ class MetricsConfig(config: Config) extends ScalaMapConfig(config) {
       case _ => List[String]()
     }
   }
-}
+
+  /**
+   * Returns whether the timer metrics are enabled, read from "metrics.timer.enabled" (defaults to true).
+   * @return true if the timer metrics are enabled, false otherwise
+   */
+  def getMetricsTimerEnabled: Boolean = getBoolean(MetricsConfig.METRICS_TIMER_ENABLED, true)
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/715b67d0/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index e0468ee..c35da92 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -74,7 +74,9 @@ import org.apache.samza.task.AsyncStreamTask
 import org.apache.samza.task.AsyncStreamTaskAdapter
 import org.apache.samza.task.StreamTask
 import org.apache.samza.task.TaskInstanceCollector
+import org.apache.samza.util.HighResolutionClock
 import org.apache.samza.util.{ExponentialSleepStrategy, Logging, Throttleable, Util}
+import org.apache.samza.util.Util.asScalaClock
 
 import scala.collection.JavaConversions._
 
@@ -150,6 +152,15 @@ object SamzaContainer extends Logging {
     val systemProducersMetrics = new SystemProducersMetrics(registry)
     val systemConsumersMetrics = new SystemConsumersMetrics(registry)
     val offsetManagerMetrics = new OffsetManagerMetrics(registry)
+    val clock = if (config.getMetricsTimerEnabled) {
+      new HighResolutionClock {
+        override def nanoTime(): Long = System.nanoTime()
+      }
+    } else {
+      new HighResolutionClock {
+        override def nanoTime(): Long = 0L
+      }
+    }
 
     val inputSystemStreamPartitions = containerModel
       .getTasks
@@ -383,7 +394,8 @@ object SamzaContainer extends Logging {
       serdeManager = serdeManager,
       metrics = systemConsumersMetrics,
       dropDeserializationError = dropDeserializationError,
-      pollIntervalMs = pollIntervalMs)
+      pollIntervalMs = pollIntervalMs,
+      clock = clock)
 
     val producerMultiplexer = new SystemProducers(
       producers = producers,
@@ -559,7 +571,8 @@ object SamzaContainer extends Logging {
       taskThreadPool,
       maxThrottlingDelayMs,
       samzaContainerMetrics,
-      config)
+      config,
+      clock)
 
     val memoryStatisticsMonitor : SystemStatisticsMonitor = new StatisticsMonitorImpl()
     memoryStatisticsMonitor.registerListener(new SystemStatisticsMonitor.Listener {

http://git-wip-us.apache.org/repos/asf/samza/blob/715b67d0/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
index e2aed5b..17d163d 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
@@ -62,12 +62,12 @@ class SystemConsumers (
   /**
    * The class that handles deserialization of incoming messages.
    */
-  serdeManager: SerdeManager,
+  serdeManager: SerdeManager = new SerdeManager,
 
   /**
    * A helper class to hold all of SystemConsumers' metrics.
    */
-  metrics: SystemConsumersMetrics,
+  metrics: SystemConsumersMetrics = new SystemConsumersMetrics,
 
   /**
    * If MessageChooser returns null when it's polled, SystemConsumers will
@@ -76,14 +76,14 @@ class SystemConsumers (
    * thread will sit in a tight loop polling every SystemConsumer over and
    * over again if no new messages are available.
    */
-  noNewMessagesTimeout: Int,
+  noNewMessagesTimeout: Int = SystemConsumers.DEFAULT_NO_NEW_MESSAGES_TIMEOUT,
 
   /**
    * This parameter is to define how to deal with deserialization failure. If
    * set to true, the task will skip the messages when deserialization fails.
    * If set to false, the task will throw SamzaException and fail the container.
    */
-  dropDeserializationError: Boolean,
+  dropDeserializationError: Boolean = SystemConsumers.DEFAULT_DROP_SERIALIZATION_ERROR,
 
   /**
    * <p>Defines an upper bound for how long the SystemConsumers will wait
@@ -99,29 +99,13 @@ class SystemConsumers (
    * with no remaining unprocessed messages, the SystemConsumers will poll for
    * it within 50ms of its availability in the stream system.</p>
    */
-  val pollIntervalMs: Int,
+  val pollIntervalMs: Int = SystemConsumers.DEFAULT_POLL_INTERVAL_MS,
 
   /**
    * Clock can be used to inject a custom clock when mocking this class in
    * tests. The default implementation returns the current system clock time.
    */
-  val clock: () => Long) extends Logging with TimerUtils {
-
-  def this(chooser: MessageChooser,
-           consumers: Map[String, SystemConsumer],
-           serdeManager: SerdeManager = new SerdeManager,
-           metrics: SystemConsumersMetrics = new SystemConsumersMetrics,
-           noNewMessagesTimeout: Int = SystemConsumers.DEFAULT_NO_NEW_MESSAGES_TIMEOUT,
-           dropDeserializationError: Boolean = SystemConsumers.DEFAULT_DROP_SERIALIZATION_ERROR,
-           pollIntervalMs: Int = SystemConsumers.DEFAULT_POLL_INTERVAL_MS) =
-    this(chooser,
-         consumers,
-         serdeManager,
-         metrics,
-         noNewMessagesTimeout,
-         dropDeserializationError,
-         pollIntervalMs,
-         () => System.nanoTime())
+  val clock: () => Long = () => System.nanoTime()) extends Logging with TimerUtils {
 
   /**
    * A buffer of incoming messages grouped by SystemStreamPartition. These

http://git-wip-us.apache.org/repos/asf/samza/blob/715b67d0/samza-core/src/main/scala/org/apache/samza/util/Util.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/util/Util.scala b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
index c4836f2..9019d02 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/Util.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
@@ -388,4 +388,11 @@ object Util extends Logging {
 
     sum
   }
+
+  /**
+   * Implicitly converts a Java HighResolutionClock to a Scala clock function which returns its nanoTime() value.
+   * @param c Java HighResolutionClock
+   * @return Scala clock function
+   */
+  implicit def asScalaClock(c: HighResolutionClock): () => Long = () => c.nanoTime()
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/715b67d0/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java b/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
index 6000ffa..cc3e1b7 100644
--- a/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
+++ b/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
@@ -31,7 +31,6 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
-
 import org.apache.samza.Partition;
 import org.apache.samza.checkpoint.OffsetManager;
 import org.apache.samza.config.Config;
@@ -47,7 +46,6 @@ import org.apache.samza.system.SystemConsumer;
 import org.apache.samza.system.SystemConsumers;
 import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.system.TestSystemConsumers;
-
 import org.junit.Before;
 import org.junit.Test;
 import scala.Option;
@@ -101,7 +99,8 @@ public class TestAsyncRunLoop {
         commitMs,
         callbackTimeoutMs,
         maxThrottlingDelayMs,
-        containerMetrics);
+        containerMetrics,
+        () -> 0L);
   }
 
   TaskInstance<AsyncStreamTask> createTaskInstance(AsyncStreamTask task, TaskName taskName, SystemStreamPartition ssp, OffsetManager manager, SystemConsumers consumers) {
@@ -494,7 +493,8 @@ public class TestAsyncRunLoop {
         commitMs,
         callbackTimeoutMs,
         maxThrottlingDelayMs,
-        containerMetrics);
+        containerMetrics,
+        () -> 0L);
 
     runLoop.run();
     callbackExecutor.awaitTermination(100, TimeUnit.MILLISECONDS);

http://git-wip-us.apache.org/repos/asf/samza/blob/715b67d0/samza-core/src/test/java/org/apache/samza/task/TestAsyncStreamAdapter.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/task/TestAsyncStreamAdapter.java b/samza-core/src/test/java/org/apache/samza/task/TestAsyncStreamAdapter.java
index 99e1e18..d0b820a 100644
--- a/samza-core/src/test/java/org/apache/samza/task/TestAsyncStreamAdapter.java
+++ b/samza-core/src/test/java/org/apache/samza/task/TestAsyncStreamAdapter.java
@@ -93,7 +93,7 @@ public class TestAsyncStreamAdapter {
   public void testAdapterWithoutThreadPool() throws Exception {
     taskAdaptor = new AsyncStreamTaskAdapter(task, null);
     TestCallbackListener listener = new TestCallbackListener();
-    TaskCallback callback = new TaskCallbackImpl(listener, null, envelope, null, 0L);
+    TaskCallback callback = new TaskCallbackImpl(listener, null, envelope, null, 0L, 0L);
 
     taskAdaptor.init(null, null);
     assertTrue(task.inited);
@@ -116,10 +116,10 @@ public class TestAsyncStreamAdapter {
   @Test
   public void testAdapterWithThreadPool() throws Exception {
     TestCallbackListener listener1 = new TestCallbackListener();
-    TaskCallback callback1 = new TaskCallbackImpl(listener1, null, envelope, null, 0L);
+    TaskCallback callback1 = new TaskCallbackImpl(listener1, null, envelope, null, 0L, 0L);
 
     TestCallbackListener listener2 = new TestCallbackListener();
-    TaskCallback callback2 = new TaskCallbackImpl(listener2, null, envelope, null, 1L);
+    TaskCallback callback2 = new TaskCallbackImpl(listener2, null, envelope, null, 1L, 0L);
 
     ExecutorService executor = Executors.newFixedThreadPool(2);
     taskAdaptor = new AsyncStreamTaskAdapter(task, executor);

http://git-wip-us.apache.org/repos/asf/samza/blob/715b67d0/samza-core/src/test/java/org/apache/samza/task/TestTaskCallbackImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/task/TestTaskCallbackImpl.java b/samza-core/src/test/java/org/apache/samza/task/TestTaskCallbackImpl.java
index f1dbf35..732405b 100644
--- a/samza-core/src/test/java/org/apache/samza/task/TestTaskCallbackImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/task/TestTaskCallbackImpl.java
@@ -61,7 +61,7 @@ public class TestTaskCallbackImpl {
       }
     };
 
-    callback = new TaskCallbackImpl(listener, null, mock(IncomingMessageEnvelope.class), null, 0);
+    callback = new TaskCallbackImpl(listener, null, mock(IncomingMessageEnvelope.class), null, 0L, 0L);
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/samza/blob/715b67d0/samza-core/src/test/java/org/apache/samza/task/TestTaskCallbackManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/task/TestTaskCallbackManager.java b/samza-core/src/test/java/org/apache/samza/task/TestTaskCallbackManager.java
index d7110f3..b2ed316 100644
--- a/samza-core/src/test/java/org/apache/samza/task/TestTaskCallbackManager.java
+++ b/samza-core/src/test/java/org/apache/samza/task/TestTaskCallbackManager.java
@@ -48,8 +48,7 @@ public class TestTaskCallbackManager {
       public void onFailure(TaskCallback callback, Throwable t) {
       }
     };
-    callbackManager = new TaskCallbackManager(listener, metrics, null, -1);
-
+    callbackManager = new TaskCallbackManager(listener, null, -1, 2, () -> System.nanoTime());
   }
 
   @Test
@@ -68,15 +67,15 @@ public class TestTaskCallbackManager {
     ReadableCoordinator coordinator = new ReadableCoordinator(taskName);
 
     IncomingMessageEnvelope envelope0 = new IncomingMessageEnvelope(ssp, "0", null, null);
-    TaskCallbackImpl callback0 = new TaskCallbackImpl(listener, taskName, envelope0, coordinator, 0);
-    TaskCallbackImpl callbackToCommit = callbackManager.updateCallback(callback0, true);
+    TaskCallbackImpl callback0 = new TaskCallbackImpl(listener, taskName, envelope0, coordinator, 0, 0);
+    TaskCallbackImpl callbackToCommit = callbackManager.updateCallback(callback0);
     assertTrue(callbackToCommit.matchSeqNum(0));
     assertEquals(ssp, callbackToCommit.envelope.getSystemStreamPartition());
     assertEquals("0", callbackToCommit.envelope.getOffset());
 
     IncomingMessageEnvelope envelope1 = new IncomingMessageEnvelope(ssp, "1", null, null);
-    TaskCallbackImpl callback1 = new TaskCallbackImpl(listener, taskName, envelope1, coordinator, 1);
-    callbackToCommit = callbackManager.updateCallback(callback1, true);
+    TaskCallbackImpl callback1 = new TaskCallbackImpl(listener, taskName, envelope1, coordinator, 1, 0);
+    callbackToCommit = callbackManager.updateCallback(callback1);
     assertTrue(callbackToCommit.matchSeqNum(1));
     assertEquals(ssp, callbackToCommit.envelope.getSystemStreamPartition());
     assertEquals("1", callbackToCommit.envelope.getOffset());
@@ -90,18 +89,18 @@ public class TestTaskCallbackManager {
 
     // simulate out of order
     IncomingMessageEnvelope envelope2 = new IncomingMessageEnvelope(ssp, "2", null, null);
-    TaskCallbackImpl callback2 = new TaskCallbackImpl(listener, taskName, envelope2, coordinator, 2);
-    TaskCallbackImpl callbackToCommit = callbackManager.updateCallback(callback2, true);
+    TaskCallbackImpl callback2 = new TaskCallbackImpl(listener, taskName, envelope2, coordinator, 2, 0);
+    TaskCallbackImpl callbackToCommit = callbackManager.updateCallback(callback2);
     assertNull(callbackToCommit);
 
     IncomingMessageEnvelope envelope1 = new IncomingMessageEnvelope(ssp, "1", null, null);
-    TaskCallbackImpl callback1 = new TaskCallbackImpl(listener, taskName, envelope1, coordinator, 1);
-    callbackToCommit = callbackManager.updateCallback(callback1, true);
+    TaskCallbackImpl callback1 = new TaskCallbackImpl(listener, taskName, envelope1, coordinator, 1, 0);
+    callbackToCommit = callbackManager.updateCallback(callback1);
     assertNull(callbackToCommit);
 
     IncomingMessageEnvelope envelope0 = new IncomingMessageEnvelope(ssp, "0", null, null);
-    TaskCallbackImpl callback0 = new TaskCallbackImpl(listener, taskName, envelope0, coordinator, 0);
-    callbackToCommit = callbackManager.updateCallback(callback0, true);
+    TaskCallbackImpl callback0 = new TaskCallbackImpl(listener, taskName, envelope0, coordinator, 0, 0);
+    callbackToCommit = callbackManager.updateCallback(callback0);
     assertTrue(callbackToCommit.matchSeqNum(2));
     assertEquals(ssp, callbackToCommit.envelope.getSystemStreamPartition());
     assertEquals("2", callbackToCommit.envelope.getOffset());
@@ -117,21 +116,21 @@ public class TestTaskCallbackManager {
     IncomingMessageEnvelope envelope2 = new IncomingMessageEnvelope(ssp, "2", null, null);
     ReadableCoordinator coordinator2 = new ReadableCoordinator(taskName);
     coordinator2.shutdown(TaskCoordinator.RequestScope.ALL_TASKS_IN_CONTAINER);
-    TaskCallbackImpl callback2 = new TaskCallbackImpl(listener, taskName, envelope2, coordinator2, 2);
-    TaskCallbackImpl callbackToCommit = callbackManager.updateCallback(callback2, true);
+    TaskCallbackImpl callback2 = new TaskCallbackImpl(listener, taskName, envelope2, coordinator2, 2, 0);
+    TaskCallbackImpl callbackToCommit = callbackManager.updateCallback(callback2);
     assertNull(callbackToCommit);
 
     IncomingMessageEnvelope envelope1 = new IncomingMessageEnvelope(ssp, "1", null, null);
     ReadableCoordinator coordinator1 = new ReadableCoordinator(taskName);
     coordinator1.commit(TaskCoordinator.RequestScope.CURRENT_TASK);
-    TaskCallbackImpl callback1 = new TaskCallbackImpl(listener, taskName, envelope1, coordinator1, 1);
-    callbackToCommit = callbackManager.updateCallback(callback1, true);
+    TaskCallbackImpl callback1 = new TaskCallbackImpl(listener, taskName, envelope1, coordinator1, 1, 0);
+    callbackToCommit = callbackManager.updateCallback(callback1);
     assertNull(callbackToCommit);
 
     IncomingMessageEnvelope envelope0 = new IncomingMessageEnvelope(ssp, "0", null, null);
     ReadableCoordinator coordinator = new ReadableCoordinator(taskName);
-    TaskCallbackImpl callback0 = new TaskCallbackImpl(listener, taskName, envelope0, coordinator, 0);
-    callbackToCommit = callbackManager.updateCallback(callback0, true);
+    TaskCallbackImpl callback0 = new TaskCallbackImpl(listener, taskName, envelope0, coordinator, 0, 0);
+    callbackToCommit = callbackManager.updateCallback(callback0);
     assertTrue(callbackToCommit.matchSeqNum(1));
     assertEquals(ssp, callbackToCommit.envelope.getSystemStreamPartition());
     assertEquals("1", callbackToCommit.envelope.getOffset());

http://git-wip-us.apache.org/repos/asf/samza/blob/715b67d0/samza-core/src/test/java/org/apache/samza/util/TestThrottlingExecutor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/util/TestThrottlingExecutor.java b/samza-core/src/test/java/org/apache/samza/util/TestThrottlingExecutor.java
index 0276e6b..ca500fb 100644
--- a/samza-core/src/test/java/org/apache/samza/util/TestThrottlingExecutor.java
+++ b/samza-core/src/test/java/org/apache/samza/util/TestThrottlingExecutor.java
@@ -28,6 +28,7 @@ import org.junit.Test;
 
 import java.util.concurrent.TimeUnit;
 
+
 public class TestThrottlingExecutor {
   private static final long MAX_NANOS = Long.MAX_VALUE;
 
@@ -44,7 +45,7 @@ public class TestThrottlingExecutor {
   @Before
   public void setUp() {
     clock = Mockito.mock(HighResolutionClock.class);
-    executor = new ThrottlingExecutor(MAX_NANOS, clock);
+    executor = Mockito.spy(new ThrottlingExecutor(MAX_NANOS, clock));
   }
 
   @Test
@@ -85,7 +86,7 @@ public class TestThrottlingExecutor {
     assertEquals(0L, executor.getPendingNanos());
 
     // At 100% work rate sleep should not be called
-    Mockito.verify(clock, Mockito.never()).sleep(Mockito.anyLong());
+    Mockito.verify(executor, Mockito.never()).sleep(Mockito.anyLong());
   }
 
   @Test
@@ -95,7 +96,7 @@ public class TestThrottlingExecutor {
     final long workTimeNanos = TimeUnit.MILLISECONDS.toNanos(5);
     setWorkTime(workTimeNanos);
     // Sleep time is same as work time at 50% work rate
-    setExpectedAndActualSleepTime(workTimeNanos, workTimeNanos);
+    setActualSleepTime(workTimeNanos);
     executor.execute(NO_OP);
 
     verifySleepTime(workTimeNanos);
@@ -114,7 +115,7 @@ public class TestThrottlingExecutor {
     final long delayTimeNanos = (long) (workToDelayFactor * workTimeNanos);
 
     setWorkTime(workTimeNanos);
-    setExpectedAndActualSleepTime(delayTimeNanos, delayTimeNanos);
+    setActualSleepTime(delayTimeNanos);
 
     executor.execute(NO_OP);
 
@@ -131,7 +132,7 @@ public class TestThrottlingExecutor {
     final long actualDelayTimeNanos = TimeUnit.MILLISECONDS.toNanos(6);
 
     setWorkTime(workTimeNanos);
-    setExpectedAndActualSleepTime(expectedDelayNanos, actualDelayTimeNanos);
+    setActualSleepTime(actualDelayTimeNanos);
 
     executor.execute(NO_OP);
 
@@ -148,7 +149,7 @@ public class TestThrottlingExecutor {
     final long actualDelayNanos = TimeUnit.MILLISECONDS.toNanos(4);
 
     setWorkTime(workTimeNanos);
-    setExpectedAndActualSleepTime(expectedDelayNanos, actualDelayNanos);
+    setActualSleepTime(actualDelayNanos);
 
     executor.execute(NO_OP);
 
@@ -167,7 +168,7 @@ public class TestThrottlingExecutor {
 
     // First execution
     setWorkTime(workTimeNanos);
-    setExpectedAndActualSleepTime(workTimeNanos, actualDelayNanos1);
+    setActualSleepTime(actualDelayNanos1);
 
     executor.execute(NO_OP);
 
@@ -177,7 +178,7 @@ public class TestThrottlingExecutor {
 
     // Second execution
     setWorkTime(workTimeNanos);
-    setExpectedAndActualSleepTime(workTimeNanos, actualDelayNanos2);
+    setActualSleepTime(actualDelayNanos2);
 
     executor.execute(NO_OP);
 
@@ -190,12 +191,12 @@ public class TestThrottlingExecutor {
     final long maxDelayMillis = 10;
     final long maxDelayNanos = TimeUnit.MILLISECONDS.toNanos(maxDelayMillis);
 
-    executor = new ThrottlingExecutor(maxDelayMillis, clock);
+    executor = Mockito.spy(new ThrottlingExecutor(maxDelayMillis, clock));
     executor.setWorkFactor(0.5);
 
     // Note work time exceeds maxDelayMillis
     setWorkTime(TimeUnit.MILLISECONDS.toNanos(100));
-    setExpectedAndActualSleepTime(maxDelayNanos, maxDelayNanos);
+    setActualSleepTime(maxDelayNanos);
 
     executor.execute(NO_OP);
 
@@ -221,6 +222,7 @@ public class TestThrottlingExecutor {
     // At a 50% work factor we'd expect work and sleep to match. As they don't, the function will
     // try to increment the pending sleep nanos, which could (but should not) result in overflow.
     setWorkTime(5000);
+    setActualSleepTime(Long.MAX_VALUE);
 
     executor.execute(NO_OP);
 
@@ -241,7 +243,7 @@ public class TestThrottlingExecutor {
     executor.execute(NO_OP);
 
     // Sleep should not be called with negative pending nanos
-    Mockito.verify(clock, Mockito.never()).sleep(Mockito.anyLong());
+    Mockito.verify(executor, Mockito.never()).sleep(Mockito.anyLong());
     assertEquals(-1000 + 500, executor.getPendingNanos());
   }
 
@@ -253,6 +255,7 @@ public class TestThrottlingExecutor {
     assertEquals(-1000, executor.getPendingNanos());
 
     setWorkTime(1250);
+    setActualSleepTime(1250 + startPendingNanos);
 
     executor.execute(NO_OP);
 
@@ -264,12 +267,11 @@ public class TestThrottlingExecutor {
     Mockito.when(clock.nanoTime()).thenReturn(0L).thenReturn(workTimeNanos);
   }
 
-  private void setExpectedAndActualSleepTime(long expectedDelayTimeNanos, long actualDelayTimeNanos) throws InterruptedException {
-    Mockito.when(clock.sleep(expectedDelayTimeNanos))
-        .thenReturn(expectedDelayTimeNanos - actualDelayTimeNanos);
+  private void setActualSleepTime(long actualDelayTimeNanos) throws InterruptedException {
+    Mockito.when(executor.sleep(Mockito.anyLong())).thenAnswer(invocation -> (long) invocation.getArguments()[0] - actualDelayTimeNanos);
   }
 
   private void verifySleepTime(long expectedDelayTimeNanos) throws InterruptedException {
-    Mockito.verify(clock).sleep(expectedDelayTimeNanos);
+    Mockito.verify(executor).sleep(expectedDelayTimeNanos);
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/715b67d0/samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala
index c975893..8ffc817 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala
@@ -28,6 +28,9 @@ import org.apache.samza.serializers.Serde
 import org.apache.samza.storage.{StoreProperties, StorageEngine, StorageEngineFactory}
 import org.apache.samza.system.SystemStreamPartition
 import org.apache.samza.task.MessageCollector
+import org.apache.samza.config.MetricsConfig.Config2Metrics
+import org.apache.samza.util.HighResolutionClock
+import org.apache.samza.util.Util.asScalaClock
 
 /**
  * A key value storage engine factory implementation
@@ -132,7 +135,17 @@ trait BaseKeyValueStorageEngineFactory[K, V] extends StorageEngineFactory[K, V]
     // create the storage engine and return
     // TODO: Decide if we should use raw bytes when restoring
     val keyValueStorageEngineMetrics = new KeyValueStorageEngineMetrics(storeName, registry)
-    new KeyValueStorageEngine(storePropertiesBuilder.build(), nullSafeStore, rawStore, keyValueStorageEngineMetrics, batchSize)
+    val clock = if (containerContext.config.getMetricsTimerEnabled) {
+      new HighResolutionClock {
+        override def nanoTime(): Long = System.nanoTime()
+      }
+    } else {
+      new HighResolutionClock {
+        override def nanoTime(): Long = 0L
+      }
+    }
+
+    new KeyValueStorageEngine(storePropertiesBuilder.build(), nullSafeStore, rawStore, keyValueStorageEngineMetrics, batchSize, clock)
   }
 
 }