You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by we...@apache.org on 2018/11/27 18:40:43 UTC

[1/2] samza git commit: SAMZA-2004: Add ability to disable table metrics

Repository: samza
Updated Branches:
  refs/heads/master e25e0dab9 -> 5069f1ddb


http://git-wip-us.apache.org/repos/asf/samza/blob/5069f1dd/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadableTable.java
----------------------------------------------------------------------
diff --git a/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadableTable.java b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadableTable.java
new file mode 100644
index 0000000..1f2d586
--- /dev/null
+++ b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadableTable.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.storage.kv;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.MetricsConfig;
+import org.apache.samza.context.ContainerContext;
+import org.apache.samza.context.Context;
+import org.apache.samza.context.JobContext;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.metrics.Timer;
+import org.apache.samza.table.ReadableTable;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.mockito.Mockito.*;
+
+public class TestLocalReadableTable {
+
+  public static final String TABLE_ID = "t1";
+
+  private List<String> keys;
+  private Map<String, String> values;
+
+  private Timer getNs;
+  private Timer getAllNs;
+  private Counter numGets;
+  private Counter numGetAlls;
+  private Timer getCallbackNs;
+  private Counter numMissedLookups;
+
+  private MetricsRegistry metricsRegistry;
+
+  private KeyValueStore kvStore;
+
+  @Before
+  public void setUp() {
+    keys = Arrays.asList("k1", "k2", "k3");
+
+    values = new HashMap<>();
+    values.put("k1", "v1");
+    values.put("k2", "v2");
+    values.put("k3", null);
+
+    kvStore = mock(KeyValueStore.class);
+    when(kvStore.get("k1")).thenReturn("v1");
+    when(kvStore.get("k2")).thenReturn("v2");
+    when(kvStore.getAll(keys)).thenReturn(values);
+
+    getNs = new Timer("");
+    getAllNs = new Timer("");
+    numGets = new Counter("");
+    numGetAlls = new Counter("");
+    getCallbackNs = new Timer("");
+    numMissedLookups = new Counter("");
+
+    metricsRegistry = mock(MetricsRegistry.class);
+    String groupName = LocalReadableTable.class.getSimpleName();
+    when(metricsRegistry.newCounter(groupName, TABLE_ID + "-num-gets")).thenReturn(numGets);
+    when(metricsRegistry.newCounter(groupName, TABLE_ID + "-num-getAlls")).thenReturn(numGetAlls);
+    when(metricsRegistry.newCounter(groupName, TABLE_ID + "-num-missed-lookups")).thenReturn(numMissedLookups);
+    when(metricsRegistry.newTimer(groupName, TABLE_ID + "-get-ns")).thenReturn(getNs);
+    when(metricsRegistry.newTimer(groupName, TABLE_ID + "-getAll-ns")).thenReturn(getAllNs);
+    when(metricsRegistry.newTimer(groupName, TABLE_ID + "-get-callback-ns")).thenReturn(getCallbackNs);
+  }
+
+  @Test
+  public void testGet() throws Exception {
+    ReadableTable table = createTable(false);
+    Assert.assertEquals("v1", table.get("k1"));
+    Assert.assertEquals("v2", table.getAsync("k2").get());
+    Assert.assertNull(table.get("k3"));
+    verify(kvStore, times(3)).get(any());
+    Assert.assertEquals(3, numGets.getCount());
+    Assert.assertEquals(1, numMissedLookups.getCount());
+    Assert.assertTrue(getNs.getSnapshot().getAverage() > 0);
+    Assert.assertEquals(0, numGetAlls.getCount());
+    Assert.assertEquals(0, getAllNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, getCallbackNs.getSnapshot().getAverage(), 0.001);
+  }
+
+  @Test
+  public void testGetAll() throws Exception {
+    ReadableTable table = createTable(false);
+    Assert.assertEquals(values, table.getAll(keys));
+    Assert.assertEquals(values, table.getAllAsync(keys).get());
+    verify(kvStore, times(2)).getAll(any());
+    Assert.assertEquals(Collections.emptyMap(), table.getAll(Collections.emptyList()));
+    Assert.assertEquals(2, numMissedLookups.getCount());
+    Assert.assertEquals(3, numGetAlls.getCount());
+    Assert.assertTrue(getAllNs.getSnapshot().getAverage() > 0);
+    Assert.assertEquals(0, numGets.getCount());
+    Assert.assertEquals(0, getNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, getCallbackNs.getSnapshot().getAverage(), 0.001);
+  }
+
+  @Test
+  public void testTimerDisabled() throws Exception {
+    ReadableTable table = createTable(true);
+    table.get("");
+    table.getAsync("").get();
+    table.getAll(Collections.emptyList());
+    table.getAllAsync(Collections.emptyList()).get();
+    verify(metricsRegistry, atLeast(1)).newCounter(anyString(), anyString());
+    verify(metricsRegistry, times(0)).newTimer(anyString(), anyString());
+    verify(metricsRegistry, times(0)).newGauge(anyString(), any());
+    Assert.assertEquals(2, numGets.getCount());
+    Assert.assertEquals(2, numMissedLookups.getCount());
+    Assert.assertEquals(2, numGetAlls.getCount());
+    Assert.assertEquals(0, getNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, getAllNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, getCallbackNs.getSnapshot().getAverage(), 0.001);
+  }
+
+  private LocalReadableTable createTable(boolean isTimerDisabled) {
+    Map<String, String> config = new HashMap<>();
+    if (isTimerDisabled) {
+      config.put(MetricsConfig.METRICS_TIMER_ENABLED(), "false");
+    }
+    Context context = mock(Context.class);
+    JobContext jobContext = mock(JobContext.class);
+    when(context.getJobContext()).thenReturn(jobContext);
+    when(jobContext.getConfig()).thenReturn(new MapConfig(config));
+    ContainerContext containerContext = mock(ContainerContext.class);
+    when(context.getContainerContext()).thenReturn(containerContext);
+    when(containerContext.getContainerMetricsRegistry()).thenReturn(metricsRegistry);
+
+    LocalReadableTable table =  new LocalReadableTable("t1", kvStore);
+    table.init(context);
+
+    return table;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/5069f1dd/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalTableProvider.java
----------------------------------------------------------------------
diff --git a/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalTableProvider.java b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalTableProvider.java
new file mode 100644
index 0000000..5367931
--- /dev/null
+++ b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalTableProvider.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage.kv;
+
+import junit.framework.Assert;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.context.ContainerContext;
+import org.apache.samza.context.Context;
+import org.apache.samza.context.JobContext;
+import org.apache.samza.context.TaskContext;
+import org.apache.samza.table.TableProvider;
+import org.apache.samza.util.NoOpMetricsRegistry;
+import org.junit.Test;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.*;
+
+
+public class TestLocalTableProvider {
+
+  @Test
+  public void testInit() {
+    Context context = mock(Context.class);
+    JobContext jobContext = mock(JobContext.class);
+    when(context.getJobContext()).thenReturn(jobContext);
+    when(jobContext.getConfig()).thenReturn(new MapConfig());
+    ContainerContext containerContext = mock(ContainerContext.class);
+    when(context.getContainerContext()).thenReturn(containerContext);
+    when(containerContext.getContainerMetricsRegistry()).thenReturn(new NoOpMetricsRegistry());
+    TaskContext taskContext = mock(TaskContext.class);
+    when(context.getTaskContext()).thenReturn(taskContext);
+    when(taskContext.getStore(any())).thenReturn(mock(KeyValueStore.class));
+
+    TableProvider tableProvider = createTableProvider("t1");
+    tableProvider.init(context);
+    Assert.assertNotNull(tableProvider.getTable());
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void testInitFail() {
+    TableProvider tableProvider = createTableProvider("t1");
+    Assert.assertNotNull(tableProvider.getTable());
+  }
+
+  private TableProvider createTableProvider(String tableId) {
+    return new LocalTableProvider(tableId) {
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/5069f1dd/samza-kv/src/test/java/org/apache/samza/storage/kv/descriptors/TestLocalTableProvider.java
----------------------------------------------------------------------
diff --git a/samza-kv/src/test/java/org/apache/samza/storage/kv/descriptors/TestLocalTableProvider.java b/samza-kv/src/test/java/org/apache/samza/storage/kv/descriptors/TestLocalTableProvider.java
deleted file mode 100644
index 752b91e..0000000
--- a/samza-kv/src/test/java/org/apache/samza/storage/kv/descriptors/TestLocalTableProvider.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.storage.kv.descriptors;
-
-import junit.framework.Assert;
-import org.apache.samza.context.Context;
-import org.apache.samza.context.TaskContext;
-import org.apache.samza.storage.kv.KeyValueStore;
-import org.apache.samza.storage.kv.LocalTableProvider;
-import org.apache.samza.table.TableProvider;
-import org.apache.samza.util.NoOpMetricsRegistry;
-import org.junit.Test;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.*;
-
-
-public class TestLocalTableProvider {
-
-  @Test
-  public void testInit() {
-    Context context = mock(Context.class);
-    TaskContext taskContext = mock(TaskContext.class);
-    when(context.getTaskContext()).thenReturn(taskContext);
-    when(taskContext.getStore(any())).thenReturn(mock(KeyValueStore.class));
-    when(taskContext.getTaskMetricsRegistry()).thenReturn(new NoOpMetricsRegistry());
-
-    TableProvider tableProvider = createTableProvider("t1");
-    tableProvider.init(context);
-    Assert.assertNotNull(tableProvider.getTable());
-  }
-
-  @Test(expected = NullPointerException.class)
-  public void testInitFail() {
-    TableProvider tableProvider = createTableProvider("t1");
-    Assert.assertNotNull(tableProvider.getTable());
-  }
-
-  private TableProvider createTableProvider(String tableId) {
-    return new LocalTableProvider(tableId) {
-    };
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/5069f1dd/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedIOResolverFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedIOResolverFactory.java b/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedIOResolverFactory.java
index 80cb789..89a32d8 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedIOResolverFactory.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedIOResolverFactory.java
@@ -21,11 +21,9 @@ package org.apache.samza.sql.impl;
 
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
-import org.apache.samza.sql.SamzaSqlRelRecord;
 import org.apache.samza.sql.serializers.SamzaSqlRelMessageSerdeFactory;
 import org.apache.samza.sql.serializers.SamzaSqlRelRecordSerdeFactory;
 import org.apache.samza.table.descriptors.TableDescriptor;
-import org.apache.samza.serializers.JsonSerdeV2;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.sql.interfaces.SqlIOResolver;
 import org.apache.samza.sql.interfaces.SqlIOResolverFactory;

http://git-wip-us.apache.org/repos/asf/samza/blob/5069f1dd/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java
index fa279f2..e112804 100644
--- a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java
+++ b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java
@@ -20,11 +20,11 @@
 package org.apache.samza.test.table;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.application.TaskApplication;
 import org.apache.samza.application.descriptors.TaskApplicationDescriptor;
@@ -34,12 +34,7 @@ import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.TaskConfig;
 import org.apache.samza.container.grouper.task.SingleContainerGrouperFactory;
 import org.apache.samza.context.Context;
-import org.apache.samza.context.TaskContext;
 import org.apache.samza.system.descriptors.GenericInputDescriptor;
-import org.apache.samza.metrics.Counter;
-import org.apache.samza.metrics.Gauge;
-import org.apache.samza.metrics.MetricsRegistry;
-import org.apache.samza.metrics.Timer;
 import org.apache.samza.operators.KV;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.system.descriptors.DelegatingSystemDescriptor;
@@ -49,9 +44,6 @@ import org.apache.samza.serializers.IntegerSerde;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.NoOpSerde;
 import org.apache.samza.standalone.PassthroughJobCoordinatorFactory;
-import org.apache.samza.storage.kv.Entry;
-import org.apache.samza.storage.kv.KeyValueStore;
-import org.apache.samza.storage.kv.LocalReadWriteTable;
 import org.apache.samza.storage.kv.inmemory.descriptors.InMemoryTableDescriptor;
 import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.table.ReadWriteTable;
@@ -65,6 +57,7 @@ import org.apache.samza.task.TaskCoordinator;
 import org.apache.samza.test.harness.AbstractIntegrationTestHarness;
 import org.apache.samza.test.util.ArraySystemFactory;
 import org.apache.samza.test.util.Base64Serializer;
+
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -74,16 +67,9 @@ import static org.apache.samza.test.table.TestTableData.PageViewJsonSerde;
 import static org.apache.samza.test.table.TestTableData.PageViewJsonSerdeFactory;
 import static org.apache.samza.test.table.TestTableData.Profile;
 import static org.apache.samza.test.table.TestTableData.ProfileJsonSerde;
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyList;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
 
 
 /**
@@ -357,50 +343,6 @@ public class TestLocalTable extends AbstractIntegrationTestHarness {
   }
 
   @Test
-  public void testAsyncOperation() throws Exception {
-    KeyValueStore kvStore = mock(KeyValueStore.class);
-    LocalReadWriteTable<String, String> table = new LocalReadWriteTable<>("table1", kvStore);
-    Context context = mock(Context.class);
-    TaskContext taskContext = mock(TaskContext.class);
-    when(context.getTaskContext()).thenReturn(taskContext);
-    MetricsRegistry metricsRegistry = mock(MetricsRegistry.class);
-    doReturn(mock(Timer.class)).when(metricsRegistry).newTimer(anyString(), anyString());
-    doReturn(mock(Counter.class)).when(metricsRegistry).newCounter(anyString(), anyString());
-    doReturn(mock(Gauge.class)).when(metricsRegistry).newGauge(anyString(), any());
-    doReturn(metricsRegistry).when(taskContext).getTaskMetricsRegistry();
-
-    table.init(context);
-
-    // GET
-    doReturn("bar").when(kvStore).get(anyString());
-    Assert.assertEquals("bar", table.getAsync("foo").get());
-
-    // GET-ALL
-    Map<String, String> recordMap = new HashMap<>();
-    recordMap.put("foo1", "bar1");
-    recordMap.put("foo2", "bar2");
-    doReturn(recordMap).when(kvStore).getAll(anyList());
-    Assert.assertEquals(recordMap, table.getAllAsync(Arrays.asList("foo1", "foo2")).get());
-
-    // PUT
-    table.putAsync("foo1", "bar1").get();
-    verify(kvStore, times(1)).put(anyString(), anyString());
-
-    // PUT-ALL
-    List<Entry<String, String>> records = Arrays.asList(new Entry<>("foo1", "bar1"), new Entry<>("foo2", "bar2"));
-    table.putAllAsync(records).get();
-    verify(kvStore, times(1)).putAll(anyList());
-
-    // DELETE
-    table.deleteAsync("foo").get();
-    verify(kvStore, times(1)).delete(anyString());
-
-    // DELETE-ALL
-    table.deleteAllAsync(Arrays.asList("foo1", "foo2")).get();
-    verify(kvStore, times(1)).deleteAll(anyList());
-  }
-
-  @Test
   public void testWithLowLevelApi() throws Exception {
 
     Map<String, String> configs = getBaseJobConfig(bootstrapUrl(), zkConnect());

http://git-wip-us.apache.org/repos/asf/samza/blob/5069f1dd/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java b/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java
index 8218b8b..c9228af 100644
--- a/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java
+++ b/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java
@@ -20,6 +20,7 @@
 package org.apache.samza.test.table;
 
 import com.google.common.cache.CacheBuilder;
+
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.time.Duration;
@@ -33,6 +34,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.stream.Collectors;
+
 import org.apache.samza.SamzaException;
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
@@ -60,6 +62,7 @@ import org.apache.samza.table.remote.TableWriteFunction;
 import org.apache.samza.test.harness.AbstractIntegrationTestHarness;
 import org.apache.samza.test.util.Base64Serializer;
 import org.apache.samza.util.RateLimiter;
+
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -68,6 +71,7 @@ import static org.apache.samza.test.table.TestTableData.PageView;
 import static org.apache.samza.test.table.TestTableData.Profile;
 import static org.apache.samza.test.table.TestTableData.generatePageViews;
 import static org.apache.samza.test.table.TestTableData.generateProfiles;
+
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.doReturn;
@@ -248,7 +252,8 @@ public class TestRemoteTable extends AbstractIntegrationTestHarness {
     doReturn(new Counter("")).when(metricsRegistry).newCounter(anyString(), anyString());
     doReturn(new Timer("")).when(metricsRegistry).newTimer(anyString(), anyString());
     Context context = new MockContext();
-    doReturn(metricsRegistry).when(context.getTaskContext()).getTaskMetricsRegistry();
+    doReturn(new MapConfig()).when(context.getJobContext()).getConfig();
+    doReturn(metricsRegistry).when(context.getContainerContext()).getContainerMetricsRegistry();
     return context;
   }
 


[2/2] samza git commit: SAMZA-2004: Add ability to disable table metrics

Posted by we...@apache.org.
SAMZA-2004: Add ability to disable table metrics

For jobs with very high throughput, it is desirable to disable metrics on tables. This change introduces an option on the table descriptor to disable all metrics for a table.

Author: Wei Song <ws...@linkedin.com>

Reviewers: Xinyu Liu <xi...@linkedin.com>

Closes #822 from weisong44/SAMZA-2004-2


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/5069f1dd
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/5069f1dd
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/5069f1dd

Branch: refs/heads/master
Commit: 5069f1ddb74587ad6fb11d515bece45bd2a8f3bc
Parents: e25e0da
Author: Wei Song <ws...@linkedin.com>
Authored: Tue Nov 27 10:40:34 2018 -0800
Committer: Wei Song <ws...@linkedin.com>
Committed: Tue Nov 27 10:40:34 2018 -0800

----------------------------------------------------------------------
 .../documentation/versioned/api/table-api.md    |   1 +
 .../apache/samza/config/JavaTableConfig.java    |   2 +-
 .../table/descriptors/BaseTableDescriptor.java  |  13 +-
 .../table/descriptors/LocalTableDescriptor.java |   3 -
 .../table/descriptors/TableDescriptor.java      |   9 -
 .../samza/table/remote/TableRateLimiter.java    |   4 +-
 .../apache/samza/table/BaseReadableTable.java   |  66 +++++
 .../apache/samza/table/BaseTableProvider.java   |   4 +-
 .../samza/table/caching/CachingTable.java       |  48 ++--
 .../table/caching/guava/GuavaCacheTable.java    |   9 +-
 .../table/remote/RemoteReadWriteTable.java      |  82 +++++--
 .../samza/table/remote/RemoteReadableTable.java | 145 +++--------
 .../table/utils/DefaultTableReadMetrics.java    |  56 -----
 .../table/utils/DefaultTableWriteMetrics.java   |  64 -----
 .../samza/table/utils/TableMetricsUtil.java     |  14 +-
 .../samza/table/utils/TableReadMetrics.java     |  65 +++++
 .../samza/table/utils/TableWriteMetrics.java    |  77 ++++++
 .../samza/config/TestJavaTableConfig.java       |  23 +-
 .../samza/table/caching/TestCachingTable.java   |  90 ++++++-
 .../table/remote/TestRemoteReadWriteTable.java  |   2 +-
 .../descriptors/TestRemoteTableDescriptor.java  |  12 +-
 .../samza/storage/kv/LocalReadWriteTable.java   |  49 ++--
 .../samza/storage/kv/LocalReadableTable.java    |  46 ++--
 .../storage/kv/TestLocalReadWriteTable.java     | 244 +++++++++++++++++++
 .../storage/kv/TestLocalReadableTable.java      | 158 ++++++++++++
 .../storage/kv/TestLocalTableProvider.java      |  66 +++++
 .../kv/descriptors/TestLocalTableProvider.java  |  60 -----
 .../sql/impl/ConfigBasedIOResolverFactory.java  |   2 -
 .../apache/samza/test/table/TestLocalTable.java |  64 +----
 .../samza/test/table/TestRemoteTable.java       |   7 +-
 30 files changed, 995 insertions(+), 490 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/5069f1dd/docs/learn/documentation/versioned/api/table-api.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/api/table-api.md b/docs/learn/documentation/versioned/api/table-api.md
index 0a9c33c..f53d9f5 100644
--- a/docs/learn/documentation/versioned/api/table-api.md
+++ b/docs/learn/documentation/versioned/api/table-api.md
@@ -248,6 +248,7 @@ The table below summarizes table metrics:
 |`getAll-ns`|`ReadableTable`|Average latency of `getAll/getAllAsync()` operations|
 |`num-gets`|`ReadableTable`|Count of `get/getAsync()` operations
 |`num-getAlls`|`ReadableTable`|Count of `getAll/getAllAsync()` operations
+|`num-missed-lookups`|`ReadableTable`|Count of missed `get/getAll()` operations|
 |`put-ns`|`ReadWriteTable`|Average latency of `put/putAsync()` operations
 |`putAll-ns`|`ReadWriteTable`|Average latency of `putAll/putAllAsync()` operations
 |`num-puts`|`ReadWriteTable`|Count of `put/putAsync()` operations

http://git-wip-us.apache.org/repos/asf/samza/blob/5069f1dd/samza-api/src/main/java/org/apache/samza/config/JavaTableConfig.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/config/JavaTableConfig.java b/samza-api/src/main/java/org/apache/samza/config/JavaTableConfig.java
index c4381a1..ee0045a 100644
--- a/samza-api/src/main/java/org/apache/samza/config/JavaTableConfig.java
+++ b/samza-api/src/main/java/org/apache/samza/config/JavaTableConfig.java
@@ -81,7 +81,7 @@ public class JavaTableConfig extends MapConfig {
    * @param tableId Id of the table
    * @return serde retistry key
    */
-  public String getValueSerde(String tableId) {
+  public String getMsgSerde(String tableId) {
     return get(String.format(STORE_MSG_SERDE, tableId), null);
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/5069f1dd/samza-api/src/main/java/org/apache/samza/table/descriptors/BaseTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/descriptors/BaseTableDescriptor.java b/samza-api/src/main/java/org/apache/samza/table/descriptors/BaseTableDescriptor.java
index d660276..26c2ae3 100644
--- a/samza-api/src/main/java/org/apache/samza/table/descriptors/BaseTableDescriptor.java
+++ b/samza-api/src/main/java/org/apache/samza/table/descriptors/BaseTableDescriptor.java
@@ -19,11 +19,11 @@
 
 package org.apache.samza.table.descriptors;
 
+import com.google.common.base.Preconditions;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.samza.annotation.InterfaceStability;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JavaTableConfig;
 
@@ -34,7 +34,6 @@ import org.apache.samza.config.JavaTableConfig;
  * @param <V> the type of the value in this table
  * @param <D> the type of the concrete table descriptor
  */
-@InterfaceStability.Unstable
 abstract public class BaseTableDescriptor<K, V, D extends BaseTableDescriptor<K, V, D>>
     implements TableDescriptor<K, V, D> {
 
@@ -50,7 +49,13 @@ abstract public class BaseTableDescriptor<K, V, D extends BaseTableDescriptor<K,
     this.tableId = tableId;
   }
 
-  @Override
+  /**
+   * Add a configuration entry for the table
+   *
+   * @param key the key
+   * @param value the value
+   * @return this table descriptor instance
+   */
   public D withConfig(String key, String value) {
     config.put(key, value);
     return (D) this;
@@ -64,6 +69,8 @@ abstract public class BaseTableDescriptor<K, V, D extends BaseTableDescriptor<K,
   @Override
   public Map<String, String> toConfig(Config jobConfig) {
 
+    Preconditions.checkNotNull(jobConfig, "Job config is null");
+
     validate();
 
     Map<String, String> tableConfig = new HashMap<>(config);

http://git-wip-us.apache.org/repos/asf/samza/blob/5069f1dd/samza-api/src/main/java/org/apache/samza/table/descriptors/LocalTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/descriptors/LocalTableDescriptor.java b/samza-api/src/main/java/org/apache/samza/table/descriptors/LocalTableDescriptor.java
index d194091..1ebb580 100644
--- a/samza-api/src/main/java/org/apache/samza/table/descriptors/LocalTableDescriptor.java
+++ b/samza-api/src/main/java/org/apache/samza/table/descriptors/LocalTableDescriptor.java
@@ -28,7 +28,6 @@ import java.util.regex.Pattern;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.samza.config.Config;
-import org.apache.samza.config.JavaTableConfig;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.storage.SideInputsProcessor;
 import org.apache.samza.table.utils.SerdeUtils;
@@ -143,8 +142,6 @@ abstract public class LocalTableDescriptor<K, V, D extends LocalTableDescriptor<
 
     Map<String, String> tableConfig = new HashMap<>(super.toConfig(jobConfig));
 
-    JavaTableConfig javaTableConfig = new JavaTableConfig(jobConfig);
-
     if (sideInputs != null && !sideInputs.isEmpty()) {
       sideInputs.forEach(si -> Preconditions.checkState(isValidSystemStreamName(si), String.format(
           "Side input stream %s doesn't confirm to pattern %s", si, SYSTEM_STREAM_NAME_PATTERN)));

http://git-wip-us.apache.org/repos/asf/samza/blob/5069f1dd/samza-api/src/main/java/org/apache/samza/table/descriptors/TableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/descriptors/TableDescriptor.java b/samza-api/src/main/java/org/apache/samza/table/descriptors/TableDescriptor.java
index 806d158..81a98e6 100644
--- a/samza-api/src/main/java/org/apache/samza/table/descriptors/TableDescriptor.java
+++ b/samza-api/src/main/java/org/apache/samza/table/descriptors/TableDescriptor.java
@@ -67,15 +67,6 @@ public interface TableDescriptor<K, V, D extends TableDescriptor<K, V, D>> {
   String getTableId();
 
   /**
-   * Add a configuration entry for the table
-   *
-   * @param key the key
-   * @param value the value
-   * @return this table descriptor instance
-   */
-  D withConfig(String key, String value);
-
-  /**
    * Generate configuration for this table descriptor, the generated configuration
    * should be the complete configuration for this table that can be directly
    * included in the job configuration.

http://git-wip-us.apache.org/repos/asf/samza/blob/5069f1dd/samza-api/src/main/java/org/apache/samza/table/remote/TableRateLimiter.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/remote/TableRateLimiter.java b/samza-api/src/main/java/org/apache/samza/table/remote/TableRateLimiter.java
index c67a648..37d6385 100644
--- a/samza-api/src/main/java/org/apache/samza/table/remote/TableRateLimiter.java
+++ b/samza-api/src/main/java/org/apache/samza/table/remote/TableRateLimiter.java
@@ -122,7 +122,9 @@ public class TableRateLimiter<K, V> {
 
     long startNs = System.nanoTime();
     rateLimiter.acquire(Collections.singletonMap(tag, credits));
-    waitTimeMetric.update(System.nanoTime() - startNs);
+    if (waitTimeMetric != null) {
+      waitTimeMetric.update(System.nanoTime() - startNs);
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/samza/blob/5069f1dd/samza-core/src/main/java/org/apache/samza/table/BaseReadableTable.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/BaseReadableTable.java b/samza-core/src/main/java/org/apache/samza/table/BaseReadableTable.java
new file mode 100644
index 0000000..7eaaa83
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/table/BaseReadableTable.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.table;
+
+import com.google.common.base.Preconditions;
+import org.apache.samza.context.Context;
+import org.apache.samza.table.utils.TableReadMetrics;
+import org.apache.samza.table.utils.TableWriteMetrics;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Base class for all readable tables
+ *
+ * @param <K> the type of the key in this table
+ * @param <V> the type of the value in this table
+ */
+abstract public class BaseReadableTable<K, V> implements ReadableTable<K, V> {
+
+  protected final Logger logger;
+
+  protected final String tableId;
+
+  protected TableReadMetrics readMetrics;
+  protected TableWriteMetrics writeMetrics;
+
+  /**
+   * Construct an instance; a null or empty id fails with IllegalArgumentException
+   * @param tableId Id of the table, must be neither null nor empty
+   */
+  public BaseReadableTable(String tableId) {
+    Preconditions.checkArgument(tableId != null && !tableId.isEmpty(),
+        String.format("Invalid table Id: %s", tableId));
+    this.tableId = tableId;
+    this.logger = LoggerFactory.getLogger(getClass().getName() + "." + tableId);
+  }
+
+  @Override
+  public void init(Context context) {
+    readMetrics = new TableReadMetrics(context, this, tableId);
+    if (this instanceof ReadWriteTable) {
+      writeMetrics = new TableWriteMetrics(context, this, tableId);
+    }
+  }
+
+  public String getTableId() {
+    return tableId;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/5069f1dd/samza-core/src/main/java/org/apache/samza/table/BaseTableProvider.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/BaseTableProvider.java b/samza-core/src/main/java/org/apache/samza/table/BaseTableProvider.java
index 7ad423d..f2c0b08 100644
--- a/samza-core/src/main/java/org/apache/samza/table/BaseTableProvider.java
+++ b/samza-core/src/main/java/org/apache/samza/table/BaseTableProvider.java
@@ -28,9 +28,9 @@ import org.slf4j.LoggerFactory;
  */
 abstract public class BaseTableProvider implements TableProvider {
 
-  final protected Logger logger = LoggerFactory.getLogger(getClass());
+  protected final Logger logger = LoggerFactory.getLogger(getClass());
 
-  final protected String tableId;
+  protected final String tableId;
 
   protected Context context;
 

http://git-wip-us.apache.org/repos/asf/samza/blob/5069f1dd/samza-core/src/main/java/org/apache/samza/table/caching/CachingTable.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/caching/CachingTable.java b/samza-core/src/main/java/org/apache/samza/table/caching/CachingTable.java
index 703a6ff..ac6188b 100644
--- a/samza-core/src/main/java/org/apache/samza/table/caching/CachingTable.java
+++ b/samza-core/src/main/java/org/apache/samza/table/caching/CachingTable.java
@@ -23,10 +23,9 @@ import com.google.common.base.Preconditions;
 import org.apache.samza.SamzaException;
 import org.apache.samza.context.Context;
 import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.table.BaseReadableTable;
 import org.apache.samza.table.ReadWriteTable;
 import org.apache.samza.table.ReadableTable;
-import org.apache.samza.table.utils.DefaultTableReadMetrics;
-import org.apache.samza.table.utils.DefaultTableWriteMetrics;
 import org.apache.samza.table.utils.TableMetricsUtil;
 
 import java.util.ArrayList;
@@ -37,6 +36,9 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 
+import static org.apache.samza.table.utils.TableMetricsUtil.incCounter;
+import static org.apache.samza.table.utils.TableMetricsUtil.updateTimer;
+
 
 /**
  * A composite table incorporating a cache with a Samza table. The cache is
@@ -62,23 +64,20 @@ import java.util.stream.Collectors;
  * @param <K> type of the table key
  * @param <V> type of the table value
  */
-public class CachingTable<K, V> implements ReadWriteTable<K, V> {
-  private final String tableId;
+public class CachingTable<K, V> extends BaseReadableTable<K, V>
+    implements ReadWriteTable<K, V> {
+
   private final ReadableTable<K, V> rdTable;
   private final ReadWriteTable<K, V> rwTable;
   private final ReadWriteTable<K, V> cache;
   private final boolean isWriteAround;
 
-  // Metrics
-  private DefaultTableReadMetrics readMetrics;
-  private DefaultTableWriteMetrics writeMetrics;
-
   // Common caching stats
   private AtomicLong hitCount = new AtomicLong();
   private AtomicLong missCount = new AtomicLong();
 
   public CachingTable(String tableId, ReadableTable<K, V> table, ReadWriteTable<K, V> cache, boolean isWriteAround) {
-    this.tableId = tableId;
+    super(tableId);
     this.rdTable = table;
     this.rwTable = table instanceof ReadWriteTable ? (ReadWriteTable) table : null;
     this.cache = cache;
@@ -87,8 +86,7 @@ public class CachingTable<K, V> implements ReadWriteTable<K, V> {
 
   @Override
   public void init(Context context) {
-    readMetrics = new DefaultTableReadMetrics(context, this, tableId);
-    writeMetrics = new DefaultTableWriteMetrics(context, this, tableId);
+    super.init(context);
     TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(context, this, tableId);
     tableMetricsUtil.newGauge("hit-rate", () -> hitRate());
     tableMetricsUtil.newGauge("miss-rate", () -> missRate());
@@ -125,7 +123,7 @@ public class CachingTable<K, V> implements ReadWriteTable<K, V> {
 
   @Override
   public CompletableFuture<V> getAsync(K key) {
-    readMetrics.numGets.inc();
+    incCounter(readMetrics.numGets);
     V value = cache.get(key);
     if (value != null) {
       hitCount.incrementAndGet();
@@ -142,7 +140,7 @@ public class CachingTable<K, V> implements ReadWriteTable<K, V> {
           if (result != null) {
             cache.put(key, result);
           }
-          readMetrics.getNs.update(System.nanoTime() - startNs);
+          updateTimer(readMetrics.getNs, System.nanoTime() - startNs);
           return result;
         }
       });
@@ -161,7 +159,7 @@ public class CachingTable<K, V> implements ReadWriteTable<K, V> {
 
   @Override
   public CompletableFuture<Map<K, V>> getAllAsync(List<K> keys) {
-    readMetrics.numGetAlls.inc();
+    incCounter(readMetrics.numGetAlls);
     // Make a copy of entries which might be immutable
     Map<K, V> getAllResult = new HashMap<>();
     List<K> missingKeys = lookupCache(keys, getAllResult);
@@ -181,7 +179,7 @@ public class CachingTable<K, V> implements ReadWriteTable<K, V> {
                 .collect(Collectors.toList()));
             getAllResult.putAll(records);
           }
-          readMetrics.getAllNs.update(System.nanoTime() - startNs);
+          updateTimer(readMetrics.getAllNs, System.nanoTime() - startNs);
           return getAllResult;
         }
       });
@@ -200,7 +198,7 @@ public class CachingTable<K, V> implements ReadWriteTable<K, V> {
 
   @Override
   public CompletableFuture<Void> putAsync(K key, V value) {
-    writeMetrics.numPuts.inc();
+    incCounter(writeMetrics.numPuts);
     Preconditions.checkNotNull(rwTable, "Cannot write to a read-only table: " + rdTable);
 
     long startNs = System.nanoTime();
@@ -214,7 +212,7 @@ public class CachingTable<K, V> implements ReadWriteTable<K, V> {
             cache.put(key, value);
           }
         }
-        writeMetrics.putNs.update(System.nanoTime() - startNs);
+        updateTimer(writeMetrics.putNs, System.nanoTime() - startNs);
         return result;
       });
   }
@@ -232,7 +230,7 @@ public class CachingTable<K, V> implements ReadWriteTable<K, V> {
 
   @Override
   public CompletableFuture<Void> putAllAsync(List<Entry<K, V>> records) {
-    writeMetrics.numPutAlls.inc();
+    incCounter(writeMetrics.numPutAlls);
     long startNs = System.nanoTime();
     Preconditions.checkNotNull(rwTable, "Cannot write to a read-only table: " + rdTable);
     return rwTable.putAllAsync(records).handle((result, e) -> {
@@ -242,7 +240,7 @@ public class CachingTable<K, V> implements ReadWriteTable<K, V> {
           cache.putAll(records);
         }
 
-        writeMetrics.putAllNs.update(System.nanoTime() - startNs);
+        updateTimer(writeMetrics.putAllNs, System.nanoTime() - startNs);
         return result;
       });
   }
@@ -260,7 +258,7 @@ public class CachingTable<K, V> implements ReadWriteTable<K, V> {
 
   @Override
   public CompletableFuture<Void> deleteAsync(K key) {
-    writeMetrics.numDeletes.inc();
+    incCounter(writeMetrics.numDeletes);
     long startNs = System.nanoTime();
     Preconditions.checkNotNull(rwTable, "Cannot delete from a read-only table: " + rdTable);
     return rwTable.deleteAsync(key).handle((result, e) -> {
@@ -269,7 +267,7 @@ public class CachingTable<K, V> implements ReadWriteTable<K, V> {
         } else if (!isWriteAround) {
           cache.delete(key);
         }
-        writeMetrics.deleteNs.update(System.nanoTime() - startNs);
+        updateTimer(writeMetrics.deleteNs, System.nanoTime() - startNs);
         return result;
       });
   }
@@ -285,7 +283,7 @@ public class CachingTable<K, V> implements ReadWriteTable<K, V> {
 
   @Override
   public CompletableFuture<Void> deleteAllAsync(List<K> keys) {
-    writeMetrics.numDeleteAlls.inc();
+    incCounter(writeMetrics.numDeleteAlls);
     long startNs = System.nanoTime();
     Preconditions.checkNotNull(rwTable, "Cannot delete from a read-only table: " + rdTable);
     return rwTable.deleteAllAsync(keys).handle((result, e) -> {
@@ -294,18 +292,18 @@ public class CachingTable<K, V> implements ReadWriteTable<K, V> {
         } else if (!isWriteAround) {
           cache.deleteAll(keys);
         }
-        writeMetrics.deleteAllNs.update(System.nanoTime() - startNs);
+        updateTimer(writeMetrics.deleteAllNs, System.nanoTime() - startNs);
         return result;
       });
   }
 
   @Override
   public synchronized void flush() {
-    writeMetrics.numFlushes.inc();
+    incCounter(writeMetrics.numFlushes);
     long startNs = System.nanoTime();
     Preconditions.checkNotNull(rwTable, "Cannot flush a read-only table: " + rdTable);
     rwTable.flush();
-    writeMetrics.flushNs.update(System.nanoTime() - startNs);
+    updateTimer(writeMetrics.flushNs, System.nanoTime() - startNs);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/samza/blob/5069f1dd/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTable.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTable.java b/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTable.java
index 391f068..b75a0bc 100644
--- a/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTable.java
+++ b/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTable.java
@@ -23,6 +23,7 @@ import com.google.common.cache.Cache;
 import org.apache.samza.SamzaException;
 import org.apache.samza.context.Context;
 import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.table.BaseReadableTable;
 import org.apache.samza.table.ReadWriteTable;
 import org.apache.samza.table.utils.TableMetricsUtil;
 
@@ -39,17 +40,19 @@ import java.util.concurrent.CompletableFuture;
  * @param <K> type of the key in the cache
  * @param <V> type of the value in the cache
  */
-public class GuavaCacheTable<K, V> implements ReadWriteTable<K, V> {
-  private final String tableId;
+public class GuavaCacheTable<K, V> extends BaseReadableTable<K, V>
+    implements ReadWriteTable<K, V> {
+
   private final Cache<K, V> cache;
 
   public GuavaCacheTable(String tableId, Cache<K, V> cache) {
-    this.tableId = tableId;
+    super(tableId);
     this.cache = cache;
   }
 
   @Override
   public void init(Context context) {
+    super.init(context);
     TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(context, this, tableId);
     // hit- and miss-rate are provided by CachingTable.
     tableMetricsUtil.newGauge("evict-count", () -> cache.stats().evictionCount());

http://git-wip-us.apache.org/repos/asf/samza/blob/5069f1dd/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java
index 60ac4b7..b96087b 100644
--- a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java
+++ b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java
@@ -21,11 +21,16 @@ package org.apache.samza.table.remote;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import java.util.Collection;
+import java.util.function.BiFunction;
+import java.util.function.Function;
 import org.apache.samza.SamzaException;
+import org.apache.samza.config.MetricsConfig;
 import org.apache.samza.context.Context;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.Timer;
 import org.apache.samza.storage.kv.Entry;
 import org.apache.samza.table.ReadWriteTable;
-import org.apache.samza.table.utils.DefaultTableWriteMetrics;
 import org.apache.samza.table.utils.TableMetricsUtil;
 
 import java.util.List;
@@ -33,6 +38,9 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.stream.Collectors;
 
+import static org.apache.samza.table.utils.TableMetricsUtil.incCounter;
+import static org.apache.samza.table.utils.TableMetricsUtil.updateTimer;
+
 
 /**
  * Remote store backed read writable table
@@ -40,9 +48,8 @@ import java.util.stream.Collectors;
  * @param <K> the type of the key in this table
  * @param <V> the type of the value in this table
  */
-public class RemoteReadWriteTable<K, V> extends RemoteReadableTable<K, V> implements ReadWriteTable<K, V> {
-
-  private DefaultTableWriteMetrics writeMetrics;
+public class RemoteReadWriteTable<K, V> extends RemoteReadableTable<K, V>
+    implements ReadWriteTable<K, V> {
 
   protected final TableWriteFunction<K, V> writeFn;
   protected final TableRateLimiter writeRateLimiter;
@@ -59,9 +66,11 @@ public class RemoteReadWriteTable<K, V> extends RemoteReadableTable<K, V> implem
   @Override
   public void init(Context context) {
     super.init(context);
-    writeMetrics = new DefaultTableWriteMetrics(context, this, tableId);
-    TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(context, this, tableId);
-    writeRateLimiter.setTimerMetric(tableMetricsUtil.newTimer("put-throttle-ns"));
+    MetricsConfig metricsConfig = new MetricsConfig(context.getJobContext().getConfig());
+    if (metricsConfig.getMetricsTimerEnabled()) {
+      TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(context, this, tableId);
+      writeRateLimiter.setTimerMetric(tableMetricsUtil.newTimer("put-throttle-ns"));
+    }
   }
 
   @Override
@@ -80,8 +89,7 @@ public class RemoteReadWriteTable<K, V> extends RemoteReadableTable<K, V> implem
       return deleteAsync(key);
     }
 
-    writeMetrics.numPuts.inc();
-    return execute(writeRateLimiter, key, value, writeFn::putAsync, writeMetrics.putNs)
+    return execute(writeRateLimiter, key, value, writeFn::putAsync, writeMetrics.numPuts, writeMetrics.putNs)
         .exceptionally(e -> {
             throw new SamzaException("Failed to put a record with key=" + key, (Throwable) e);
           });
@@ -103,8 +111,6 @@ public class RemoteReadWriteTable<K, V> extends RemoteReadableTable<K, V> implem
       return CompletableFuture.completedFuture(null);
     }
 
-    writeMetrics.numPutAlls.inc();
-
     List<K> deleteKeys = records.stream()
         .filter(e -> e.getValue() == null).map(Entry::getKey).collect(Collectors.toList());
 
@@ -117,7 +123,7 @@ public class RemoteReadWriteTable<K, V> extends RemoteReadableTable<K, V> implem
     // Return the combined future
     return CompletableFuture.allOf(
         deleteFuture,
-        executeRecords(writeRateLimiter, putRecords, writeFn::putAllAsync, writeMetrics.putAllNs))
+        executeRecords(writeRateLimiter, putRecords, writeFn::putAllAsync, writeMetrics.numPutAlls, writeMetrics.putAllNs))
         .exceptionally(e -> {
             String strKeys = records.stream().map(r -> r.getKey().toString()).collect(Collectors.joining(","));
             throw new SamzaException(String.format("Failed to put records with keys=" + strKeys), e);
@@ -136,8 +142,7 @@ public class RemoteReadWriteTable<K, V> extends RemoteReadableTable<K, V> implem
   @Override
   public CompletableFuture<Void> deleteAsync(K key) {
     Preconditions.checkNotNull(key);
-    writeMetrics.numDeletes.inc();
-    return execute(writeRateLimiter, key, writeFn::deleteAsync, writeMetrics.deleteNs)
+    return execute(writeRateLimiter, key, writeFn::deleteAsync, writeMetrics.numDeletes, writeMetrics.deleteNs)
         .exceptionally(e -> {
             throw new SamzaException(String.format("Failed to delete the record for " + key), (Throwable) e);
           });
@@ -159,8 +164,7 @@ public class RemoteReadWriteTable<K, V> extends RemoteReadableTable<K, V> implem
       return CompletableFuture.completedFuture(null);
     }
 
-    writeMetrics.numDeleteAlls.inc();
-    return execute(writeRateLimiter, keys, writeFn::deleteAllAsync, writeMetrics.deleteAllNs)
+    return execute(writeRateLimiter, keys, writeFn::deleteAllAsync, writeMetrics.numDeleteAlls, writeMetrics.deleteAllNs)
         .exceptionally(e -> {
             throw new SamzaException(String.format("Failed to delete records for " + keys), (Throwable) e);
           });
@@ -169,10 +173,10 @@ public class RemoteReadWriteTable<K, V> extends RemoteReadableTable<K, V> implem
   @Override
   public void flush() {
     try {
-      writeMetrics.numFlushes.inc();
+      incCounter(writeMetrics.numFlushes);
       long startNs = System.nanoTime();
       writeFn.flush();
-      writeMetrics.flushNs.update(System.nanoTime() - startNs);
+      updateTimer(writeMetrics.flushNs, System.nanoTime() - startNs);
     } catch (Exception e) {
       String errMsg = "Failed to flush remote store";
       logger.error(errMsg, e);
@@ -186,6 +190,48 @@ public class RemoteReadWriteTable<K, V> extends RemoteReadableTable<K, V> implem
     super.close();
   }
 
+  /**
+   * Execute an async request given a table record (key+value)
+   * @param rateLimiter helper for rate limiting
+   * @param key key of the table record
+   * @param value value of the table record
+   * @param method method to be executed
+   * @param timer latency metric to be updated
+   * @return CompletableFuture of the operation
+   */
+  protected CompletableFuture<Void> execute(TableRateLimiter<K, V> rateLimiter,
+      K key, V value, BiFunction<K, V, CompletableFuture<Void>> method, Counter counter, Timer timer) {
+    incCounter(counter);
+    final long startNs = System.nanoTime();
+    CompletableFuture<Void> ioFuture = rateLimiter.isRateLimited()
+        ? CompletableFuture
+            .runAsync(() -> rateLimiter.throttle(key, value), tableExecutor)
+            .thenCompose((r) -> method.apply(key, value))
+        : method.apply(key, value);
+    return completeExecution(ioFuture, startNs, timer);
+  }
+
+  /**
+   * Execute an async request given a collection of table records
+   * @param rateLimiter helper for rate limiting
+   * @param records list of records
+   * @param method method to be executed
+   * @param timer latency metric to be updated
+   * @return CompletableFuture of the operation
+   */
+  protected CompletableFuture<Void> executeRecords(TableRateLimiter<K, V> rateLimiter,
+      Collection<Entry<K, V>> records, Function<Collection<Entry<K, V>>, CompletableFuture<Void>> method,
+      Counter counter, Timer timer) {
+    incCounter(counter);
+    final long startNs = System.nanoTime();
+    CompletableFuture<Void> ioFuture = rateLimiter.isRateLimited()
+        ? CompletableFuture
+            .runAsync(() -> rateLimiter.throttleRecords(records), tableExecutor)
+            .thenCompose((r) -> method.apply(records))
+        : method.apply(records);
+    return completeExecution(ioFuture, startNs, timer);
+  }
+
   @VisibleForTesting
   public TableWriteFunction<K, V> getWriteFn() {
     return writeFn;

http://git-wip-us.apache.org/repos/asf/samza/blob/5069f1dd/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java
index 1b6bfea..e02650e 100644
--- a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java
+++ b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java
@@ -23,14 +23,12 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import java.util.Objects;
 import org.apache.samza.SamzaException;
+import org.apache.samza.config.MetricsConfig;
 import org.apache.samza.context.Context;
+import org.apache.samza.metrics.Counter;
 import org.apache.samza.metrics.Timer;
-import org.apache.samza.storage.kv.Entry;
-import org.apache.samza.table.ReadableTable;
-import org.apache.samza.table.utils.DefaultTableReadMetrics;
+import org.apache.samza.table.BaseReadableTable;
 import org.apache.samza.table.utils.TableMetricsUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.Collection;
 import java.util.Collections;
@@ -38,9 +36,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
-import java.util.function.BiFunction;
 import java.util.function.Function;
 
+import static org.apache.samza.table.utils.TableMetricsUtil.incCounter;
+import static org.apache.samza.table.utils.TableMetricsUtil.updateTimer;
 
 /**
  * A Samza {@link org.apache.samza.table.Table} backed by a remote data-store or service.
@@ -71,18 +70,13 @@ import java.util.function.Function;
  * @param <K> the type of the key in this table
  * @param <V> the type of the value in this table
  */
-public class RemoteReadableTable<K, V> implements ReadableTable<K, V> {
-
-  protected final String tableId;
-  protected final Logger logger;
+public class RemoteReadableTable<K, V> extends BaseReadableTable<K, V> {
 
   protected final ExecutorService callbackExecutor;
   protected final ExecutorService tableExecutor;
   protected final TableReadFunction<K, V> readFn;
   protected final TableRateLimiter<K, V> readRateLimiter;
 
-  private DefaultTableReadMetrics readMetrics;
-
   /**
    * Construct a RemoteReadableTable instance
    * @param tableId table id
@@ -93,21 +87,22 @@ public class RemoteReadableTable<K, V> implements ReadableTable<K, V> {
    */
   public RemoteReadableTable(String tableId, TableReadFunction<K, V> readFn,
       TableRateLimiter<K, V> rateLimiter, ExecutorService tableExecutor, ExecutorService callbackExecutor) {
-    Preconditions.checkArgument(tableId != null && !tableId.isEmpty(), "invalid table id");
+    super(tableId);
     Preconditions.checkNotNull(readFn, "null read function");
-    this.tableId = tableId;
     this.readFn = readFn;
     this.readRateLimiter = rateLimiter;
     this.callbackExecutor = callbackExecutor;
     this.tableExecutor = tableExecutor;
-    this.logger = LoggerFactory.getLogger(getClass().getName() + "-" + tableId);
   }
 
   @Override
   public void init(Context context) {
-    readMetrics = new DefaultTableReadMetrics(context, this, tableId);
-    TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(context, this, tableId);
-    readRateLimiter.setTimerMetric(tableMetricsUtil.newTimer("get-throttle-ns"));
+    super.init(context);
+    MetricsConfig metricsConfig = new MetricsConfig(context.getJobContext().getConfig());
+    if (metricsConfig.getMetricsTimerEnabled()) {
+      TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(context, this, tableId);
+      readRateLimiter.setTimerMetric(tableMetricsUtil.newTimer("get-throttle-ns"));
+    }
   }
 
   @Override
@@ -122,14 +117,13 @@ public class RemoteReadableTable<K, V> implements ReadableTable<K, V> {
   @Override
   public CompletableFuture<V> getAsync(K key) {
     Preconditions.checkNotNull(key);
-    readMetrics.numGets.inc();
-    return execute(readRateLimiter, key, readFn::getAsync, readMetrics.getNs)
+    return execute(readRateLimiter, key, readFn::getAsync, readMetrics.numGets, readMetrics.getNs)
         .handle((result, e) -> {
             if (e != null) {
               throw new SamzaException("Failed to get the records for " + key, e);
             }
             if (result == null) {
-              readMetrics.numMissedLookups.inc();
+              incCounter(readMetrics.numMissedLookups);
             }
             return result;
           });
@@ -137,7 +131,6 @@ public class RemoteReadableTable<K, V> implements ReadableTable<K, V> {
 
   @Override
   public Map<K, V> getAll(List<K> keys) {
-    readMetrics.numGetAlls.inc();
     try {
       return getAllAsync(keys).get();
     } catch (Exception e) {
@@ -151,13 +144,12 @@ public class RemoteReadableTable<K, V> implements ReadableTable<K, V> {
     if (keys.isEmpty()) {
       return CompletableFuture.completedFuture(Collections.EMPTY_MAP);
     }
-    readMetrics.numGetAlls.inc();
-    return execute(readRateLimiter, keys, readFn::getAllAsync, readMetrics.getAllNs)
+    return execute(readRateLimiter, keys, readFn::getAllAsync, readMetrics.numGetAlls, readMetrics.getAllNs)
         .handle((result, e) -> {
             if (e != null) {
               throw new SamzaException("Failed to get the records for " + keys, e);
             }
-            result.values().stream().filter(Objects::isNull).map(v -> readMetrics.numMissedLookups.inc());
+            result.values().stream().filter(Objects::isNull).forEach(v -> incCounter(readMetrics.numMissedLookups));
             return result;
           });
   }
@@ -172,56 +164,15 @@ public class RemoteReadableTable<K, V> implements ReadableTable<K, V> {
    * @return CompletableFuture of the operation
    */
   protected <T> CompletableFuture<T> execute(TableRateLimiter<K, V> rateLimiter,
-      K key, Function<K, CompletableFuture<T>> method, Timer timer) {
+      K key, Function<K, CompletableFuture<T>> method, Counter counter, Timer timer) {
+    incCounter(counter);
     final long startNs = System.nanoTime();
-    CompletableFuture<T> ioFuture = rateLimiter.isRateLimited() ?
-        CompletableFuture
+    CompletableFuture<T> ioFuture = rateLimiter.isRateLimited()
+        ? CompletableFuture
             .runAsync(() -> rateLimiter.throttle(key), tableExecutor)
-            .thenCompose((r) -> method.apply(key)) :
-        method.apply(key);
-    if (callbackExecutor != null) {
-      ioFuture.thenApplyAsync(r -> {
-          timer.update(System.nanoTime() - startNs);
-          return r;
-        }, callbackExecutor);
-    } else {
-      ioFuture.thenApply(r -> {
-          timer.update(System.nanoTime() - startNs);
-          return r;
-        });
-    }
-    return ioFuture;
-  }
-
-  /**
-   * Execute an async request given a table record (key+value)
-   * @param rateLimiter helper for rate limiting
-   * @param key key of the table record
-   * @param value value of the table record
-   * @param method method to be executed
-   * @param timer latency metric to be updated
-   * @return CompletableFuture of the operation
-   */
-  protected CompletableFuture<Void> execute(TableRateLimiter<K, V> rateLimiter,
-      K key, V value, BiFunction<K, V, CompletableFuture<Void>> method, Timer timer) {
-    final long startNs = System.nanoTime();
-    CompletableFuture<Void> ioFuture = rateLimiter.isRateLimited() ?
-        CompletableFuture
-            .runAsync(() -> rateLimiter.throttle(key, value), tableExecutor)
-            .thenCompose((r) -> method.apply(key, value)) :
-        method.apply(key, value);
-    if (callbackExecutor != null) {
-      ioFuture.thenApplyAsync(r -> {
-          timer.update(System.nanoTime() - startNs);
-          return r;
-        }, callbackExecutor);
-    } else {
-      ioFuture.thenApply(r -> {
-          timer.update(System.nanoTime() - startNs);
-          return r;
-        });
-    }
-    return ioFuture;
+            .thenCompose((r) -> method.apply(key))
+        : method.apply(key);
+    return completeExecution(ioFuture, startNs, timer);
   }
 
   /**
@@ -234,54 +185,34 @@ public class RemoteReadableTable<K, V> implements ReadableTable<K, V> {
    * @return CompletableFuture of the operation
    */
   protected <T> CompletableFuture<T> execute(TableRateLimiter<K, V> rateLimiter,
-      Collection<K> keys, Function<Collection<K>, CompletableFuture<T>> method, Timer timer) {
+      Collection<K> keys, Function<Collection<K>, CompletableFuture<T>> method, Counter counter, Timer timer) {
+    incCounter(counter);
     final long startNs = System.nanoTime();
-    CompletableFuture<T> ioFuture = rateLimiter.isRateLimited() ?
-        CompletableFuture
+    CompletableFuture<T> ioFuture = rateLimiter.isRateLimited()
+        ? CompletableFuture
             .runAsync(() -> rateLimiter.throttle(keys), tableExecutor)
-            .thenCompose((r) -> method.apply(keys)) :
-        method.apply(keys);
-    if (callbackExecutor != null) {
-      ioFuture.thenApplyAsync(r -> {
-          timer.update(System.nanoTime() - startNs);
-          return r;
-        }, callbackExecutor);
-    } else {
-      ioFuture.thenApply(r -> {
-          timer.update(System.nanoTime() - startNs);
-          return r;
-        });
-    }
-    return ioFuture;
+            .thenCompose((r) -> method.apply(keys))
+        : method.apply(keys);
+    return completeExecution(ioFuture, startNs, timer);
   }
 
   /**
-   * Execute an async request given a collection of table records
-   * @param rateLimiter helper for rate limiting
-   * @param records list of records
-   * @param method method to be executed
+   * Complete the pending execution and update the timer, if one is provided
+   * @param ioFuture the pending future of the operation
+   * @param startNs start time in nanoseconds
    * @param timer latency metric to be updated
+   * @param <T> return type
    * @return CompletableFuture of the operation
    */
-  protected CompletableFuture<Void> executeRecords(TableRateLimiter<K, V> rateLimiter,
-      Collection<Entry<K, V>> records, Function<Collection<Entry<K, V>>, CompletableFuture<Void>> method, Timer timer) {
-    final long startNs = System.nanoTime();
-    CompletableFuture<Void> ioFuture;
-    if (rateLimiter.isRateLimited()) {
-      ioFuture = CompletableFuture
-          .runAsync(() -> rateLimiter.throttleRecords(records), tableExecutor)
-          .thenCompose((r) -> method.apply(records));
-    } else {
-      ioFuture = method.apply(records);
-    }
+  protected  <T> CompletableFuture<T> completeExecution(CompletableFuture<T> ioFuture, long startNs, Timer timer) {
     if (callbackExecutor != null) {
       ioFuture.thenApplyAsync(r -> {
-          timer.update(System.nanoTime() - startNs);
+          updateTimer(timer, System.nanoTime() - startNs);
           return r;
         }, callbackExecutor);
     } else {
       ioFuture.thenApply(r -> {
-          timer.update(System.nanoTime() - startNs);
+          updateTimer(timer, System.nanoTime() - startNs);
           return r;
         });
     }

http://git-wip-us.apache.org/repos/asf/samza/blob/5069f1dd/samza-core/src/main/java/org/apache/samza/table/utils/DefaultTableReadMetrics.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/utils/DefaultTableReadMetrics.java b/samza-core/src/main/java/org/apache/samza/table/utils/DefaultTableReadMetrics.java
deleted file mode 100644
index 525a0cb..0000000
--- a/samza-core/src/main/java/org/apache/samza/table/utils/DefaultTableReadMetrics.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.table.utils;
-
-import org.apache.samza.context.Context;
-import org.apache.samza.metrics.Counter;
-import org.apache.samza.metrics.Timer;
-import org.apache.samza.table.Table;
-
-
-/**
- * Utility class that contains the default set of read metrics.
- */
-public class DefaultTableReadMetrics {
-
-  public final Timer getNs;
-  public final Timer getAllNs;
-  public final Counter numGets;
-  public final Counter numGetAlls;
-  public final Timer getCallbackNs;
-  public final Counter numMissedLookups;
-
-  /**
-   * Constructor based on container and task container context
-   *
-   * @param context {@link Context} for this task
-   * @param table underlying table
-   * @param tableId table Id
-   */
-  public DefaultTableReadMetrics(Context context, Table table, String tableId) {
-    TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(context, table, tableId);
-    getNs = tableMetricsUtil.newTimer("get-ns");
-    getAllNs = tableMetricsUtil.newTimer("getAll-ns");
-    numGets = tableMetricsUtil.newCounter("num-gets");
-    numGetAlls = tableMetricsUtil.newCounter("num-getAlls");
-    getCallbackNs = tableMetricsUtil.newTimer("get-callback-ns");
-    numMissedLookups = tableMetricsUtil.newCounter("num-missed-lookups");
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/5069f1dd/samza-core/src/main/java/org/apache/samza/table/utils/DefaultTableWriteMetrics.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/utils/DefaultTableWriteMetrics.java b/samza-core/src/main/java/org/apache/samza/table/utils/DefaultTableWriteMetrics.java
deleted file mode 100644
index 69d4ef2..0000000
--- a/samza-core/src/main/java/org/apache/samza/table/utils/DefaultTableWriteMetrics.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.table.utils;
-
-import org.apache.samza.context.Context;
-import org.apache.samza.metrics.Counter;
-import org.apache.samza.metrics.Timer;
-import org.apache.samza.table.Table;
-
-
-public class DefaultTableWriteMetrics {
-
-  public final Timer putNs;
-  public final Timer putAllNs;
-  public final Timer deleteNs;
-  public final Timer deleteAllNs;
-  public final Timer flushNs;
-  public final Counter numPuts;
-  public final Counter numPutAlls;
-  public final Counter numDeletes;
-  public final Counter numDeleteAlls;
-  public final Counter numFlushes;
-  public final Timer putCallbackNs;
-  public final Timer deleteCallbackNs;
-
-  /**
-   * Utility class that contains the default set of write metrics.
-   *
-   * @param context {@link Context} for this task
-   * @param table underlying table
-   * @param tableId table Id
-   */
-  public DefaultTableWriteMetrics(Context context, Table table, String tableId) {
-    TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(context, table, tableId);
-    putNs = tableMetricsUtil.newTimer("put-ns");
-    putAllNs = tableMetricsUtil.newTimer("putAll-ns");
-    deleteNs = tableMetricsUtil.newTimer("delete-ns");
-    deleteAllNs = tableMetricsUtil.newTimer("deleteAll-ns");
-    flushNs = tableMetricsUtil.newTimer("flush-ns");
-    numPuts = tableMetricsUtil.newCounter("num-puts");
-    numPutAlls = tableMetricsUtil.newCounter("num-putAlls");
-    numDeletes = tableMetricsUtil.newCounter("num-deletes");
-    numDeleteAlls = tableMetricsUtil.newCounter("num-deleteAlls");
-    numFlushes = tableMetricsUtil.newCounter("num-flushes");
-    putCallbackNs = tableMetricsUtil.newTimer("put-callback-ns");
-    deleteCallbackNs = tableMetricsUtil.newTimer("delete-callback-ns");
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/5069f1dd/samza-core/src/main/java/org/apache/samza/table/utils/TableMetricsUtil.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/utils/TableMetricsUtil.java b/samza-core/src/main/java/org/apache/samza/table/utils/TableMetricsUtil.java
index 1b19272..90d6fe8 100644
--- a/samza-core/src/main/java/org/apache/samza/table/utils/TableMetricsUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/table/utils/TableMetricsUtil.java
@@ -53,7 +53,7 @@ public class TableMetricsUtil {
     Preconditions.checkNotNull(table);
     Preconditions.checkNotNull(tableId);
 
-    this.metricsRegistry = context.getTaskContext().getTaskMetricsRegistry();
+    this.metricsRegistry = context.getContainerContext().getContainerMetricsRegistry();
     this.groupName = table.getClass().getSimpleName();
     this.tableId = tableId;
   }
@@ -87,6 +87,18 @@ public class TableMetricsUtil {
     return metricsRegistry.newGauge(groupName, new SupplierGauge(getMetricFullName(name), supplier));
   }
 
+  public static void incCounter(Counter counter) {
+    if (counter != null) {
+      counter.inc();
+    }
+  }
+
+  public static void updateTimer(Timer timer, long duration) {
+    if (timer != null) {
+      timer.update(duration);
+    }
+  }
+
   private String getMetricFullName(String name) {
     return String.format("%s-%s", tableId, name);
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/5069f1dd/samza-core/src/main/java/org/apache/samza/table/utils/TableReadMetrics.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/utils/TableReadMetrics.java b/samza-core/src/main/java/org/apache/samza/table/utils/TableReadMetrics.java
new file mode 100644
index 0000000..0775844
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/table/utils/TableReadMetrics.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.table.utils;
+
+import org.apache.samza.config.MetricsConfig;
+import org.apache.samza.context.Context;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.Timer;
+import org.apache.samza.table.Table;
+
+
+/**
+ * Utility class that contains the default set of read metrics.
+ */
+public class TableReadMetrics {
+
+  public final Timer getNs;
+  public final Timer getAllNs;
+  public final Counter numGets;
+  public final Counter numGetAlls;
+  public final Timer getCallbackNs;
+  public final Counter numMissedLookups;
+
+  /**
+   * Creates the default set of read metrics; timers are null when timer metrics are disabled
+   *
+   * @param context {@link Context} for this task
+   * @param table underlying table
+   * @param tableId table Id
+   */
+  public TableReadMetrics(Context context, Table table, String tableId) {
+    TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(context, table, tableId);
+    numGets = tableMetricsUtil.newCounter("num-gets");
+    numGetAlls = tableMetricsUtil.newCounter("num-getAlls");
+    numMissedLookups = tableMetricsUtil.newCounter("num-missed-lookups");
+
+    MetricsConfig metricsConfig = new MetricsConfig(context.getJobContext().getConfig());
+    if (metricsConfig.getMetricsTimerEnabled()) {
+      getNs = tableMetricsUtil.newTimer("get-ns");
+      getAllNs = tableMetricsUtil.newTimer("getAll-ns");
+      getCallbackNs = tableMetricsUtil.newTimer("get-callback-ns");
+    } else {
+      getNs = null;
+      getAllNs = null;
+      getCallbackNs = null;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/5069f1dd/samza-core/src/main/java/org/apache/samza/table/utils/TableWriteMetrics.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/utils/TableWriteMetrics.java b/samza-core/src/main/java/org/apache/samza/table/utils/TableWriteMetrics.java
new file mode 100644
index 0000000..02af35f
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/table/utils/TableWriteMetrics.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.table.utils;
+
+import org.apache.samza.config.MetricsConfig;
+import org.apache.samza.context.Context;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.Timer;
+import org.apache.samza.table.Table;
+
+
+public class TableWriteMetrics {
+
+  public final Timer putNs;
+  public final Timer putAllNs;
+  public final Timer deleteNs;
+  public final Timer deleteAllNs;
+  public final Timer flushNs;
+  public final Counter numPuts;
+  public final Counter numPutAlls;
+  public final Counter numDeletes;
+  public final Counter numDeleteAlls;
+  public final Counter numFlushes;
+  public final Timer putCallbackNs;
+  public final Timer deleteCallbackNs;
+
+  /**
+   * Creates the default set of write metrics; timers are null when timer metrics are disabled.
+   *
+   * @param context {@link Context} for this task
+   * @param table underlying table
+   * @param tableId table Id
+   */
+  public TableWriteMetrics(Context context, Table table, String tableId) {
+    TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(context, table, tableId);
+    numPuts = tableMetricsUtil.newCounter("num-puts");
+    numPutAlls = tableMetricsUtil.newCounter("num-putAlls");
+    numDeletes = tableMetricsUtil.newCounter("num-deletes");
+    numDeleteAlls = tableMetricsUtil.newCounter("num-deleteAlls");
+    numFlushes = tableMetricsUtil.newCounter("num-flushes");
+
+    MetricsConfig metricsConfig = new MetricsConfig(context.getJobContext().getConfig());
+    if (metricsConfig.getMetricsTimerEnabled()) {
+      putNs = tableMetricsUtil.newTimer("put-ns");
+      putAllNs = tableMetricsUtil.newTimer("putAll-ns");
+      deleteNs = tableMetricsUtil.newTimer("delete-ns");
+      deleteAllNs = tableMetricsUtil.newTimer("deleteAll-ns");
+      flushNs = tableMetricsUtil.newTimer("flush-ns");
+      putCallbackNs = tableMetricsUtil.newTimer("put-callback-ns");
+      deleteCallbackNs = tableMetricsUtil.newTimer("delete-callback-ns");
+    } else {
+      putNs = null;
+      putAllNs = null;
+      deleteNs = null;
+      deleteAllNs = null;
+      flushNs = null;
+      putCallbackNs = null;
+      deleteCallbackNs = null;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/5069f1dd/samza-core/src/test/java/org/apache/samza/config/TestJavaTableConfig.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/config/TestJavaTableConfig.java b/samza-core/src/test/java/org/apache/samza/config/TestJavaTableConfig.java
index 2775ca7..bd3ef18 100644
--- a/samza-core/src/test/java/org/apache/samza/config/TestJavaTableConfig.java
+++ b/samza-core/src/test/java/org/apache/samza/config/TestJavaTableConfig.java
@@ -23,6 +23,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import org.junit.Assert;
 import org.junit.Test;
 
 import com.google.common.collect.Sets;
@@ -49,10 +50,30 @@ public class TestJavaTableConfig {
   @Test
   public void testGetTableProperties() {
     Map<String, String> map = new HashMap<>();
-    map.put("tables.t1.spec", "t1-spec");
+    map.put("stores.t1.key.serde", "key-serde");
+    map.put("stores.t1.msg.serde", "msg-serde");
     map.put("tables.t1.provider.factory", "t1-provider-factory");
     JavaTableConfig tableConfig = new JavaTableConfig(new MapConfig(map));
     assertEquals("t1-provider-factory", tableConfig.getTableProviderFactory("t1"));
+    assertEquals("key-serde", tableConfig.getKeySerde("t1"));
+    assertEquals("msg-serde", tableConfig.getMsgSerde("t1"));
   }
 
+  @Test
+  public void testBuildKey() {
+    String key = JavaTableConfig.buildKey("t1", "abc");
+    Assert.assertEquals("tables.t1.abc", key);
+  }
+
+  @Test
+  public void testGetForTable() {
+    Map<String, String> map = new HashMap<>();
+    map.put(JavaTableConfig.buildKey("t1", "abc"), "xyz");
+    JavaTableConfig tableConfig = new JavaTableConfig(new MapConfig(map));
+    Assert.assertEquals("xyz", tableConfig.getForTable("t1", "abc"));
+    Assert.assertNull(tableConfig.getForTable("t1", "aaa"));
+    Assert.assertEquals("xyz", tableConfig.getForTable("t1", "aaa", "xyz"));
+    Assert.assertNull(tableConfig.getForTable("tt", "abc"));
+    Assert.assertEquals("xyz", tableConfig.getForTable("tt", "abc", "xyz"));
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/5069f1dd/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java b/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java
index bfb329a..daaba46 100644
--- a/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java
+++ b/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java
@@ -24,6 +24,7 @@ import com.google.common.cache.CacheBuilder;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.samza.config.JavaTableConfig;
 import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.MetricsConfig;
 import org.apache.samza.context.Context;
 import org.apache.samza.context.MockContext;
 import org.apache.samza.metrics.Counter;
@@ -59,14 +60,13 @@ import java.util.concurrent.Executors;
 
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.*;
+
 
 public class TestCachingTable {
+
+  private MetricsRegistry metricsRegistry;
+
   @Test
   public void testSerializeSimple() {
     doTestSerialize(null);
@@ -139,15 +139,23 @@ public class TestCachingTable {
   }
 
   private void initTables(ReadableTable ... tables) {
+    initTables(false, tables);
+  }
+
+  private void initTables(boolean isTimerMetricsDisabled, ReadableTable ... tables) {
+    Map<String, String> config = new HashMap<>();
+    if (isTimerMetricsDisabled) {
+      config.put(MetricsConfig.METRICS_TIMER_ENABLED(), "false");
+    }
     Context context = new MockContext();
-    MetricsRegistry metricsRegistry = mock(MetricsRegistry.class);
+    doReturn(new MapConfig(config)).when(context.getJobContext()).getConfig();
+    metricsRegistry = mock(MetricsRegistry.class);
     doReturn(mock(Timer.class)).when(metricsRegistry).newTimer(anyString(), anyString());
     doReturn(mock(Counter.class)).when(metricsRegistry).newCounter(anyString(), anyString());
     doReturn(mock(Gauge.class)).when(metricsRegistry).newGauge(anyString(), any());
-    when(context.getTaskContext().getTaskMetricsRegistry()).thenReturn(metricsRegistry);
-    for (ReadableTable table : tables) {
-      table.init(context);
-    }
+    doReturn(metricsRegistry).when(context.getContainerContext()).getContainerMetricsRegistry();
+
+    Arrays.asList(tables).forEach(t -> t.init(context));
   }
 
   private void doTestCacheOps(boolean isWriteAround) {
@@ -183,7 +191,7 @@ public class TestCachingTable {
         return null;
       }).when(context.getTaskContext()).getTable(anyString());
 
-    when(context.getTaskContext().getTaskMetricsRegistry()).thenReturn(new NoOpMetricsRegistry());
+    when(context.getContainerContext().getContainerMetricsRegistry()).thenReturn(new NoOpMetricsRegistry());
 
     Map<String, String> tableConfig = desc.toConfig(new MapConfig());
     when(context.getJobContext().getConfig()).thenReturn(new MapConfig(tableConfig));
@@ -284,6 +292,20 @@ public class TestCachingTable {
 
     initTables(cachingTable, guavaTable, remoteTable);
 
+    // 3 per readable table (9)
+    // 5 per read/write table (15)
+    verify(metricsRegistry, times(24)).newCounter(any(), anyString());
+
+    // 3 per readable table (9)
+    // 7 per read/write table (21)
+    // 1 per remote readable table (1)
+    // 1 per remote read/write table (1)
+    verify(metricsRegistry, times(32)).newTimer(any(), anyString());
+
+    // 1 per guava table (1)
+    // 3 per caching table (3)
+    verify(metricsRegistry, times(4)).newGauge(anyString(), any());
+
     // GET
     doReturn(CompletableFuture.completedFuture("bar")).when(readFn).getAsync(any());
     Assert.assertEquals(cachingTable.getAsync("foo").get(), "bar");
@@ -364,6 +386,50 @@ public class TestCachingTable {
     Assert.assertNull(guavaCache.getIfPresent("foo3"));
   }
 
+  @Test
+  public void testTimerDisabled() throws Exception {
+    String tableId = "testTimerDisabled";
+
+    Cache<String, String> guavaCache = CacheBuilder.newBuilder().initialCapacity(100).build();
+    final ReadWriteTable<String, String> guavaTable = new GuavaCacheTable<>(tableId, guavaCache);
+
+    TableRateLimiter<String, String> rateLimitHelper = mock(TableRateLimiter.class);
+
+    TableReadFunction<String, String> readFn = mock(TableReadFunction.class);
+    doReturn(CompletableFuture.completedFuture("")).when(readFn).getAsync(any());
+
+    TableWriteFunction<String, String> writeFn = mock(TableWriteFunction.class);
+    doReturn(CompletableFuture.completedFuture(null)).when(writeFn).putAsync(any(), any());
+    doReturn(CompletableFuture.completedFuture(null)).when(writeFn).deleteAsync(any());
+
+    final RemoteReadWriteTable<String, String> remoteTable = new RemoteReadWriteTable<>(
+        tableId, readFn, writeFn, rateLimitHelper, rateLimitHelper,
+        Executors.newSingleThreadExecutor(), Executors.newSingleThreadExecutor());
+
+    final CachingTable<String, String> cachingTable = new CachingTable<>(
+        tableId, remoteTable, guavaTable, false);
+
+    initTables(true, cachingTable, guavaTable, remoteTable);
+
+    cachingTable.get("");
+    cachingTable.getAsync("").get();
+    cachingTable.getAll(Collections.emptyList());
+    cachingTable.getAllAsync(Collections.emptyList());
+    cachingTable.flush();
+    cachingTable.put("", "");
+    cachingTable.putAsync("", "");
+    cachingTable.putAll(Collections.emptyList());
+    cachingTable.putAllAsync(Collections.emptyList());
+    cachingTable.delete("");
+    cachingTable.deleteAsync("");
+    cachingTable.deleteAll(Collections.emptyList());
+    cachingTable.deleteAllAsync(Collections.emptyList());
+
+    verify(metricsRegistry, atLeast(1)).newCounter(any(), anyString());
+    verify(metricsRegistry, atLeast(1)).newGauge(anyString(), any());
+    verify(metricsRegistry, times(0)).newTimer(any(), anyString());
+  }
+
   private TableDescriptor createDummyTableDescriptor(String tableId) {
     BaseTableDescriptor tableDescriptor = mock(BaseTableDescriptor.class);
     when(tableDescriptor.getTableId()).thenReturn(tableId);

http://git-wip-us.apache.org/repos/asf/samza/blob/5069f1dd/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteReadWriteTable.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteReadWriteTable.java b/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteReadWriteTable.java
index d1369d0..d7733a8 100644
--- a/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteReadWriteTable.java
+++ b/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteReadWriteTable.java
@@ -62,7 +62,7 @@ public class TestRemoteReadWriteTable {
     doAnswer(args -> new Timer((String) args.getArguments()[0])).when(metricsRegistry).newTimer(anyString(), anyString());
     doAnswer(args -> new Counter((String) args.getArguments()[0])).when(metricsRegistry).newCounter(anyString(), anyString());
     doAnswer(args -> new Gauge((String) args.getArguments()[0], 0)).when(metricsRegistry).newGauge(anyString(), any());
-    doReturn(metricsRegistry).when(context.getTaskContext()).getTaskMetricsRegistry();
+    doReturn(metricsRegistry).when(context.getContainerContext()).getContainerMetricsRegistry();
     return context;
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/5069f1dd/samza-core/src/test/java/org/apache/samza/table/remote/descriptors/TestRemoteTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/table/remote/descriptors/TestRemoteTableDescriptor.java b/samza-core/src/test/java/org/apache/samza/table/remote/descriptors/TestRemoteTableDescriptor.java
index 69e95d4..7a75c90 100644
--- a/samza-core/src/test/java/org/apache/samza/table/remote/descriptors/TestRemoteTableDescriptor.java
+++ b/samza-core/src/test/java/org/apache/samza/table/remote/descriptors/TestRemoteTableDescriptor.java
@@ -132,13 +132,16 @@ public class TestRemoteTableDescriptor {
   private Context createMockContext(TableDescriptor tableDescriptor) {
     Context context = mock(Context.class);
 
-    TaskContextImpl taskContext = mock(TaskContextImpl.class);
-    when(context.getTaskContext()).thenReturn(taskContext);
+    ContainerContext containerContext = mock(ContainerContext.class);
+    when(context.getContainerContext()).thenReturn(containerContext);
 
     MetricsRegistry metricsRegistry = mock(MetricsRegistry.class);
     when(metricsRegistry.newTimer(anyString(), anyString())).thenReturn(mock(Timer.class));
     when(metricsRegistry.newCounter(anyString(), anyString())).thenReturn(mock(Counter.class));
-    when(taskContext.getTaskMetricsRegistry()).thenReturn(metricsRegistry);
+    when(containerContext.getContainerMetricsRegistry()).thenReturn(metricsRegistry);
+
+    TaskContextImpl taskContext = mock(TaskContextImpl.class);
+    when(context.getTaskContext()).thenReturn(taskContext);
 
     TaskName taskName = new TaskName("MyTask");
     TaskModel taskModel = mock(TaskModel.class);
@@ -147,10 +150,7 @@ public class TestRemoteTableDescriptor {
 
     ContainerModel containerModel = mock(ContainerModel.class);
     when(containerModel.getTasks()).thenReturn(ImmutableMap.of(taskName, taskModel));
-
-    ContainerContext containerContext = mock(ContainerContext.class);
     when(containerContext.getContainerModel()).thenReturn(containerModel);
-    when(context.getContainerContext()).thenReturn(containerContext);
 
     String containerId = "container-1";
     JobModel jobModel = mock(JobModel.class);

http://git-wip-us.apache.org/repos/asf/samza/blob/5069f1dd/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadWriteTable.java
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadWriteTable.java b/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadWriteTable.java
index e0107f9..98d3768 100644
--- a/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadWriteTable.java
+++ b/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadWriteTable.java
@@ -20,9 +20,12 @@ package org.apache.samza.storage.kv;
 
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
-import org.apache.samza.context.Context;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.Timer;
 import org.apache.samza.table.ReadWriteTable;
-import org.apache.samza.table.utils.DefaultTableWriteMetrics;
+
+import static org.apache.samza.table.utils.TableMetricsUtil.incCounter;
+import static org.apache.samza.table.utils.TableMetricsUtil.updateTimer;
 
 
 /**
@@ -34,8 +37,6 @@ import org.apache.samza.table.utils.DefaultTableWriteMetrics;
 public class LocalReadWriteTable<K, V> extends LocalReadableTable<K, V>
     implements ReadWriteTable<K, V> {
 
-  protected DefaultTableWriteMetrics writeMetrics;
-
   /**
    * Constructs an instance of {@link LocalReadWriteTable}
    * @param tableId the table Id
@@ -46,18 +47,9 @@ public class LocalReadWriteTable<K, V> extends LocalReadableTable<K, V>
   }
 
   @Override
-  public void init(Context context) {
-    super.init(context);
-    writeMetrics = new DefaultTableWriteMetrics(context, this, tableId);
-  }
-
-  @Override
   public void put(K key, V value) {
     if (value != null) {
-      writeMetrics.numPuts.inc();
-      long startNs = System.nanoTime();
-      kvStore.put(key, value);
-      writeMetrics.putNs.update(System.nanoTime() - startNs);
+      instrument(writeMetrics.numPuts, writeMetrics.putNs, () -> kvStore.put(key, value));
     } else {
       delete(key);
     }
@@ -77,10 +69,7 @@ public class LocalReadWriteTable<K, V> extends LocalReadableTable<K, V>
 
   @Override
   public void putAll(List<Entry<K, V>> entries) {
-    writeMetrics.numPutAlls.inc();
-    long startNs = System.nanoTime();
-    kvStore.putAll(entries);
-    writeMetrics.putAllNs.update(System.nanoTime() - startNs);
+    instrument(writeMetrics.numPutAlls, writeMetrics.putAllNs, () -> kvStore.putAll(entries));
   }
 
   @Override
@@ -97,10 +86,7 @@ public class LocalReadWriteTable<K, V> extends LocalReadableTable<K, V>
 
   @Override
   public void delete(K key) {
-    writeMetrics.numDeletes.inc();
-    long startNs = System.nanoTime();
-    kvStore.delete(key);
-    writeMetrics.deleteNs.update(System.nanoTime() - startNs);
+    instrument(writeMetrics.numDeletes, writeMetrics.deleteNs, () -> kvStore.delete(key));
   }
 
   @Override
@@ -117,10 +103,7 @@ public class LocalReadWriteTable<K, V> extends LocalReadableTable<K, V>
 
   @Override
   public void deleteAll(List<K> keys) {
-    writeMetrics.numDeleteAlls.inc();
-    long startNs = System.nanoTime();
-    kvStore.deleteAll(keys);
-    writeMetrics.deleteAllNs.update(System.nanoTime() - startNs);
+    instrument(writeMetrics.numDeleteAlls, writeMetrics.deleteAllNs, () -> kvStore.deleteAll(keys));
   }
 
   @Override
@@ -137,10 +120,18 @@ public class LocalReadWriteTable<K, V> extends LocalReadableTable<K, V>
 
   @Override
   public void flush() {
-    writeMetrics.numFlushes.inc();
+    instrument(writeMetrics.numFlushes, writeMetrics.flushNs, () -> kvStore.flush());
+  }
+
+  private interface Func0 { // no-argument, no-result callback handed to instrument() below
+    void apply(); // runs the wrapped store operation
+  }
+
+  private void instrument(Counter counter, Timer timer, Func0 func) {
+    incCounter(counter);
     long startNs = System.nanoTime();
-    kvStore.flush();
-    writeMetrics.flushNs.update(System.nanoTime() - startNs);
+    func.apply();
+    updateTimer(timer, System.nanoTime() - startNs);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/5069f1dd/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadableTable.java
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadableTable.java b/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadableTable.java
index f314918..ba0d3cf 100644
--- a/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadableTable.java
+++ b/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadableTable.java
@@ -19,14 +19,19 @@
 package org.apache.samza.storage.kv;
 
 import com.google.common.base.Preconditions;
+
+import com.google.common.base.Supplier;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
-import org.apache.samza.context.Context;
-import org.apache.samza.table.ReadableTable;
-import org.apache.samza.table.utils.DefaultTableReadMetrics;
 
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.Timer;
+import org.apache.samza.table.BaseReadableTable;
+
+import static org.apache.samza.table.utils.TableMetricsUtil.incCounter;
+import static org.apache.samza.table.utils.TableMetricsUtil.updateTimer;
 
 /**
  * A store backed readable table
@@ -34,12 +39,9 @@ import org.apache.samza.table.utils.DefaultTableReadMetrics;
  * @param <K> the type of the key in this table
  * @param <V> the type of the value in this table
  */
-public class LocalReadableTable<K, V> implements ReadableTable<K, V> {
+public class LocalReadableTable<K, V> extends BaseReadableTable<K, V> {
 
   protected final KeyValueStore<K, V> kvStore;
-  protected final String tableId;
-
-  protected DefaultTableReadMetrics readMetrics;
 
   /**
    * Constructs an instance of {@link LocalReadableTable}
@@ -47,25 +49,16 @@ public class LocalReadableTable<K, V> implements ReadableTable<K, V> {
    * @param kvStore the backing store
    */
   public LocalReadableTable(String tableId, KeyValueStore<K, V> kvStore) {
-    Preconditions.checkArgument(tableId != null & !tableId.isEmpty() , "invalid tableId");
+    super(tableId);
     Preconditions.checkNotNull(kvStore, "null KeyValueStore");
-    this.tableId = tableId;
     this.kvStore = kvStore;
   }
 
   @Override
-  public void init(Context context) {
-    readMetrics = new DefaultTableReadMetrics(context, this, tableId);
-  }
-
-  @Override
   public V get(K key) {
-    readMetrics.numGets.inc();
-    long startNs = System.nanoTime();
-    V result = kvStore.get(key);
-    readMetrics.getNs.update(System.nanoTime() - startNs);
+    V result = instrument(readMetrics.numGets, readMetrics.getNs, () -> kvStore.get(key));
     if (result == null) {
-      readMetrics.numMissedLookups.inc();
+      incCounter(readMetrics.numMissedLookups);
     }
     return result;
   }
@@ -83,11 +76,8 @@ public class LocalReadableTable<K, V> implements ReadableTable<K, V> {
 
   @Override
   public Map<K, V> getAll(List<K> keys) {
-    readMetrics.numGetAlls.inc();
-    long startNs = System.nanoTime();
-    Map<K, V> result = kvStore.getAll(keys);
-    readMetrics.getAllNs.update(System.nanoTime() - startNs);
-    result.values().stream().filter(Objects::isNull).map(v -> readMetrics.numMissedLookups.inc());
+    Map<K, V> result = instrument(readMetrics.numGetAlls, readMetrics.getAllNs, () -> kvStore.getAll(keys));
+    result.values().stream().filter(Objects::isNull).forEach(v -> incCounter(readMetrics.numMissedLookups));
     return result;
   }
 
@@ -107,4 +97,12 @@ public class LocalReadableTable<K, V> implements ReadableTable<K, V> {
     // The KV store is not closed here as it may still be needed by downstream operators,
     // it will be closed by the SamzaContainer
   }
+
+  private <T> T instrument(Counter counter, Timer timer, Supplier<T> func) { // count and time one store read
+    incCounter(counter); // counted before the call, so calls that throw are counted too
+    long startNs = System.nanoTime();
+    T result = func.get(); // Supplier is the Guava type imported at the top of this file
+    updateTimer(timer, System.nanoTime() - startNs); // not reached if func.get() throws
+    return result; // handed back unchanged; callers do their own null/miss accounting
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/5069f1dd/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadWriteTable.java
----------------------------------------------------------------------
diff --git a/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadWriteTable.java b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadWriteTable.java
new file mode 100644
index 0000000..5531951
--- /dev/null
+++ b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadWriteTable.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.storage.kv;
+
+import java.util.Collections;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.MetricsConfig;
+import org.apache.samza.context.ContainerContext;
+import org.apache.samza.context.Context;
+import org.apache.samza.context.JobContext;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.metrics.Timer;
+import org.apache.samza.table.ReadWriteTable;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.mockito.Mockito.*;
+
+
+public class TestLocalReadWriteTable {
+
+  public static final String TABLE_ID = "t1";
+
+  private Timer putNs;
+  private Timer putAllNs;
+  private Timer deleteNs;
+  private Timer deleteAllNs;
+  private Timer flushNs;
+  private Counter numPuts;
+  private Counter numPutAlls;
+  private Counter numDeletes;
+  private Counter numDeleteAlls;
+  private Counter numFlushes;
+  private Timer putCallbackNs;
+  private Timer deleteCallbackNs;
+
+  private MetricsRegistry metricsRegistry;
+
+  private KeyValueStore kvStore;
+
+  @Before
+  public void setUp() {
+    // Fresh metric instances for every test, so each test reads its own counts and timer snapshots.
+    putNs = new Timer("");
+    putAllNs = new Timer("");
+    deleteNs = new Timer("");
+    deleteAllNs = new Timer("");
+    flushNs = new Timer("");
+    numPuts = new Counter("");
+    numPutAlls = new Counter("");
+    numDeletes = new Counter("");
+    numDeleteAlls = new Counter("");
+    numFlushes = new Counter("");
+    putCallbackNs = new Timer("");
+    deleteCallbackNs = new Timer("");
+    // Stub the registry to return those instances; names are TABLE_ID plus a per-operation suffix.
+    metricsRegistry = mock(MetricsRegistry.class);
+    String groupName = LocalReadWriteTable.class.getSimpleName();
+    when(metricsRegistry.newTimer(groupName, TABLE_ID + "-put-ns")).thenReturn(putNs);
+    when(metricsRegistry.newTimer(groupName, TABLE_ID + "-putAll-ns")).thenReturn(putAllNs);
+    when(metricsRegistry.newTimer(groupName, TABLE_ID + "-delete-ns")).thenReturn(deleteNs);
+    when(metricsRegistry.newTimer(groupName, TABLE_ID + "-deleteAll-ns")).thenReturn(deleteAllNs);
+    when(metricsRegistry.newCounter(groupName, TABLE_ID + "-num-puts")).thenReturn(numPuts);
+    when(metricsRegistry.newCounter(groupName, TABLE_ID + "-num-putAlls")).thenReturn(numPutAlls);
+    when(metricsRegistry.newCounter(groupName, TABLE_ID + "-num-deletes")).thenReturn(numDeletes);
+    when(metricsRegistry.newCounter(groupName, TABLE_ID + "-num-deleteAlls")).thenReturn(numDeleteAlls);
+    when(metricsRegistry.newCounter(groupName, TABLE_ID + "-num-flushes")).thenReturn(numFlushes);
+    when(metricsRegistry.newTimer(groupName, TABLE_ID + "-put-callback-ns")).thenReturn(putCallbackNs);
+    when(metricsRegistry.newTimer(groupName, TABLE_ID + "-delete-callback-ns")).thenReturn(deleteCallbackNs);
+    when(metricsRegistry.newTimer(groupName, TABLE_ID + "-flush-ns")).thenReturn(flushNs);
+    // The backing store is a plain mock; the tests only verify which of its methods were invoked.
+    kvStore = mock(KeyValueStore.class);
+  }
+
+  @Test
+  public void testPut() throws Exception { // put() and putAsync() must each reach the store and be counted
+    ReadWriteTable table = createTable(false); // false: timers are not disabled
+    table.put("k1", "v1");
+    table.putAsync("k2", "v2").get();
+    verify(kvStore, times(2)).put(any(), any());
+    Assert.assertEquals(2, numPuts.getCount());
+    Assert.assertTrue(putNs.getSnapshot().getAverage() > 0); // NOTE(review): assumes a non-zero measured duration
+    Assert.assertEquals(0, putAllNs.getSnapshot().getAverage(), 0.001); // all remaining metrics must stay untouched
+    Assert.assertEquals(0, deleteNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, deleteAllNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, flushNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, numPutAlls.getCount());
+    Assert.assertEquals(0, numDeletes.getCount());
+    Assert.assertEquals(0, numDeleteAlls.getCount());
+    Assert.assertEquals(0, numFlushes.getCount());
+    Assert.assertEquals(0, putCallbackNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, deleteCallbackNs.getSnapshot().getAverage(), 0.001);
+  }
+
+  @Test
+  public void testPutAll() throws Exception { // putAll() and putAllAsync() must each reach the store and be counted
+    ReadWriteTable table = createTable(false); // false: timers are not disabled
+    table.putAll(Collections.emptyList());
+    table.putAllAsync(Collections.emptyList()).get();
+    verify(kvStore, times(2)).putAll(any());
+    Assert.assertEquals(2, numPutAlls.getCount());
+    Assert.assertTrue(putAllNs.getSnapshot().getAverage() > 0);
+    Assert.assertEquals(0, putNs.getSnapshot().getAverage(), 0.001); // all remaining metrics must stay untouched
+    Assert.assertEquals(0, deleteNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, deleteAllNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, flushNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, numPuts.getCount());
+    Assert.assertEquals(0, numDeletes.getCount());
+    Assert.assertEquals(0, numDeleteAlls.getCount());
+    Assert.assertEquals(0, numFlushes.getCount());
+    Assert.assertEquals(0, putCallbackNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, deleteCallbackNs.getSnapshot().getAverage(), 0.001);
+  }
+
+  @Test
+  public void testDelete() throws Exception { // delete() and deleteAsync() must each reach the store and be counted
+    ReadWriteTable table = createTable(false); // false: timers are not disabled
+    table.delete("");
+    table.deleteAsync("").get();
+    verify(kvStore, times(2)).delete(any());
+    Assert.assertEquals(2, numDeletes.getCount());
+    Assert.assertTrue(deleteNs.getSnapshot().getAverage() > 0);
+    Assert.assertEquals(0, putNs.getSnapshot().getAverage(), 0.001); // all remaining metrics must stay untouched
+    Assert.assertEquals(0, putAllNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, deleteAllNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, flushNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, numPuts.getCount());
+    Assert.assertEquals(0, numPutAlls.getCount());
+    Assert.assertEquals(0, numDeleteAlls.getCount());
+    Assert.assertEquals(0, numFlushes.getCount());
+    Assert.assertEquals(0, putCallbackNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, deleteCallbackNs.getSnapshot().getAverage(), 0.001);
+  }
+
+  @Test
+  public void testDeleteAll() throws Exception { // deleteAll() and deleteAllAsync() must each reach the store and be counted
+    ReadWriteTable table = createTable(false); // false: timers are not disabled
+    table.deleteAll(Collections.emptyList());
+    table.deleteAllAsync(Collections.emptyList()).get();
+    verify(kvStore, times(2)).deleteAll(any());
+    Assert.assertEquals(2, numDeleteAlls.getCount());
+    Assert.assertTrue(deleteAllNs.getSnapshot().getAverage() > 0);
+    Assert.assertEquals(0, putNs.getSnapshot().getAverage(), 0.001); // all remaining metrics must stay untouched
+    Assert.assertEquals(0, putAllNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, deleteNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, flushNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, numPuts.getCount());
+    Assert.assertEquals(0, numPutAlls.getCount());
+    Assert.assertEquals(0, numDeletes.getCount());
+    Assert.assertEquals(0, numFlushes.getCount());
+    Assert.assertEquals(0, putCallbackNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, deleteCallbackNs.getSnapshot().getAverage(), 0.001);
+  }
+
+  @Test
+  public void testFlush() { // two flush() calls must both reach the store and be counted
+    ReadWriteTable table = createTable(false); // false: timers are not disabled
+    table.flush();
+    table.flush();
+    verify(kvStore, times(2)).flush();
+    Assert.assertEquals(2, numFlushes.getCount());
+    Assert.assertTrue(flushNs.getSnapshot().getAverage() > 0);
+    Assert.assertEquals(0, putNs.getSnapshot().getAverage(), 0.001); // all remaining metrics must stay untouched
+    Assert.assertEquals(0, putAllNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, deleteNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, deleteAllNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, numPuts.getCount());
+    Assert.assertEquals(0, numPutAlls.getCount());
+    Assert.assertEquals(0, numDeletes.getCount());
+    Assert.assertEquals(0, numDeleteAlls.getCount());
+    Assert.assertEquals(0, putCallbackNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, deleteCallbackNs.getSnapshot().getAverage(), 0.001);
+  }
+
+  @Test
+  public void testTimerDisabled() throws Exception { // with timers disabled, counters still count but no Timer is requested
+    ReadWriteTable table = createTable(true); // true: config sets the timer-enabled flag to "false"
+    table.put("", "");
+    table.putAsync("", "").get();
+    table.putAll(Collections.emptyList());
+    table.putAllAsync(Collections.emptyList()).get();
+    table.delete("");
+    table.deleteAsync("").get();
+    table.deleteAll(Collections.emptyList());
+    table.deleteAllAsync(Collections.emptyList()).get();
+    table.flush();
+    verify(metricsRegistry, atLeast(1)).newCounter(anyString(), anyString());
+    verify(metricsRegistry, times(0)).newTimer(anyString(), anyString()); // no timer may be registered at all
+    verify(metricsRegistry, times(0)).newGauge(anyString(), any());
+    Assert.assertEquals(1, numFlushes.getCount()); // one flush; every other operation ran once sync and once async
+    Assert.assertEquals(2, numPuts.getCount());
+    Assert.assertEquals(2, numPutAlls.getCount());
+    Assert.assertEquals(2, numDeletes.getCount());
+    Assert.assertEquals(2, numDeleteAlls.getCount());
+    Assert.assertEquals(0, flushNs.getSnapshot().getAverage(), 0.001); // the timers created in setUp() were never updated
+    Assert.assertEquals(0, putNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, putAllNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, deleteNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, deleteAllNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, putCallbackNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, deleteCallbackNs.getSnapshot().getAverage(), 0.001);
+  }
+
+  /** Builds and init()s a table over mocked contexts; {@code isTimerDisabled} sets the timer-enabled config to "false". */
+  private LocalReadWriteTable createTable(boolean isTimerDisabled) {
+    Map<String, String> config = new HashMap<>();
+    if (isTimerDisabled) {
+      config.put(MetricsConfig.METRICS_TIMER_ENABLED(), "false");
+    }
+    Context context = mock(Context.class);
+    JobContext jobContext = mock(JobContext.class);
+    when(context.getJobContext()).thenReturn(jobContext);
+    when(jobContext.getConfig()).thenReturn(new MapConfig(config));
+    ContainerContext containerContext = mock(ContainerContext.class);
+    when(context.getContainerContext()).thenReturn(containerContext);
+    when(containerContext.getContainerMetricsRegistry()).thenReturn(metricsRegistry);
+    // Use TABLE_ID rather than a repeated "t1" literal so the id cannot drift from the names stubbed in setUp().
+    LocalReadWriteTable table = new LocalReadWriteTable(TABLE_ID, kvStore);
+    table.init(context);
+    return table;
+  }
+}