You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by we...@apache.org on 2018/11/30 20:53:06 UTC

samza git commit: SAMZA-2015: Refactor timer handling in tables to be consistent with stores

Repository: samza
Updated Branches:
  refs/heads/master bdce47707 -> 210631cd5


SAMZA-2015: Refactor timer handling in tables to be consistent with stores

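Currently, when timers are disabled, we do not instantiate timer instances for tables; this leaves the corresponding metric fields null and creates opportunities for NPEs in the future. This change refactors timer handling in tables to use the same HighResolutionClock-based approach as the store implementation: timers are always instantiated, and the clock returns 0 when timers are disabled.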

Author: Wei Song <ws...@linkedin.com>

Reviewers: Xinyu Liu <xi...@linkedin.com>

Closes #835 from weisong44/SAMZA-2015


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/210631cd
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/210631cd
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/210631cd

Branch: refs/heads/master
Commit: 210631cd5c4487e99978189ee9b8d0b0c8847aba
Parents: bdce477
Author: Wei Song <ws...@linkedin.com>
Authored: Fri Nov 30 12:52:59 2018 -0800
Committer: Wei Song <ws...@linkedin.com>
Committed: Fri Nov 30 12:52:59 2018 -0800

----------------------------------------------------------------------
 .../apache/samza/table/BaseReadableTable.java   |  9 +++++
 .../samza/table/caching/CachingTable.java       | 28 +++++++--------
 .../table/remote/RemoteReadWriteTable.java      |  8 ++---
 .../samza/table/remote/RemoteReadableTable.java |  8 ++---
 .../samza/table/utils/TableReadMetrics.java     | 15 ++------
 .../samza/table/utils/TableWriteMetrics.java    | 37 ++++++--------------
 .../samza/table/caching/TestCachingTable.java   | 14 +++-----
 .../samza/storage/kv/LocalReadWriteTable.java   |  4 +--
 .../samza/storage/kv/LocalReadableTable.java    |  4 +--
 .../storage/kv/TestLocalReadWriteTable.java     |  3 --
 .../storage/kv/TestLocalReadableTable.java      |  3 --
 11 files changed, 52 insertions(+), 81 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/210631cd/samza-core/src/main/java/org/apache/samza/table/BaseReadableTable.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/BaseReadableTable.java b/samza-core/src/main/java/org/apache/samza/table/BaseReadableTable.java
index 7eaaa83..1dfd54c 100644
--- a/samza-core/src/main/java/org/apache/samza/table/BaseReadableTable.java
+++ b/samza-core/src/main/java/org/apache/samza/table/BaseReadableTable.java
@@ -19,9 +19,11 @@
 package org.apache.samza.table;
 
 import com.google.common.base.Preconditions;
+import org.apache.samza.config.MetricsConfig;
 import org.apache.samza.context.Context;
 import org.apache.samza.table.utils.TableReadMetrics;
 import org.apache.samza.table.utils.TableWriteMetrics;
+import org.apache.samza.util.HighResolutionClock;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -41,6 +43,8 @@ abstract public class BaseReadableTable<K, V> implements ReadableTable<K, V> {
   protected TableReadMetrics readMetrics;
   protected TableWriteMetrics writeMetrics;
 
+  protected HighResolutionClock clock;
+
   /**
    * Construct an instance
    * @param tableId Id of the table
@@ -54,6 +58,11 @@ abstract public class BaseReadableTable<K, V> implements ReadableTable<K, V> {
 
   @Override
   public void init(Context context) {
+    MetricsConfig metricsConfig = new MetricsConfig(context.getJobContext().getConfig());
+    clock = metricsConfig.getMetricsTimerEnabled()
+        ? () -> System.nanoTime()
+        : () -> 0L;
+
     readMetrics = new TableReadMetrics(context, this, tableId);
     if (this instanceof ReadWriteTable) {
       writeMetrics = new TableWriteMetrics(context, this, tableId);

http://git-wip-us.apache.org/repos/asf/samza/blob/210631cd/samza-core/src/main/java/org/apache/samza/table/caching/CachingTable.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/caching/CachingTable.java b/samza-core/src/main/java/org/apache/samza/table/caching/CachingTable.java
index ac6188b..e63bf61 100644
--- a/samza-core/src/main/java/org/apache/samza/table/caching/CachingTable.java
+++ b/samza-core/src/main/java/org/apache/samza/table/caching/CachingTable.java
@@ -130,7 +130,7 @@ public class CachingTable<K, V> extends BaseReadableTable<K, V>
       return CompletableFuture.completedFuture(value);
     }
 
-    long startNs = System.nanoTime();
+    long startNs = clock.nanoTime();
     missCount.incrementAndGet();
 
     return rdTable.getAsync(key).handle((result, e) -> {
@@ -140,7 +140,7 @@ public class CachingTable<K, V> extends BaseReadableTable<K, V>
           if (result != null) {
             cache.put(key, result);
           }
-          updateTimer(readMetrics.getNs, System.nanoTime() - startNs);
+          updateTimer(readMetrics.getNs, clock.nanoTime() - startNs);
           return result;
         }
       });
@@ -168,7 +168,7 @@ public class CachingTable<K, V> extends BaseReadableTable<K, V>
       return CompletableFuture.completedFuture(getAllResult);
     }
 
-    long startNs = System.nanoTime();
+    long startNs = clock.nanoTime();
     return rdTable.getAllAsync(missingKeys).handle((records, e) -> {
         if (e != null) {
           throw new SamzaException("Failed to get records for " + keys, e);
@@ -179,7 +179,7 @@ public class CachingTable<K, V> extends BaseReadableTable<K, V>
                 .collect(Collectors.toList()));
             getAllResult.putAll(records);
           }
-          updateTimer(readMetrics.getAllNs, System.nanoTime() - startNs);
+          updateTimer(readMetrics.getAllNs, clock.nanoTime() - startNs);
           return getAllResult;
         }
       });
@@ -201,7 +201,7 @@ public class CachingTable<K, V> extends BaseReadableTable<K, V>
     incCounter(writeMetrics.numPuts);
     Preconditions.checkNotNull(rwTable, "Cannot write to a read-only table: " + rdTable);
 
-    long startNs = System.nanoTime();
+    long startNs = clock.nanoTime();
     return rwTable.putAsync(key, value).handle((result, e) -> {
         if (e != null) {
           throw new SamzaException(String.format("Failed to put a record, key=%s, value=%s", key, value), e);
@@ -212,7 +212,7 @@ public class CachingTable<K, V> extends BaseReadableTable<K, V>
             cache.put(key, value);
           }
         }
-        updateTimer(writeMetrics.putNs, System.nanoTime() - startNs);
+        updateTimer(writeMetrics.putNs, clock.nanoTime() - startNs);
         return result;
       });
   }
@@ -231,7 +231,7 @@ public class CachingTable<K, V> extends BaseReadableTable<K, V>
   @Override
   public CompletableFuture<Void> putAllAsync(List<Entry<K, V>> records) {
     incCounter(writeMetrics.numPutAlls);
-    long startNs = System.nanoTime();
+    long startNs = clock.nanoTime();
     Preconditions.checkNotNull(rwTable, "Cannot write to a read-only table: " + rdTable);
     return rwTable.putAllAsync(records).handle((result, e) -> {
         if (e != null) {
@@ -240,7 +240,7 @@ public class CachingTable<K, V> extends BaseReadableTable<K, V>
           cache.putAll(records);
         }
 
-        updateTimer(writeMetrics.putAllNs, System.nanoTime() - startNs);
+        updateTimer(writeMetrics.putAllNs, clock.nanoTime() - startNs);
         return result;
       });
   }
@@ -259,7 +259,7 @@ public class CachingTable<K, V> extends BaseReadableTable<K, V>
   @Override
   public CompletableFuture<Void> deleteAsync(K key) {
     incCounter(writeMetrics.numDeletes);
-    long startNs = System.nanoTime();
+    long startNs = clock.nanoTime();
     Preconditions.checkNotNull(rwTable, "Cannot delete from a read-only table: " + rdTable);
     return rwTable.deleteAsync(key).handle((result, e) -> {
         if (e != null) {
@@ -267,7 +267,7 @@ public class CachingTable<K, V> extends BaseReadableTable<K, V>
         } else if (!isWriteAround) {
           cache.delete(key);
         }
-        updateTimer(writeMetrics.deleteNs, System.nanoTime() - startNs);
+        updateTimer(writeMetrics.deleteNs, clock.nanoTime() - startNs);
         return result;
       });
   }
@@ -284,7 +284,7 @@ public class CachingTable<K, V> extends BaseReadableTable<K, V>
   @Override
   public CompletableFuture<Void> deleteAllAsync(List<K> keys) {
     incCounter(writeMetrics.numDeleteAlls);
-    long startNs = System.nanoTime();
+    long startNs = clock.nanoTime();
     Preconditions.checkNotNull(rwTable, "Cannot delete from a read-only table: " + rdTable);
     return rwTable.deleteAllAsync(keys).handle((result, e) -> {
         if (e != null) {
@@ -292,7 +292,7 @@ public class CachingTable<K, V> extends BaseReadableTable<K, V>
         } else if (!isWriteAround) {
           cache.deleteAll(keys);
         }
-        updateTimer(writeMetrics.deleteAllNs, System.nanoTime() - startNs);
+        updateTimer(writeMetrics.deleteAllNs, clock.nanoTime() - startNs);
         return result;
       });
   }
@@ -300,10 +300,10 @@ public class CachingTable<K, V> extends BaseReadableTable<K, V>
   @Override
   public synchronized void flush() {
     incCounter(writeMetrics.numFlushes);
-    long startNs = System.nanoTime();
+    long startNs = clock.nanoTime();
     Preconditions.checkNotNull(rwTable, "Cannot flush a read-only table: " + rdTable);
     rwTable.flush();
-    updateTimer(writeMetrics.flushNs, System.nanoTime() - startNs);
+    updateTimer(writeMetrics.flushNs, clock.nanoTime() - startNs);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/samza/blob/210631cd/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java
index b96087b..80c2cac 100644
--- a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java
+++ b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java
@@ -174,9 +174,9 @@ public class RemoteReadWriteTable<K, V> extends RemoteReadableTable<K, V>
   public void flush() {
     try {
       incCounter(writeMetrics.numFlushes);
-      long startNs = System.nanoTime();
+      long startNs = clock.nanoTime();
       writeFn.flush();
-      updateTimer(writeMetrics.flushNs, System.nanoTime() - startNs);
+      updateTimer(writeMetrics.flushNs, clock.nanoTime() - startNs);
     } catch (Exception e) {
       String errMsg = "Failed to flush remote store";
       logger.error(errMsg, e);
@@ -202,7 +202,7 @@ public class RemoteReadWriteTable<K, V> extends RemoteReadableTable<K, V>
   protected CompletableFuture<Void> execute(TableRateLimiter<K, V> rateLimiter,
       K key, V value, BiFunction<K, V, CompletableFuture<Void>> method, Counter counter, Timer timer) {
     incCounter(counter);
-    final long startNs = System.nanoTime();
+    final long startNs = clock.nanoTime();
     CompletableFuture<Void> ioFuture = rateLimiter.isRateLimited()
         ? CompletableFuture
             .runAsync(() -> rateLimiter.throttle(key, value), tableExecutor)
@@ -223,7 +223,7 @@ public class RemoteReadWriteTable<K, V> extends RemoteReadableTable<K, V>
       Collection<Entry<K, V>> records, Function<Collection<Entry<K, V>>, CompletableFuture<Void>> method,
       Counter counter, Timer timer) {
     incCounter(counter);
-    final long startNs = System.nanoTime();
+    final long startNs = clock.nanoTime();
     CompletableFuture<Void> ioFuture = rateLimiter.isRateLimited()
         ? CompletableFuture
             .runAsync(() -> rateLimiter.throttleRecords(records), tableExecutor)

http://git-wip-us.apache.org/repos/asf/samza/blob/210631cd/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java
index e02650e..84a05b8 100644
--- a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java
+++ b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java
@@ -166,7 +166,7 @@ public class RemoteReadableTable<K, V> extends BaseReadableTable<K, V> {
   protected <T> CompletableFuture<T> execute(TableRateLimiter<K, V> rateLimiter,
       K key, Function<K, CompletableFuture<T>> method, Counter counter, Timer timer) {
     incCounter(counter);
-    final long startNs = System.nanoTime();
+    final long startNs = clock.nanoTime();
     CompletableFuture<T> ioFuture = rateLimiter.isRateLimited()
         ? CompletableFuture
             .runAsync(() -> rateLimiter.throttle(key), tableExecutor)
@@ -187,7 +187,7 @@ public class RemoteReadableTable<K, V> extends BaseReadableTable<K, V> {
   protected <T> CompletableFuture<T> execute(TableRateLimiter<K, V> rateLimiter,
       Collection<K> keys, Function<Collection<K>, CompletableFuture<T>> method, Counter counter, Timer timer) {
     incCounter(counter);
-    final long startNs = System.nanoTime();
+    final long startNs = clock.nanoTime();
     CompletableFuture<T> ioFuture = rateLimiter.isRateLimited()
         ? CompletableFuture
             .runAsync(() -> rateLimiter.throttle(keys), tableExecutor)
@@ -207,12 +207,12 @@ public class RemoteReadableTable<K, V> extends BaseReadableTable<K, V> {
   protected  <T> CompletableFuture<T> completeExecution(CompletableFuture<T> ioFuture, long startNs, Timer timer) {
     if (callbackExecutor != null) {
       ioFuture.thenApplyAsync(r -> {
-          updateTimer(timer, System.nanoTime() - startNs);
+          updateTimer(timer, clock.nanoTime() - startNs);
           return r;
         }, callbackExecutor);
     } else {
       ioFuture.thenApply(r -> {
-          updateTimer(timer, System.nanoTime() - startNs);
+          updateTimer(timer, clock.nanoTime() - startNs);
           return r;
         });
     }

http://git-wip-us.apache.org/repos/asf/samza/blob/210631cd/samza-core/src/main/java/org/apache/samza/table/utils/TableReadMetrics.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/utils/TableReadMetrics.java b/samza-core/src/main/java/org/apache/samza/table/utils/TableReadMetrics.java
index 0775844..e77fcfd 100644
--- a/samza-core/src/main/java/org/apache/samza/table/utils/TableReadMetrics.java
+++ b/samza-core/src/main/java/org/apache/samza/table/utils/TableReadMetrics.java
@@ -18,7 +18,6 @@
  */
 package org.apache.samza.table.utils;
 
-import org.apache.samza.config.MetricsConfig;
 import org.apache.samza.context.Context;
 import org.apache.samza.metrics.Counter;
 import org.apache.samza.metrics.Timer;
@@ -34,7 +33,6 @@ public class TableReadMetrics {
   public final Timer getAllNs;
   public final Counter numGets;
   public final Counter numGetAlls;
-  public final Timer getCallbackNs;
   public final Counter numMissedLookups;
 
   /**
@@ -47,19 +45,10 @@ public class TableReadMetrics {
   public TableReadMetrics(Context context, Table table, String tableId) {
     TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(context, table, tableId);
     numGets = tableMetricsUtil.newCounter("num-gets");
+    getNs = tableMetricsUtil.newTimer("get-ns");
     numGetAlls = tableMetricsUtil.newCounter("num-getAlls");
+    getAllNs = tableMetricsUtil.newTimer("getAll-ns");
     numMissedLookups = tableMetricsUtil.newCounter("num-missed-lookups");
-
-    MetricsConfig metricsConfig = new MetricsConfig(context.getJobContext().getConfig());
-    if (metricsConfig.getMetricsTimerEnabled()) {
-      getNs = tableMetricsUtil.newTimer("get-ns");
-      getAllNs = tableMetricsUtil.newTimer("getAll-ns");
-      getCallbackNs = tableMetricsUtil.newTimer("get-callback-ns");
-    } else {
-      getNs = null;
-      getAllNs = null;
-      getCallbackNs = null;
-    }
   }
 
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/210631cd/samza-core/src/main/java/org/apache/samza/table/utils/TableWriteMetrics.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/utils/TableWriteMetrics.java b/samza-core/src/main/java/org/apache/samza/table/utils/TableWriteMetrics.java
index 02af35f..bf65b74 100644
--- a/samza-core/src/main/java/org/apache/samza/table/utils/TableWriteMetrics.java
+++ b/samza-core/src/main/java/org/apache/samza/table/utils/TableWriteMetrics.java
@@ -18,7 +18,6 @@
  */
 package org.apache.samza.table.utils;
 
-import org.apache.samza.config.MetricsConfig;
 import org.apache.samza.context.Context;
 import org.apache.samza.metrics.Counter;
 import org.apache.samza.metrics.Timer;
@@ -27,18 +26,16 @@ import org.apache.samza.table.Table;
 
 public class TableWriteMetrics {
 
-  public final Timer putNs;
-  public final Timer putAllNs;
-  public final Timer deleteNs;
-  public final Timer deleteAllNs;
-  public final Timer flushNs;
   public final Counter numPuts;
+  public final Timer putNs;
   public final Counter numPutAlls;
+  public final Timer putAllNs;
   public final Counter numDeletes;
+  public final Timer deleteNs;
   public final Counter numDeleteAlls;
+  public final Timer deleteAllNs;
   public final Counter numFlushes;
-  public final Timer putCallbackNs;
-  public final Timer deleteCallbackNs;
+  public final Timer flushNs;
 
   /**
    * Utility class that contains the default set of write metrics.
@@ -50,28 +47,14 @@ public class TableWriteMetrics {
   public TableWriteMetrics(Context context, Table table, String tableId) {
     TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(context, table, tableId);
     numPuts = tableMetricsUtil.newCounter("num-puts");
+    putNs = tableMetricsUtil.newTimer("put-ns");
     numPutAlls = tableMetricsUtil.newCounter("num-putAlls");
+    putAllNs = tableMetricsUtil.newTimer("putAll-ns");
     numDeletes = tableMetricsUtil.newCounter("num-deletes");
+    deleteNs = tableMetricsUtil.newTimer("delete-ns");
     numDeleteAlls = tableMetricsUtil.newCounter("num-deleteAlls");
+    deleteAllNs = tableMetricsUtil.newTimer("deleteAll-ns");
     numFlushes = tableMetricsUtil.newCounter("num-flushes");
-
-    MetricsConfig metricsConfig = new MetricsConfig(context.getJobContext().getConfig());
-    if (metricsConfig.getMetricsTimerEnabled()) {
-      putNs = tableMetricsUtil.newTimer("put-ns");
-      putAllNs = tableMetricsUtil.newTimer("putAll-ns");
-      deleteNs = tableMetricsUtil.newTimer("delete-ns");
-      deleteAllNs = tableMetricsUtil.newTimer("deleteAll-ns");
-      flushNs = tableMetricsUtil.newTimer("flush-ns");
-      putCallbackNs = tableMetricsUtil.newTimer("put-callback-ns");
-      deleteCallbackNs = tableMetricsUtil.newTimer("delete-callback-ns");
-    } else {
-      putNs = null;
-      putAllNs = null;
-      deleteNs = null;
-      deleteAllNs = null;
-      flushNs = null;
-      putCallbackNs = null;
-      deleteCallbackNs = null;
-    }
+    flushNs = tableMetricsUtil.newTimer("flush-ns");
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/210631cd/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java b/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java
index daaba46..5a19767 100644
--- a/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java
+++ b/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java
@@ -277,14 +277,14 @@ public class TestCachingTable {
   public void testGuavaCacheAndRemoteTable() throws Exception {
     String tableId = "testGuavaCacheAndRemoteTable";
     Cache<String, String> guavaCache = CacheBuilder.newBuilder().initialCapacity(100).build();
-    final ReadWriteTable<String, String> guavaTable = new GuavaCacheTable<>(tableId, guavaCache);
+    final ReadWriteTable<String, String> guavaTable = new GuavaCacheTable<>(tableId + "-cache", guavaCache);
 
     // It is okay to share rateLimitHelper and async helper for read/write in test
     TableRateLimiter<String, String> rateLimitHelper = mock(TableRateLimiter.class);
     TableReadFunction<String, String> readFn = mock(TableReadFunction.class);
     TableWriteFunction<String, String> writeFn = mock(TableWriteFunction.class);
     final RemoteReadWriteTable<String, String> remoteTable = new RemoteReadWriteTable<>(
-        tableId, readFn, writeFn, rateLimitHelper, rateLimitHelper,
+        tableId + "-remote", readFn, writeFn, rateLimitHelper, rateLimitHelper,
         Executors.newSingleThreadExecutor(), Executors.newSingleThreadExecutor());
 
     final CachingTable<String, String> cachingTable = new CachingTable<>(
@@ -296,11 +296,11 @@ public class TestCachingTable {
     // 5 per read/write table (15)
     verify(metricsRegistry, times(24)).newCounter(any(), anyString());
 
-    // 3 per readable table (9)
-    // 7 per read/write table (21)
+    // 2 per readable table (6)
+    // 5 per read/write table (15)
     // 1 per remote readable table (1)
     // 1 per remote read/write table (1)
-    verify(metricsRegistry, times(32)).newTimer(any(), anyString());
+    verify(metricsRegistry, times(23)).newTimer(any(), anyString());
 
     // 1 per guava table (1)
     // 3 per caching table (2)
@@ -424,10 +424,6 @@ public class TestCachingTable {
     cachingTable.deleteAsync("");
     cachingTable.deleteAll(Collections.emptyList());
     cachingTable.deleteAllAsync(Collections.emptyList());
-
-    verify(metricsRegistry, atLeast(1)).newCounter(any(), anyString());
-    verify(metricsRegistry, atLeast(1)).newGauge(anyString(), any());
-    verify(metricsRegistry, times(0)).newTimer(any(), anyString());
   }
 
   private TableDescriptor createDummyTableDescriptor(String tableId) {

http://git-wip-us.apache.org/repos/asf/samza/blob/210631cd/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadWriteTable.java
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadWriteTable.java b/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadWriteTable.java
index 2704429..eae6bb0 100644
--- a/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadWriteTable.java
+++ b/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadWriteTable.java
@@ -146,9 +146,9 @@ public class LocalReadWriteTable<K, V> extends LocalReadableTable<K, V>
 
   private void instrument(Counter counter, Timer timer, Func0 func) {
     incCounter(counter);
-    long startNs = System.nanoTime();
+    long startNs = clock.nanoTime();
     func.apply();
-    updateTimer(timer, System.nanoTime() - startNs);
+    updateTimer(timer, clock.nanoTime() - startNs);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/210631cd/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadableTable.java
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadableTable.java b/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadableTable.java
index ba0d3cf..29ddb15 100644
--- a/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadableTable.java
+++ b/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadableTable.java
@@ -100,9 +100,9 @@ public class LocalReadableTable<K, V> extends BaseReadableTable<K, V> {
 
   private <T> T instrument(Counter counter, Timer timer, Supplier<T> func) {
     incCounter(counter);
-    long startNs = System.nanoTime();
+    long startNs = clock.nanoTime();
     T result = func.get();
-    updateTimer(timer, System.nanoTime() - startNs);
+    updateTimer(timer, clock.nanoTime() - startNs);
     return result;
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/210631cd/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadWriteTable.java
----------------------------------------------------------------------
diff --git a/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadWriteTable.java b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadWriteTable.java
index 70cde27..044fab4 100644
--- a/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadWriteTable.java
+++ b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadWriteTable.java
@@ -212,9 +212,6 @@ public class TestLocalReadWriteTable {
     table.deleteAll(Collections.emptyList());
     table.deleteAllAsync(Collections.emptyList()).get();
     table.flush();
-    verify(metricsRegistry, atLeast(1)).newCounter(anyString(), anyString());
-    verify(metricsRegistry, times(0)).newTimer(anyString(), anyString());
-    verify(metricsRegistry, times(0)).newGauge(anyString(), any());
     Assert.assertEquals(1, numFlushes.getCount());
     Assert.assertEquals(2, numPuts.getCount());
     Assert.assertEquals(0, numPutAlls.getCount());

http://git-wip-us.apache.org/repos/asf/samza/blob/210631cd/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadableTable.java
----------------------------------------------------------------------
diff --git a/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadableTable.java b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadableTable.java
index 44802b0..e1c82d9 100644
--- a/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadableTable.java
+++ b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadableTable.java
@@ -126,9 +126,6 @@ public class TestLocalReadableTable {
     table.getAsync("").get();
     table.getAll(keys);
     table.getAllAsync(keys).get();
-    verify(metricsRegistry, atLeast(1)).newCounter(anyString(), anyString());
-    verify(metricsRegistry, times(0)).newTimer(anyString(), anyString());
-    verify(metricsRegistry, times(0)).newGauge(anyString(), any());
     Assert.assertEquals(2, numGets.getCount());
     Assert.assertEquals(4, numMissedLookups.getCount());
     Assert.assertEquals(2, numGetAlls.getCount());