Posted to commits@samza.apache.org by we...@apache.org on 2018/12/17 23:11:35 UTC

[2/4] samza git commit: SAMZA-2043: Consolidate ReadableTable and ReadWriteTable
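[Editor's note, not part of the original commit message: this patch folds the read-only table API into the read-write one, so callers that previously cast to ReadableTable now use ReadWriteTable directly, and the LocalReadableTable/LocalReadWriteTable test targets are consolidated into a single LocalTable. A minimal before/after sketch, distilled from the hunks below (types and table ids as they appear in this diff):

    // Before this patch (see the deleted TestLocalTable.MyMapFunction below):
    ReadableTable table = (ReadableTable) context.getTaskContext().getTable("t1");

    // After this patch (see the StreamTaskIntegrationTest hunk and the new TestLocalTableRead):
    ReadWriteTable<Integer, Profile> table = context.getTaskContext().getTable("t1");
    Profile p = table.get(memberId);   // reads...
    table.put(memberId, p);            // ...and writes go through the same handle
]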

http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadWriteTable.java
----------------------------------------------------------------------
diff --git a/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadWriteTable.java b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadWriteTable.java
deleted file mode 100644
index 044fab4..0000000
--- a/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadWriteTable.java
+++ /dev/null
@@ -1,247 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.storage.kv;
-
-import java.util.Arrays;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.apache.samza.config.MapConfig;
-import org.apache.samza.config.MetricsConfig;
-import org.apache.samza.context.ContainerContext;
-import org.apache.samza.context.Context;
-import org.apache.samza.context.JobContext;
-import org.apache.samza.metrics.Counter;
-import org.apache.samza.metrics.MetricsRegistry;
-import org.apache.samza.metrics.Timer;
-import org.apache.samza.table.ReadWriteTable;
-
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.mockito.Mockito.*;
-
-
-public class TestLocalReadWriteTable {
-
-  public static final String TABLE_ID = "t1";
-
-  private Timer putNs;
-  private Timer putAllNs;
-  private Timer deleteNs;
-  private Timer deleteAllNs;
-  private Timer flushNs;
-  private Counter numPuts;
-  private Counter numPutAlls;
-  private Counter numDeletes;
-  private Counter numDeleteAlls;
-  private Counter numFlushes;
-  private Timer putCallbackNs;
-  private Timer deleteCallbackNs;
-
-  private MetricsRegistry metricsRegistry;
-
-  private KeyValueStore kvStore;
-
-  @Before
-  public void setUp() {
-
-    putNs = new Timer("");
-    putAllNs = new Timer("");
-    deleteNs = new Timer("");
-    deleteAllNs = new Timer("");
-    flushNs = new Timer("");
-    numPuts = new Counter("");
-    numPutAlls = new Counter("");
-    numDeletes = new Counter("");
-    numDeleteAlls = new Counter("");
-    numFlushes = new Counter("");
-    putCallbackNs = new Timer("");
-    deleteCallbackNs = new Timer("");
-
-    metricsRegistry = mock(MetricsRegistry.class);
-    String groupName = LocalReadWriteTable.class.getSimpleName();
-    when(metricsRegistry.newTimer(groupName, TABLE_ID + "-put-ns")).thenReturn(putNs);
-    when(metricsRegistry.newTimer(groupName, TABLE_ID + "-putAll-ns")).thenReturn(putAllNs);
-    when(metricsRegistry.newTimer(groupName, TABLE_ID + "-delete-ns")).thenReturn(deleteNs);
-    when(metricsRegistry.newTimer(groupName, TABLE_ID + "-deleteAll-ns")).thenReturn(deleteAllNs);
-    when(metricsRegistry.newCounter(groupName, TABLE_ID + "-num-puts")).thenReturn(numPuts);
-    when(metricsRegistry.newCounter(groupName, TABLE_ID + "-num-putAlls")).thenReturn(numPutAlls);
-    when(metricsRegistry.newCounter(groupName, TABLE_ID + "-num-deletes")).thenReturn(numDeletes);
-    when(metricsRegistry.newCounter(groupName, TABLE_ID + "-num-deleteAlls")).thenReturn(numDeleteAlls);
-    when(metricsRegistry.newCounter(groupName, TABLE_ID + "-num-flushes")).thenReturn(numFlushes);
-    when(metricsRegistry.newTimer(groupName, TABLE_ID + "-put-callback-ns")).thenReturn(putCallbackNs);
-    when(metricsRegistry.newTimer(groupName, TABLE_ID + "-delete-callback-ns")).thenReturn(deleteCallbackNs);
-    when(metricsRegistry.newTimer(groupName, TABLE_ID + "-flush-ns")).thenReturn(flushNs);
-
-    kvStore = mock(KeyValueStore.class);
-  }
-
-  @Test
-  public void testPut() throws Exception {
-    ReadWriteTable table = createTable(false);
-    table.put("k1", "v1");
-    table.putAsync("k2", "v2").get();
-    table.putAsync("k3", null).get();
-    verify(kvStore, times(2)).put(any(), any());
-    verify(kvStore, times(1)).delete(any());
-    Assert.assertEquals(2, numPuts.getCount());
-    Assert.assertEquals(1, numDeletes.getCount());
-    Assert.assertTrue(putNs.getSnapshot().getAverage() > 0);
-    Assert.assertTrue(deleteNs.getSnapshot().getAverage() > 0);
-    Assert.assertEquals(0, putAllNs.getSnapshot().getAverage(), 0.001);
-    Assert.assertEquals(0, deleteAllNs.getSnapshot().getAverage(), 0.001);
-    Assert.assertEquals(0, flushNs.getSnapshot().getAverage(), 0.001);
-    Assert.assertEquals(0, numPutAlls.getCount());
-    Assert.assertEquals(0, numDeleteAlls.getCount());
-    Assert.assertEquals(0, numFlushes.getCount());
-    Assert.assertEquals(0, putCallbackNs.getSnapshot().getAverage(), 0.001);
-    Assert.assertEquals(0, deleteCallbackNs.getSnapshot().getAverage(), 0.001);
-  }
-
-  @Test
-  public void testPutAll() throws Exception {
-    ReadWriteTable table = createTable(false);
-    List<Entry> entries = Arrays.asList(new Entry("k1", "v1"), new Entry("k2", null));
-    table.putAll(entries);
-    table.putAllAsync(entries).get();
-    verify(kvStore, times(2)).putAll(any());
-    verify(kvStore, times(2)).deleteAll(any());
-    Assert.assertEquals(2, numPutAlls.getCount());
-    Assert.assertEquals(2, numDeleteAlls.getCount());
-    Assert.assertTrue(putAllNs.getSnapshot().getAverage() > 0);
-    Assert.assertTrue(deleteAllNs.getSnapshot().getAverage() > 0);
-    Assert.assertEquals(0, putNs.getSnapshot().getAverage(), 0.001);
-    Assert.assertEquals(0, deleteNs.getSnapshot().getAverage(), 0.001);
-    Assert.assertEquals(0, flushNs.getSnapshot().getAverage(), 0.001);
-    Assert.assertEquals(0, numPuts.getCount());
-    Assert.assertEquals(0, numDeletes.getCount());
-    Assert.assertEquals(0, numFlushes.getCount());
-    Assert.assertEquals(0, putCallbackNs.getSnapshot().getAverage(), 0.001);
-    Assert.assertEquals(0, deleteCallbackNs.getSnapshot().getAverage(), 0.001);
-  }
-
-  @Test
-  public void testDelete() throws Exception {
-    ReadWriteTable table = createTable(false);
-    table.delete("");
-    table.deleteAsync("").get();
-    verify(kvStore, times(2)).delete(any());
-    Assert.assertEquals(2, numDeletes.getCount());
-    Assert.assertTrue(deleteNs.getSnapshot().getAverage() > 0);
-    Assert.assertEquals(0, putNs.getSnapshot().getAverage(), 0.001);
-    Assert.assertEquals(0, putAllNs.getSnapshot().getAverage(), 0.001);
-    Assert.assertEquals(0, deleteAllNs.getSnapshot().getAverage(), 0.001);
-    Assert.assertEquals(0, flushNs.getSnapshot().getAverage(), 0.001);
-    Assert.assertEquals(0, numPuts.getCount());
-    Assert.assertEquals(0, numPutAlls.getCount());
-    Assert.assertEquals(0, numDeleteAlls.getCount());
-    Assert.assertEquals(0, numFlushes.getCount());
-    Assert.assertEquals(0, putCallbackNs.getSnapshot().getAverage(), 0.001);
-    Assert.assertEquals(0, deleteCallbackNs.getSnapshot().getAverage(), 0.001);
-  }
-
-  @Test
-  public void testDeleteAll() throws Exception {
-    ReadWriteTable table = createTable(false);
-    table.deleteAll(Collections.emptyList());
-    table.deleteAllAsync(Collections.emptyList()).get();
-    verify(kvStore, times(2)).deleteAll(any());
-    Assert.assertEquals(2, numDeleteAlls.getCount());
-    Assert.assertTrue(deleteAllNs.getSnapshot().getAverage() > 0);
-    Assert.assertEquals(0, putNs.getSnapshot().getAverage(), 0.001);
-    Assert.assertEquals(0, putAllNs.getSnapshot().getAverage(), 0.001);
-    Assert.assertEquals(0, deleteNs.getSnapshot().getAverage(), 0.001);
-    Assert.assertEquals(0, flushNs.getSnapshot().getAverage(), 0.001);
-    Assert.assertEquals(0, numPuts.getCount());
-    Assert.assertEquals(0, numPutAlls.getCount());
-    Assert.assertEquals(0, numDeletes.getCount());
-    Assert.assertEquals(0, numFlushes.getCount());
-    Assert.assertEquals(0, putCallbackNs.getSnapshot().getAverage(), 0.001);
-    Assert.assertEquals(0, deleteCallbackNs.getSnapshot().getAverage(), 0.001);
-  }
-
-  @Test
-  public void testFlush() {
-    ReadWriteTable table = createTable(false);
-    table.flush();
-    table.flush();
-    verify(kvStore, times(2)).flush();
-    Assert.assertEquals(2, numFlushes.getCount());
-    Assert.assertTrue(flushNs.getSnapshot().getAverage() > 0);
-    Assert.assertEquals(0, putNs.getSnapshot().getAverage(), 0.001);
-    Assert.assertEquals(0, putAllNs.getSnapshot().getAverage(), 0.001);
-    Assert.assertEquals(0, deleteNs.getSnapshot().getAverage(), 0.001);
-    Assert.assertEquals(0, deleteAllNs.getSnapshot().getAverage(), 0.001);
-    Assert.assertEquals(0, numPuts.getCount());
-    Assert.assertEquals(0, numPutAlls.getCount());
-    Assert.assertEquals(0, numDeletes.getCount());
-    Assert.assertEquals(0, numDeleteAlls.getCount());
-    Assert.assertEquals(0, putCallbackNs.getSnapshot().getAverage(), 0.001);
-    Assert.assertEquals(0, deleteCallbackNs.getSnapshot().getAverage(), 0.001);
-  }
-
-  @Test
-  public void testTimerDisabled() throws Exception {
-    ReadWriteTable table = createTable(true);
-    table.put("", "");
-    table.putAsync("", "").get();
-    table.putAll(Collections.emptyList());
-    table.putAllAsync(Collections.emptyList()).get();
-    table.delete("");
-    table.deleteAsync("").get();
-    table.deleteAll(Collections.emptyList());
-    table.deleteAllAsync(Collections.emptyList()).get();
-    table.flush();
-    Assert.assertEquals(1, numFlushes.getCount());
-    Assert.assertEquals(2, numPuts.getCount());
-    Assert.assertEquals(0, numPutAlls.getCount());
-    Assert.assertEquals(2, numDeletes.getCount());
-    Assert.assertEquals(2, numDeleteAlls.getCount());
-    Assert.assertEquals(0, flushNs.getSnapshot().getAverage(), 0.001);
-    Assert.assertEquals(0, putNs.getSnapshot().getAverage(), 0.001);
-    Assert.assertEquals(0, putAllNs.getSnapshot().getAverage(), 0.001);
-    Assert.assertEquals(0, deleteNs.getSnapshot().getAverage(), 0.001);
-    Assert.assertEquals(0, deleteAllNs.getSnapshot().getAverage(), 0.001);
-    Assert.assertEquals(0, putCallbackNs.getSnapshot().getAverage(), 0.001);
-    Assert.assertEquals(0, deleteCallbackNs.getSnapshot().getAverage(), 0.001);
-  }
-
-  private LocalReadWriteTable createTable(boolean isTimerDisabled) {
-    Map<String, String> config = new HashMap<>();
-    if (isTimerDisabled) {
-      config.put(MetricsConfig.METRICS_TIMER_ENABLED(), "false");
-    }
-    Context context = mock(Context.class);
-    JobContext jobContext = mock(JobContext.class);
-    when(context.getJobContext()).thenReturn(jobContext);
-    when(jobContext.getConfig()).thenReturn(new MapConfig(config));
-    ContainerContext containerContext = mock(ContainerContext.class);
-    when(context.getContainerContext()).thenReturn(containerContext);
-    when(containerContext.getContainerMetricsRegistry()).thenReturn(metricsRegistry);
-
-    LocalReadWriteTable table =  new LocalReadWriteTable("t1", kvStore);
-    table.init(context);
-
-    return table;
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadableTable.java
----------------------------------------------------------------------
diff --git a/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadableTable.java b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadableTable.java
deleted file mode 100644
index e1c82d9..0000000
--- a/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadableTable.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.storage.kv;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.samza.config.MapConfig;
-import org.apache.samza.config.MetricsConfig;
-import org.apache.samza.context.ContainerContext;
-import org.apache.samza.context.Context;
-import org.apache.samza.context.JobContext;
-import org.apache.samza.metrics.Counter;
-import org.apache.samza.metrics.MetricsRegistry;
-import org.apache.samza.metrics.Timer;
-import org.apache.samza.table.ReadableTable;
-
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.mockito.Mockito.*;
-
-public class TestLocalReadableTable {
-
-  public static final String TABLE_ID = "t1";
-
-  private List<String> keys;
-  private Map<String, String> values;
-
-  private Timer getNs;
-  private Timer getAllNs;
-  private Counter numGets;
-  private Counter numGetAlls;
-  private Timer getCallbackNs;
-  private Counter numMissedLookups;
-
-  private MetricsRegistry metricsRegistry;
-
-  private KeyValueStore kvStore;
-
-  @Before
-  public void setUp() {
-    keys = Arrays.asList("k1", "k2", "k3");
-
-    values = new HashMap<>();
-    values.put("k1", "v1");
-    values.put("k2", "v2");
-    values.put("k3", null);
-
-    kvStore = mock(KeyValueStore.class);
-    when(kvStore.get("k1")).thenReturn("v1");
-    when(kvStore.get("k2")).thenReturn("v2");
-    when(kvStore.getAll(keys)).thenReturn(values);
-
-    getNs = new Timer("");
-    getAllNs = new Timer("");
-    numGets = new Counter("");
-    numGetAlls = new Counter("");
-    getCallbackNs = new Timer("");
-    numMissedLookups = new Counter("");
-
-    metricsRegistry = mock(MetricsRegistry.class);
-    String groupName = LocalReadableTable.class.getSimpleName();
-    when(metricsRegistry.newCounter(groupName, TABLE_ID + "-num-gets")).thenReturn(numGets);
-    when(metricsRegistry.newCounter(groupName, TABLE_ID + "-num-getAlls")).thenReturn(numGetAlls);
-    when(metricsRegistry.newCounter(groupName, TABLE_ID + "-num-missed-lookups")).thenReturn(numMissedLookups);
-    when(metricsRegistry.newTimer(groupName, TABLE_ID + "-get-ns")).thenReturn(getNs);
-    when(metricsRegistry.newTimer(groupName, TABLE_ID + "-getAll-ns")).thenReturn(getAllNs);
-    when(metricsRegistry.newTimer(groupName, TABLE_ID + "-get-callback-ns")).thenReturn(getCallbackNs);
-  }
-
-  @Test
-  public void testGet() throws Exception {
-    ReadableTable table = createTable(false);
-    Assert.assertEquals("v1", table.get("k1"));
-    Assert.assertEquals("v2", table.getAsync("k2").get());
-    Assert.assertNull(table.get("k3"));
-    verify(kvStore, times(3)).get(any());
-    Assert.assertEquals(3, numGets.getCount());
-    Assert.assertEquals(1, numMissedLookups.getCount());
-    Assert.assertTrue(getNs.getSnapshot().getAverage() > 0);
-    Assert.assertEquals(0, numGetAlls.getCount());
-    Assert.assertEquals(0, getAllNs.getSnapshot().getAverage(), 0.001);
-    Assert.assertEquals(0, getCallbackNs.getSnapshot().getAverage(), 0.001);
-  }
-
-  @Test
-  public void testGetAll() throws Exception {
-    ReadableTable table = createTable(false);
-    Assert.assertEquals(values, table.getAll(keys));
-    Assert.assertEquals(values, table.getAllAsync(keys).get());
-    verify(kvStore, times(2)).getAll(any());
-    Assert.assertEquals(Collections.emptyMap(), table.getAll(Collections.emptyList()));
-    Assert.assertEquals(2, numMissedLookups.getCount());
-    Assert.assertEquals(3, numGetAlls.getCount());
-    Assert.assertTrue(getAllNs.getSnapshot().getAverage() > 0);
-    Assert.assertEquals(0, numGets.getCount());
-    Assert.assertEquals(0, getNs.getSnapshot().getAverage(), 0.001);
-    Assert.assertEquals(0, getCallbackNs.getSnapshot().getAverage(), 0.001);
-  }
-
-  @Test
-  public void testTimerDisabled() throws Exception {
-    ReadableTable table = createTable(true);
-    table.get("");
-    table.getAsync("").get();
-    table.getAll(keys);
-    table.getAllAsync(keys).get();
-    Assert.assertEquals(2, numGets.getCount());
-    Assert.assertEquals(4, numMissedLookups.getCount());
-    Assert.assertEquals(2, numGetAlls.getCount());
-    Assert.assertEquals(0, getNs.getSnapshot().getAverage(), 0.001);
-    Assert.assertEquals(0, getAllNs.getSnapshot().getAverage(), 0.001);
-    Assert.assertEquals(0, getCallbackNs.getSnapshot().getAverage(), 0.001);
-  }
-
-  private LocalReadableTable createTable(boolean isTimerDisabled) {
-    Map<String, String> config = new HashMap<>();
-    if (isTimerDisabled) {
-      config.put(MetricsConfig.METRICS_TIMER_ENABLED(), "false");
-    }
-    Context context = mock(Context.class);
-    JobContext jobContext = mock(JobContext.class);
-    when(context.getJobContext()).thenReturn(jobContext);
-    when(jobContext.getConfig()).thenReturn(new MapConfig(config));
-    ContainerContext containerContext = mock(ContainerContext.class);
-    when(context.getContainerContext()).thenReturn(containerContext);
-    when(containerContext.getContainerMetricsRegistry()).thenReturn(metricsRegistry);
-
-    LocalReadableTable table =  new LocalReadableTable("t1", kvStore);
-    table.init(context);
-
-    return table;
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalTableProvider.java
----------------------------------------------------------------------
diff --git a/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalTableProvider.java b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalTableProvider.java
index 5367931..263ab56 100644
--- a/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalTableProvider.java
+++ b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalTableProvider.java
@@ -19,7 +19,6 @@
 
 package org.apache.samza.storage.kv;
 
-import junit.framework.Assert;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.context.ContainerContext;
 import org.apache.samza.context.Context;
@@ -28,6 +27,7 @@ import org.apache.samza.context.TaskContext;
 import org.apache.samza.table.TableProvider;
 import org.apache.samza.util.NoOpMetricsRegistry;
 import org.junit.Test;
+import org.junit.Assert;
 
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.*;

http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalTableRead.java
----------------------------------------------------------------------
diff --git a/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalTableRead.java b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalTableRead.java
new file mode 100644
index 0000000..0fd4539
--- /dev/null
+++ b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalTableRead.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.storage.kv;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.MetricsConfig;
+import org.apache.samza.context.ContainerContext;
+import org.apache.samza.context.Context;
+import org.apache.samza.context.JobContext;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.metrics.Timer;
+
+import org.apache.samza.table.ReadWriteTable;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.mockito.Mockito.*;
+
+public class TestLocalTableRead {
+
+  public static final String TABLE_ID = "t1";
+
+  private List<String> keys;
+  private Map<String, String> values;
+
+  private Timer getNs;
+  private Timer getAllNs;
+  private Counter numGets;
+  private Counter numGetAlls;
+  private Timer getCallbackNs;
+  private Counter numMissedLookups;
+
+  private MetricsRegistry metricsRegistry;
+
+  private KeyValueStore kvStore;
+
+  @Before
+  public void setUp() {
+    keys = Arrays.asList("k1", "k2", "k3");
+
+    values = new HashMap<>();
+    values.put("k1", "v1");
+    values.put("k2", "v2");
+    values.put("k3", null);
+
+    kvStore = mock(KeyValueStore.class);
+    when(kvStore.get("k1")).thenReturn("v1");
+    when(kvStore.get("k2")).thenReturn("v2");
+    when(kvStore.getAll(keys)).thenReturn(values);
+
+    getNs = new Timer("");
+    getAllNs = new Timer("");
+    numGets = new Counter("");
+    numGetAlls = new Counter("");
+    getCallbackNs = new Timer("");
+    numMissedLookups = new Counter("");
+
+    metricsRegistry = mock(MetricsRegistry.class);
+    String groupName = LocalTable.class.getSimpleName();
+    when(metricsRegistry.newCounter(groupName, TABLE_ID + "-num-gets")).thenReturn(numGets);
+    when(metricsRegistry.newCounter(groupName, TABLE_ID + "-num-getAlls")).thenReturn(numGetAlls);
+    when(metricsRegistry.newCounter(groupName, TABLE_ID + "-num-missed-lookups")).thenReturn(numMissedLookups);
+    when(metricsRegistry.newTimer(groupName, TABLE_ID + "-get-ns")).thenReturn(getNs);
+    when(metricsRegistry.newTimer(groupName, TABLE_ID + "-getAll-ns")).thenReturn(getAllNs);
+    when(metricsRegistry.newTimer(groupName, TABLE_ID + "-get-callback-ns")).thenReturn(getCallbackNs);
+  }
+
+  @Test
+  public void testGet() throws Exception {
+    ReadWriteTable table = createTable(false);
+    Assert.assertEquals("v1", table.get("k1"));
+    Assert.assertEquals("v2", table.getAsync("k2").get());
+    Assert.assertNull(table.get("k3"));
+    verify(kvStore, times(3)).get(any());
+    Assert.assertEquals(3, numGets.getCount());
+    Assert.assertEquals(1, numMissedLookups.getCount());
+    Assert.assertTrue(getNs.getSnapshot().getAverage() > 0);
+    Assert.assertEquals(0, numGetAlls.getCount());
+    Assert.assertEquals(0, getAllNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, getCallbackNs.getSnapshot().getAverage(), 0.001);
+  }
+
+  @Test
+  public void testGetAll() throws Exception {
+    ReadWriteTable table = createTable(false);
+    Assert.assertEquals(values, table.getAll(keys));
+    Assert.assertEquals(values, table.getAllAsync(keys).get());
+    verify(kvStore, times(2)).getAll(any());
+    Assert.assertEquals(Collections.emptyMap(), table.getAll(Collections.emptyList()));
+    Assert.assertEquals(2, numMissedLookups.getCount());
+    Assert.assertEquals(3, numGetAlls.getCount());
+    Assert.assertTrue(getAllNs.getSnapshot().getAverage() > 0);
+    Assert.assertEquals(0, numGets.getCount());
+    Assert.assertEquals(0, getNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, getCallbackNs.getSnapshot().getAverage(), 0.001);
+  }
+
+  @Test
+  public void testTimerDisabled() throws Exception {
+    ReadWriteTable table = createTable(true);
+    table.get("");
+    table.getAsync("").get();
+    table.getAll(keys);
+    table.getAllAsync(keys).get();
+    Assert.assertEquals(2, numGets.getCount());
+    Assert.assertEquals(4, numMissedLookups.getCount());
+    Assert.assertEquals(2, numGetAlls.getCount());
+    Assert.assertEquals(0, getNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, getAllNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, getCallbackNs.getSnapshot().getAverage(), 0.001);
+  }
+
+  private LocalTable createTable(boolean isTimerDisabled) {
+    Map<String, String> config = new HashMap<>();
+    if (isTimerDisabled) {
+      config.put(MetricsConfig.METRICS_TIMER_ENABLED(), "false");
+    }
+    Context context = mock(Context.class);
+    JobContext jobContext = mock(JobContext.class);
+    when(context.getJobContext()).thenReturn(jobContext);
+    when(jobContext.getConfig()).thenReturn(new MapConfig(config));
+    ContainerContext containerContext = mock(ContainerContext.class);
+    when(context.getContainerContext()).thenReturn(containerContext);
+    when(containerContext.getContainerMetricsRegistry()).thenReturn(metricsRegistry);
+
+    LocalTable table =  new LocalTable("t1", kvStore);
+    table.init(context);
+
+    return table;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalTableWrite.java
----------------------------------------------------------------------
diff --git a/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalTableWrite.java b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalTableWrite.java
new file mode 100644
index 0000000..80eb99f
--- /dev/null
+++ b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalTableWrite.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.storage.kv;
+
+import java.util.Arrays;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.MetricsConfig;
+import org.apache.samza.context.ContainerContext;
+import org.apache.samza.context.Context;
+import org.apache.samza.context.JobContext;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.metrics.Timer;
+import org.apache.samza.table.ReadWriteTable;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.mockito.Mockito.*;
+
+
+public class TestLocalTableWrite {
+
+  public static final String TABLE_ID = "t1";
+
+  private Timer putNs;
+  private Timer putAllNs;
+  private Timer deleteNs;
+  private Timer deleteAllNs;
+  private Timer flushNs;
+  private Counter numPuts;
+  private Counter numPutAlls;
+  private Counter numDeletes;
+  private Counter numDeleteAlls;
+  private Counter numFlushes;
+  private Timer putCallbackNs;
+  private Timer deleteCallbackNs;
+
+  private MetricsRegistry metricsRegistry;
+
+  private KeyValueStore kvStore;
+
+  @Before
+  public void setUp() {
+
+    putNs = new Timer("");
+    putAllNs = new Timer("");
+    deleteNs = new Timer("");
+    deleteAllNs = new Timer("");
+    flushNs = new Timer("");
+    numPuts = new Counter("");
+    numPutAlls = new Counter("");
+    numDeletes = new Counter("");
+    numDeleteAlls = new Counter("");
+    numFlushes = new Counter("");
+    putCallbackNs = new Timer("");
+    deleteCallbackNs = new Timer("");
+
+    metricsRegistry = mock(MetricsRegistry.class);
+    String groupName = LocalTable.class.getSimpleName();
+    when(metricsRegistry.newTimer(groupName, TABLE_ID + "-put-ns")).thenReturn(putNs);
+    when(metricsRegistry.newTimer(groupName, TABLE_ID + "-putAll-ns")).thenReturn(putAllNs);
+    when(metricsRegistry.newTimer(groupName, TABLE_ID + "-delete-ns")).thenReturn(deleteNs);
+    when(metricsRegistry.newTimer(groupName, TABLE_ID + "-deleteAll-ns")).thenReturn(deleteAllNs);
+    when(metricsRegistry.newCounter(groupName, TABLE_ID + "-num-puts")).thenReturn(numPuts);
+    when(metricsRegistry.newCounter(groupName, TABLE_ID + "-num-putAlls")).thenReturn(numPutAlls);
+    when(metricsRegistry.newCounter(groupName, TABLE_ID + "-num-deletes")).thenReturn(numDeletes);
+    when(metricsRegistry.newCounter(groupName, TABLE_ID + "-num-deleteAlls")).thenReturn(numDeleteAlls);
+    when(metricsRegistry.newCounter(groupName, TABLE_ID + "-num-flushes")).thenReturn(numFlushes);
+    when(metricsRegistry.newTimer(groupName, TABLE_ID + "-put-callback-ns")).thenReturn(putCallbackNs);
+    when(metricsRegistry.newTimer(groupName, TABLE_ID + "-delete-callback-ns")).thenReturn(deleteCallbackNs);
+    when(metricsRegistry.newTimer(groupName, TABLE_ID + "-flush-ns")).thenReturn(flushNs);
+
+    kvStore = mock(KeyValueStore.class);
+  }
+
+  @Test
+  public void testPut() throws Exception {
+    ReadWriteTable table = createTable(false);
+    table.put("k1", "v1");
+    table.putAsync("k2", "v2").get();
+    table.putAsync("k3", null).get();
+    verify(kvStore, times(2)).put(any(), any());
+    verify(kvStore, times(1)).delete(any());
+    Assert.assertEquals(2, numPuts.getCount());
+    Assert.assertEquals(1, numDeletes.getCount());
+    Assert.assertTrue(putNs.getSnapshot().getAverage() > 0);
+    Assert.assertTrue(deleteNs.getSnapshot().getAverage() > 0);
+    Assert.assertEquals(0, putAllNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, deleteAllNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, flushNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, numPutAlls.getCount());
+    Assert.assertEquals(0, numDeleteAlls.getCount());
+    Assert.assertEquals(0, numFlushes.getCount());
+    Assert.assertEquals(0, putCallbackNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, deleteCallbackNs.getSnapshot().getAverage(), 0.001);
+  }
+
+  @Test
+  public void testPutAll() throws Exception {
+    ReadWriteTable table = createTable(false);
+    List<Entry> entries = Arrays.asList(new Entry("k1", "v1"), new Entry("k2", null));
+    table.putAll(entries);
+    table.putAllAsync(entries).get();
+    verify(kvStore, times(2)).putAll(any());
+    verify(kvStore, times(2)).deleteAll(any());
+    Assert.assertEquals(2, numPutAlls.getCount());
+    Assert.assertEquals(2, numDeleteAlls.getCount());
+    Assert.assertTrue(putAllNs.getSnapshot().getAverage() > 0);
+    Assert.assertTrue(deleteAllNs.getSnapshot().getAverage() > 0);
+    Assert.assertEquals(0, putNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, deleteNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, flushNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, numPuts.getCount());
+    Assert.assertEquals(0, numDeletes.getCount());
+    Assert.assertEquals(0, numFlushes.getCount());
+    Assert.assertEquals(0, putCallbackNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, deleteCallbackNs.getSnapshot().getAverage(), 0.001);
+  }
+
+  @Test
+  public void testDelete() throws Exception {
+    ReadWriteTable table = createTable(false);
+    table.delete("");
+    table.deleteAsync("").get();
+    verify(kvStore, times(2)).delete(any());
+    Assert.assertEquals(2, numDeletes.getCount());
+    Assert.assertTrue(deleteNs.getSnapshot().getAverage() > 0);
+    Assert.assertEquals(0, putNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, putAllNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, deleteAllNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, flushNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, numPuts.getCount());
+    Assert.assertEquals(0, numPutAlls.getCount());
+    Assert.assertEquals(0, numDeleteAlls.getCount());
+    Assert.assertEquals(0, numFlushes.getCount());
+    Assert.assertEquals(0, putCallbackNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, deleteCallbackNs.getSnapshot().getAverage(), 0.001);
+  }
+
+  @Test
+  public void testDeleteAll() throws Exception {
+    ReadWriteTable table = createTable(false);
+    table.deleteAll(Collections.emptyList());
+    table.deleteAllAsync(Collections.emptyList()).get();
+    verify(kvStore, times(2)).deleteAll(any());
+    Assert.assertEquals(2, numDeleteAlls.getCount());
+    Assert.assertTrue(deleteAllNs.getSnapshot().getAverage() > 0);
+    Assert.assertEquals(0, putNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, putAllNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, deleteNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, flushNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, numPuts.getCount());
+    Assert.assertEquals(0, numPutAlls.getCount());
+    Assert.assertEquals(0, numDeletes.getCount());
+    Assert.assertEquals(0, numFlushes.getCount());
+    Assert.assertEquals(0, putCallbackNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, deleteCallbackNs.getSnapshot().getAverage(), 0.001);
+  }
+
+  @Test
+  public void testFlush() {
+    ReadWriteTable table = createTable(false);
+    table.flush();
+    table.flush();
+    verify(kvStore, times(2)).flush();
+    Assert.assertEquals(2, numFlushes.getCount());
+    Assert.assertTrue(flushNs.getSnapshot().getAverage() > 0);
+    Assert.assertEquals(0, putNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, putAllNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, deleteNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, deleteAllNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, numPuts.getCount());
+    Assert.assertEquals(0, numPutAlls.getCount());
+    Assert.assertEquals(0, numDeletes.getCount());
+    Assert.assertEquals(0, numDeleteAlls.getCount());
+    Assert.assertEquals(0, putCallbackNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, deleteCallbackNs.getSnapshot().getAverage(), 0.001);
+  }
+
+  @Test
+  public void testTimerDisabled() throws Exception {
+    ReadWriteTable table = createTable(true);
+    table.put("", "");
+    table.putAsync("", "").get();
+    table.putAll(Collections.emptyList());
+    table.putAllAsync(Collections.emptyList()).get();
+    table.delete("");
+    table.deleteAsync("").get();
+    table.deleteAll(Collections.emptyList());
+    table.deleteAllAsync(Collections.emptyList()).get();
+    table.flush();
+    Assert.assertEquals(1, numFlushes.getCount());
+    Assert.assertEquals(2, numPuts.getCount());
+    Assert.assertEquals(0, numPutAlls.getCount());
+    Assert.assertEquals(2, numDeletes.getCount());
+    Assert.assertEquals(2, numDeleteAlls.getCount());
+    Assert.assertEquals(0, flushNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, putNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, putAllNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, deleteNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, deleteAllNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, putCallbackNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, deleteCallbackNs.getSnapshot().getAverage(), 0.001);
+  }
+
+  private LocalTable createTable(boolean isTimerDisabled) {
+    Map<String, String> config = new HashMap<>();
+    if (isTimerDisabled) {
+      config.put(MetricsConfig.METRICS_TIMER_ENABLED(), "false");
+    }
+    Context context = mock(Context.class);
+    JobContext jobContext = mock(JobContext.class);
+    when(context.getJobContext()).thenReturn(jobContext);
+    when(jobContext.getConfig()).thenReturn(new MapConfig(config));
+    ContainerContext containerContext = mock(ContainerContext.class);
+    when(context.getContainerContext()).thenReturn(containerContext);
+    when(containerContext.getContainerMetricsRegistry()).thenReturn(metricsRegistry);
+
+    LocalTable table =  new LocalTable("t1", kvStore);
+    table.init(context);
+
+    return table;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java
index 2137a46..ab650f2 100644
--- a/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java
+++ b/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java
@@ -229,7 +229,7 @@ public class StreamTaskIntegrationTest {
 
     @Override
     public void init(Context context) throws Exception {
-      profileViewTable = (ReadWriteTable<Integer, Profile>) context.getTaskContext().getTable("profile-view-store");
+      profileViewTable = context.getTaskContext().getTable("profile-view-store");
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java
deleted file mode 100644
index b447493..0000000
--- a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java
+++ /dev/null
@@ -1,362 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.test.table;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.samza.application.StreamApplication;
-import org.apache.samza.application.TaskApplication;
-import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
-import org.apache.samza.application.descriptors.TaskApplicationDescriptor;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.JobConfig;
-import org.apache.samza.config.JobCoordinatorConfig;
-import org.apache.samza.config.MapConfig;
-import org.apache.samza.config.TaskConfig;
-import org.apache.samza.container.grouper.task.SingleContainerGrouperFactory;
-import org.apache.samza.context.Context;
-import org.apache.samza.system.descriptors.GenericInputDescriptor;
-import org.apache.samza.operators.KV;
-import org.apache.samza.operators.MessageStream;
-import org.apache.samza.system.descriptors.DelegatingSystemDescriptor;
-import org.apache.samza.operators.functions.MapFunction;
-import org.apache.samza.runtime.LocalApplicationRunner;
-import org.apache.samza.serializers.IntegerSerde;
-import org.apache.samza.serializers.KVSerde;
-import org.apache.samza.serializers.NoOpSerde;
-import org.apache.samza.standalone.PassthroughJobCoordinatorFactory;
-import org.apache.samza.storage.kv.inmemory.descriptors.InMemoryTableDescriptor;
-import org.apache.samza.system.IncomingMessageEnvelope;
-import org.apache.samza.table.ReadWriteTable;
-import org.apache.samza.table.ReadableTable;
-import org.apache.samza.table.Table;
-import org.apache.samza.task.InitableTask;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.StreamTask;
-import org.apache.samza.task.StreamTaskFactory;
-import org.apache.samza.task.TaskCoordinator;
-import org.apache.samza.test.harness.AbstractIntegrationTestHarness;
-import org.apache.samza.test.util.ArraySystemFactory;
-import org.apache.samza.test.util.Base64Serializer;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import static org.apache.samza.test.table.TestTableData.EnrichedPageView;
-import static org.apache.samza.test.table.TestTableData.PageView;
-import static org.apache.samza.test.table.TestTableData.PageViewJsonSerde;
-import static org.apache.samza.test.table.TestTableData.PageViewJsonSerdeFactory;
-import static org.apache.samza.test.table.TestTableData.Profile;
-import static org.apache.samza.test.table.TestTableData.ProfileJsonSerde;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-
-/**
- * This test class tests sendTo() and join() for local tables
- */
-public class TestLocalTable extends AbstractIntegrationTestHarness {
-
-  @Test
-  public void testSendTo() throws Exception {
-
-    int count = 10;
-    Profile[] profiles = TestTableData.generateProfiles(count);
-
-    int partitionCount = 4;
-    Map<String, String> configs = getBaseJobConfig(bootstrapUrl(), zkConnect());
-
-    configs.put("streams.Profile.samza.system", "test");
-    configs.put("streams.Profile.source", Base64Serializer.serialize(profiles));
-    configs.put("streams.Profile.partitionCount", String.valueOf(partitionCount));
-
-    MyMapFunction mapFn = new MyMapFunction();
-
-    final StreamApplication app = appDesc -> {
-
-      Table<KV<Integer, Profile>> table = appDesc.getTable(new InMemoryTableDescriptor("t1",
-          KVSerde.of(new IntegerSerde(), new ProfileJsonSerde())));
-      DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor("test");
-      GenericInputDescriptor<Profile> isd = ksd.getInputDescriptor("Profile", new NoOpSerde<>());
-
-      appDesc.getInputStream(isd)
-          .map(mapFn)
-          .sendTo(table);
-    };
-
-    Config config = new MapConfig(configs);
-    final LocalApplicationRunner runner = new LocalApplicationRunner(app, config);
-    executeRun(runner, config);
-    runner.waitForFinish();
-
-    for (int i = 0; i < partitionCount; i++) {
-      MyMapFunction mapFnCopy = MyMapFunction.getMapFunctionByTask(String.format("Partition %d", i));
-      assertEquals(count, mapFnCopy.received.size());
-      mapFnCopy.received.forEach(p -> Assert.assertTrue(mapFnCopy.table.get(p.getMemberId()) != null));
-    }
-  }
-
-  static class StreamTableJoinApp implements StreamApplication {
-    static List<PageView> received = new LinkedList<>();
-    static List<EnrichedPageView> joined = new LinkedList<>();
-
-    @Override
-    public void describe(StreamApplicationDescriptor appDesc) {
-      Table<KV<Integer, Profile>> table = appDesc.getTable(
-          new InMemoryTableDescriptor("t1", KVSerde.of(new IntegerSerde(), new ProfileJsonSerde())));
-      DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor("test");
-      GenericInputDescriptor<Profile> profileISD = ksd.getInputDescriptor("Profile", new NoOpSerde<>());
-      appDesc.getInputStream(profileISD)
-          .map(m -> new KV(m.getMemberId(), m))
-          .sendTo(table);
-
-      GenericInputDescriptor<PageView> pageViewISD = ksd.getInputDescriptor("PageView", new NoOpSerde<>());
-      appDesc.getInputStream(pageViewISD)
-          .map(pv -> {
-              received.add(pv);
-              return pv;
-            })
-          .partitionBy(PageView::getMemberId, v -> v, KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>()), "p1")
-          .join(table, new PageViewToProfileJoinFunction())
-          .sink((m, collector, coordinator) -> joined.add(m));
-    }
-  }
-
-  @Test
-  public void testStreamTableJoin() throws Exception {
-
-    int count = 10;
-    PageView[] pageViews = TestTableData.generatePageViews(count);
-    Profile[] profiles = TestTableData.generateProfiles(count);
-
-    int partitionCount = 4;
-    Map<String, String> configs = getBaseJobConfig(bootstrapUrl(), zkConnect());
-
-    configs.put("streams.PageView.samza.system", "test");
-    configs.put("streams.PageView.source", Base64Serializer.serialize(pageViews));
-    configs.put("streams.PageView.partitionCount", String.valueOf(partitionCount));
-
-    configs.put("streams.Profile.samza.system", "test");
-    configs.put("streams.Profile.samza.bootstrap", "true");
-    configs.put("streams.Profile.source", Base64Serializer.serialize(profiles));
-    configs.put("streams.Profile.partitionCount", String.valueOf(partitionCount));
-
-    Config config = new MapConfig(configs);
-    final LocalApplicationRunner runner = new LocalApplicationRunner(new StreamTableJoinApp(), config);
-    executeRun(runner, config);
-    runner.waitForFinish();
-
-    assertEquals(count * partitionCount, StreamTableJoinApp.received.size());
-    assertEquals(count * partitionCount, StreamTableJoinApp.joined.size());
-    assertTrue(StreamTableJoinApp.joined.get(0) instanceof EnrichedPageView);
-  }
-
-  static class DualStreamTableJoinApp implements StreamApplication {
-    static List<Profile> sentToProfileTable1 = new LinkedList<>();
-    static List<Profile> sentToProfileTable2 = new LinkedList<>();
-    static List<EnrichedPageView> joinedPageViews1 = new LinkedList<>();
-    static List<EnrichedPageView> joinedPageViews2 = new LinkedList<>();
-
-    @Override
-    public void describe(StreamApplicationDescriptor appDesc) {
-      KVSerde<Integer, Profile> profileKVSerde = KVSerde.of(new IntegerSerde(), new ProfileJsonSerde());
-      KVSerde<Integer, PageView> pageViewKVSerde = KVSerde.of(new IntegerSerde(), new PageViewJsonSerde());
-
-      PageViewToProfileJoinFunction joinFn1 = new PageViewToProfileJoinFunction();
-      PageViewToProfileJoinFunction joinFn2 = new PageViewToProfileJoinFunction();
-
-      Table<KV<Integer, Profile>> profileTable = appDesc.getTable(new InMemoryTableDescriptor("t1", profileKVSerde));
-
-      DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor("test");
-      GenericInputDescriptor<Profile> profileISD1 = ksd.getInputDescriptor("Profile1", new NoOpSerde<>());
-      GenericInputDescriptor<Profile> profileISD2 = ksd.getInputDescriptor("Profile2", new NoOpSerde<>());
-      MessageStream<Profile> profileStream1 = appDesc.getInputStream(profileISD1);
-      MessageStream<Profile> profileStream2 = appDesc.getInputStream(profileISD2);
-
-      profileStream1
-          .map(m -> {
-              sentToProfileTable1.add(m);
-              return new KV(m.getMemberId(), m);
-            })
-          .sendTo(profileTable);
-      profileStream2
-          .map(m -> {
-              sentToProfileTable2.add(m);
-              return new KV(m.getMemberId(), m);
-            })
-          .sendTo(profileTable);
-
-      GenericInputDescriptor<PageView> pageViewISD1 = ksd.getInputDescriptor("PageView1", new NoOpSerde<PageView>());
-      GenericInputDescriptor<PageView> pageViewISD2 = ksd.getInputDescriptor("PageView2", new NoOpSerde<PageView>());
-      MessageStream<PageView> pageViewStream1 = appDesc.getInputStream(pageViewISD1);
-      MessageStream<PageView> pageViewStream2 = appDesc.getInputStream(pageViewISD2);
-
-      pageViewStream1
-          .partitionBy(PageView::getMemberId, v -> v, pageViewKVSerde, "p1")
-          .join(profileTable, joinFn1)
-          .sink((m, collector, coordinator) -> joinedPageViews1.add(m));
-
-      pageViewStream2
-          .partitionBy(PageView::getMemberId, v -> v, pageViewKVSerde, "p2")
-          .join(profileTable, joinFn2)
-          .sink((m, collector, coordinator) -> joinedPageViews2.add(m));
-    }
-  }
-
-  @Test
-  public void testDualStreamTableJoin() throws Exception {
-
-    int count = 10;
-    PageView[] pageViews = TestTableData.generatePageViews(count);
-    Profile[] profiles = TestTableData.generateProfiles(count);
-
-    int partitionCount = 4;
-    Map<String, String> configs = getBaseJobConfig(bootstrapUrl(), zkConnect());
-
-    configs.put("streams.Profile1.samza.system", "test");
-    configs.put("streams.Profile1.source", Base64Serializer.serialize(profiles));
-    configs.put("streams.Profile1.samza.bootstrap", "true");
-    configs.put("streams.Profile1.partitionCount", String.valueOf(partitionCount));
-
-    configs.put("streams.Profile2.samza.system", "test");
-    configs.put("streams.Profile2.source", Base64Serializer.serialize(profiles));
-    configs.put("streams.Profile2.samza.bootstrap", "true");
-    configs.put("streams.Profile2.partitionCount", String.valueOf(partitionCount));
-
-    configs.put("streams.PageView1.samza.system", "test");
-    configs.put("streams.PageView1.source", Base64Serializer.serialize(pageViews));
-    configs.put("streams.PageView1.partitionCount", String.valueOf(partitionCount));
-
-    configs.put("streams.PageView2.samza.system", "test");
-    configs.put("streams.PageView2.source", Base64Serializer.serialize(pageViews));
-    configs.put("streams.PageView2.partitionCount", String.valueOf(partitionCount));
-
-    Config config = new MapConfig(configs);
-    final LocalApplicationRunner runner = new LocalApplicationRunner(new DualStreamTableJoinApp(), config);
-    executeRun(runner, config);
-    runner.waitForFinish();
-
-    assertEquals(count * partitionCount, DualStreamTableJoinApp.sentToProfileTable1.size());
-    assertEquals(count * partitionCount, DualStreamTableJoinApp.sentToProfileTable2.size());
-
-    assertEquals(count * partitionCount, DualStreamTableJoinApp.joinedPageViews1.size());
-    assertEquals(count * partitionCount, DualStreamTableJoinApp.joinedPageViews2.size());
-    assertTrue(DualStreamTableJoinApp.joinedPageViews1.get(0) instanceof EnrichedPageView);
-    assertTrue(DualStreamTableJoinApp.joinedPageViews2.get(0) instanceof EnrichedPageView);
-  }
-
-  static Map<String, String> getBaseJobConfig(String bootstrapUrl, String zkConnect) {
-    Map<String, String> configs = new HashMap<>();
-    configs.put("systems.test.samza.factory", ArraySystemFactory.class.getName());
-
-    configs.put(JobConfig.JOB_NAME(), "test-table-job");
-    configs.put(JobConfig.PROCESSOR_ID(), "1");
-    configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, PassthroughJobCoordinatorFactory.class.getName());
-    configs.put(TaskConfig.GROUPER_FACTORY(), SingleContainerGrouperFactory.class.getName());
-
-    // For intermediate streams
-    configs.put("systems.kafka.samza.factory", "org.apache.samza.system.kafka.KafkaSystemFactory");
-    configs.put("systems.kafka.producer.bootstrap.servers", bootstrapUrl);
-    configs.put("systems.kafka.consumer.zookeeper.connect", zkConnect);
-    configs.put("systems.kafka.samza.key.serde", "int");
-    configs.put("systems.kafka.samza.msg.serde", "json");
-    configs.put("systems.kafka.default.stream.replication.factor", "1");
-    configs.put("job.default.system", "kafka");
-
-    configs.put("serializers.registry.int.class", "org.apache.samza.serializers.IntegerSerdeFactory");
-    configs.put("serializers.registry.json.class", PageViewJsonSerdeFactory.class.getName());
-
-    return configs;
-  }
-
-  private static class MyMapFunction implements MapFunction<Profile, KV<Integer, Profile>> {
-
-    private static Map<String, MyMapFunction> taskToMapFunctionMap = new HashMap<>();
-
-    private transient List<Profile> received;
-    private transient ReadableTable table;
-
-    @Override
-    public void init(Context context) {
-      table = (ReadableTable) context.getTaskContext().getTable("t1");
-      this.received = new ArrayList<>();
-
-      taskToMapFunctionMap.put(context.getTaskContext().getTaskModel().getTaskName().getTaskName(), this);
-    }
-
-    @Override
-    public KV<Integer, Profile> apply(Profile profile) {
-      received.add(profile);
-      return new KV(profile.getMemberId(), profile);
-    }
-
-    public static MyMapFunction getMapFunctionByTask(String taskName) {
-      return taskToMapFunctionMap.get(taskName);
-    }
-  }
-
-  @Test
-  public void testWithLowLevelApi() throws Exception {
-
-    Map<String, String> configs = getBaseJobConfig(bootstrapUrl(), zkConnect());
-    configs.put("streams.PageView.samza.system", "test");
-    configs.put("streams.PageView.source", Base64Serializer.serialize(TestTableData.generatePageViews(10)));
-    configs.put("streams.PageView.partitionCount", String.valueOf(4));
-    configs.put("task.inputs", "test.PageView");
-
-    Config config = new MapConfig(configs);
-    final LocalApplicationRunner runner = new LocalApplicationRunner(new MyTaskApplication(), config);
-    executeRun(runner, config);
-    runner.waitForFinish();
-  }
-
-  static public class MyTaskApplication implements TaskApplication {
-    @Override
-    public void describe(TaskApplicationDescriptor appDescriptor) {
-      DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor("test");
-      GenericInputDescriptor<PageView> pageViewISD = ksd.getInputDescriptor("PageView", new NoOpSerde<>());
-      appDescriptor
-          .withInputStream(pageViewISD)
-          .withTable(new InMemoryTableDescriptor("t1", KVSerde.of(new IntegerSerde(), new PageViewJsonSerde())))
-          .withTaskFactory((StreamTaskFactory) () -> new MyStreamTask());
-    }
-  }
-
-  static public class MyStreamTask implements StreamTask, InitableTask {
-    private ReadWriteTable<Integer, PageView> pageViewTable;
-    @Override
-    public void init(Context context) throws Exception {
-      pageViewTable = (ReadWriteTable<Integer, PageView>) context.getTaskContext().getTable("t1");
-    }
-    @Override
-    public void process(IncomingMessageEnvelope message, MessageCollector collector, TaskCoordinator coordinator) {
-      PageView pv = (PageView) message.getMessage();
-      pageViewTable.put(pv.getMemberId(), pv);
-      PageView pv2 = pageViewTable.get(pv.getMemberId());
-      Assert.assertEquals(pv.getMemberId(), pv2.getMemberId());
-      Assert.assertEquals(pv.getPageKey(), pv2.getPageKey());
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableEndToEnd.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableEndToEnd.java b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableEndToEnd.java
new file mode 100644
index 0000000..0303c26
--- /dev/null
+++ b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableEndToEnd.java
@@ -0,0 +1,361 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.test.table;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.application.TaskApplication;
+import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
+import org.apache.samza.application.descriptors.TaskApplicationDescriptor;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.JobCoordinatorConfig;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.TaskConfig;
+import org.apache.samza.container.grouper.task.SingleContainerGrouperFactory;
+import org.apache.samza.context.Context;
+import org.apache.samza.system.descriptors.GenericInputDescriptor;
+import org.apache.samza.operators.KV;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.system.descriptors.DelegatingSystemDescriptor;
+import org.apache.samza.operators.functions.MapFunction;
+import org.apache.samza.runtime.LocalApplicationRunner;
+import org.apache.samza.serializers.IntegerSerde;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.NoOpSerde;
+import org.apache.samza.standalone.PassthroughJobCoordinatorFactory;
+import org.apache.samza.storage.kv.inmemory.descriptors.InMemoryTableDescriptor;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.table.ReadWriteTable;
+import org.apache.samza.table.Table;
+import org.apache.samza.task.InitableTask;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.StreamTask;
+import org.apache.samza.task.StreamTaskFactory;
+import org.apache.samza.task.TaskCoordinator;
+import org.apache.samza.test.harness.AbstractIntegrationTestHarness;
+import org.apache.samza.test.util.ArraySystemFactory;
+import org.apache.samza.test.util.Base64Serializer;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.apache.samza.test.table.TestTableData.EnrichedPageView;
+import static org.apache.samza.test.table.TestTableData.PageView;
+import static org.apache.samza.test.table.TestTableData.PageViewJsonSerde;
+import static org.apache.samza.test.table.TestTableData.PageViewJsonSerdeFactory;
+import static org.apache.samza.test.table.TestTableData.Profile;
+import static org.apache.samza.test.table.TestTableData.ProfileJsonSerde;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+
+/**
+ * Tests sendTo() and join() for local (in-memory) tables, covering both the
+ * high-level StreamApplication API and the low-level TaskApplication API.
+ */
+public class TestLocalTableEndToEnd extends AbstractIntegrationTestHarness {
+
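+  // Generates profiles, maps them through MyMapFunction into the in-memory table "t1",
+  // then verifies per task that every received profile can be read back from the table.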
+  @Test
+  public void testSendTo() throws Exception {
+
+    int count = 10;
+    Profile[] profiles = TestTableData.generateProfiles(count);
+
+    int partitionCount = 4;
+    Map<String, String> configs = getBaseJobConfig(bootstrapUrl(), zkConnect());
+
+    configs.put("streams.Profile.samza.system", "test");
+    configs.put("streams.Profile.source", Base64Serializer.serialize(profiles));
+    configs.put("streams.Profile.partitionCount", String.valueOf(partitionCount));
+
+    MyMapFunction mapFn = new MyMapFunction();
+
+    final StreamApplication app = appDesc -> {
+
+      Table<KV<Integer, Profile>> table = appDesc.getTable(new InMemoryTableDescriptor("t1",
+          KVSerde.of(new IntegerSerde(), new ProfileJsonSerde())));
+      DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor("test");
+      GenericInputDescriptor<Profile> isd = ksd.getInputDescriptor("Profile", new NoOpSerde<>());
+
+      appDesc.getInputStream(isd)
+          .map(mapFn)
+          .sendTo(table);
+    };
+
+    Config config = new MapConfig(configs);
+    final LocalApplicationRunner runner = new LocalApplicationRunner(app, config);
+    executeRun(runner, config);
+    runner.waitForFinish();
+
+    for (int i = 0; i < partitionCount; i++) {
+      MyMapFunction mapFnCopy = MyMapFunction.getMapFunctionByTask(String.format("Partition %d", i));
+      assertEquals(count, mapFnCopy.received.size());
+      mapFnCopy.received.forEach(p -> Assert.assertNotNull(mapFnCopy.table.get(p.getMemberId())));
+    }
+  }
+
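+  // Sends profiles to the in-memory table, then repartitions the PageView stream by member id
+  // and joins it against that table, collecting the joined EnrichedPageViews for verification.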
+  static class StreamTableJoinApp implements StreamApplication {
+    static List<PageView> received = new LinkedList<>();
+    static List<EnrichedPageView> joined = new LinkedList<>();
+
+    @Override
+    public void describe(StreamApplicationDescriptor appDesc) {
+      Table<KV<Integer, Profile>> table = appDesc.getTable(
+          new InMemoryTableDescriptor("t1", KVSerde.of(new IntegerSerde(), new ProfileJsonSerde())));
+      DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor("test");
+      GenericInputDescriptor<Profile> profileISD = ksd.getInputDescriptor("Profile", new NoOpSerde<>());
+      appDesc.getInputStream(profileISD)
+          .map(m -> new KV<>(m.getMemberId(), m))
+          .sendTo(table);
+
+      GenericInputDescriptor<PageView> pageViewISD = ksd.getInputDescriptor("PageView", new NoOpSerde<>());
+      appDesc.getInputStream(pageViewISD)
+          .map(pv -> {
+              received.add(pv);
+              return pv;
+            })
+          .partitionBy(PageView::getMemberId, v -> v, KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>()), "p1")
+          .join(table, new PageViewToProfileJoinFunction())
+          .sink((m, collector, coordinator) -> joined.add(m));
+    }
+  }
+
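+  // Runs StreamTableJoinApp with a bootstrapped Profile stream and verifies that every
+  // page view was received and joined.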
+  @Test
+  public void testStreamTableJoin() throws Exception {
+
+    int count = 10;
+    PageView[] pageViews = TestTableData.generatePageViews(count);
+    Profile[] profiles = TestTableData.generateProfiles(count);
+
+    int partitionCount = 4;
+    Map<String, String> configs = getBaseJobConfig(bootstrapUrl(), zkConnect());
+
+    configs.put("streams.PageView.samza.system", "test");
+    configs.put("streams.PageView.source", Base64Serializer.serialize(pageViews));
+    configs.put("streams.PageView.partitionCount", String.valueOf(partitionCount));
+
+    configs.put("streams.Profile.samza.system", "test");
+    configs.put("streams.Profile.samza.bootstrap", "true");
+    configs.put("streams.Profile.source", Base64Serializer.serialize(profiles));
+    configs.put("streams.Profile.partitionCount", String.valueOf(partitionCount));
+
+    Config config = new MapConfig(configs);
+    final LocalApplicationRunner runner = new LocalApplicationRunner(new StreamTableJoinApp(), config);
+    executeRun(runner, config);
+    runner.waitForFinish();
+
+    assertEquals(count * partitionCount, StreamTableJoinApp.received.size());
+    assertEquals(count * partitionCount, StreamTableJoinApp.joined.size());
+    assertTrue(StreamTableJoinApp.joined.get(0) instanceof EnrichedPageView);
+  }
+
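+  // Two profile streams populate the same table, and two page-view streams are independently
+  // repartitioned and joined against it.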
+  static class DualStreamTableJoinApp implements StreamApplication {
+    static List<Profile> sentToProfileTable1 = new LinkedList<>();
+    static List<Profile> sentToProfileTable2 = new LinkedList<>();
+    static List<EnrichedPageView> joinedPageViews1 = new LinkedList<>();
+    static List<EnrichedPageView> joinedPageViews2 = new LinkedList<>();
+
+    @Override
+    public void describe(StreamApplicationDescriptor appDesc) {
+      KVSerde<Integer, Profile> profileKVSerde = KVSerde.of(new IntegerSerde(), new ProfileJsonSerde());
+      KVSerde<Integer, PageView> pageViewKVSerde = KVSerde.of(new IntegerSerde(), new PageViewJsonSerde());
+
+      PageViewToProfileJoinFunction joinFn1 = new PageViewToProfileJoinFunction();
+      PageViewToProfileJoinFunction joinFn2 = new PageViewToProfileJoinFunction();
+
+      Table<KV<Integer, Profile>> profileTable = appDesc.getTable(new InMemoryTableDescriptor("t1", profileKVSerde));
+
+      DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor("test");
+      GenericInputDescriptor<Profile> profileISD1 = ksd.getInputDescriptor("Profile1", new NoOpSerde<>());
+      GenericInputDescriptor<Profile> profileISD2 = ksd.getInputDescriptor("Profile2", new NoOpSerde<>());
+      MessageStream<Profile> profileStream1 = appDesc.getInputStream(profileISD1);
+      MessageStream<Profile> profileStream2 = appDesc.getInputStream(profileISD2);
+
+      profileStream1
+          .map(m -> {
+              sentToProfileTable1.add(m);
+              return new KV<>(m.getMemberId(), m);
+            })
+          .sendTo(profileTable);
+      profileStream2
+          .map(m -> {
+              sentToProfileTable2.add(m);
+              return new KV<>(m.getMemberId(), m);
+            })
+          .sendTo(profileTable);
+
+      GenericInputDescriptor<PageView> pageViewISD1 = ksd.getInputDescriptor("PageView1", new NoOpSerde<PageView>());
+      GenericInputDescriptor<PageView> pageViewISD2 = ksd.getInputDescriptor("PageView2", new NoOpSerde<PageView>());
+      MessageStream<PageView> pageViewStream1 = appDesc.getInputStream(pageViewISD1);
+      MessageStream<PageView> pageViewStream2 = appDesc.getInputStream(pageViewISD2);
+
+      pageViewStream1
+          .partitionBy(PageView::getMemberId, v -> v, pageViewKVSerde, "p1")
+          .join(profileTable, joinFn1)
+          .sink((m, collector, coordinator) -> joinedPageViews1.add(m));
+
+      pageViewStream2
+          .partitionBy(PageView::getMemberId, v -> v, pageViewKVSerde, "p2")
+          .join(profileTable, joinFn2)
+          .sink((m, collector, coordinator) -> joinedPageViews2.add(m));
+    }
+  }
+
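+  // Runs DualStreamTableJoinApp with both profile streams bootstrapped and verifies the counts
+  // of sent and joined records for both stream pairs.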
+  @Test
+  public void testDualStreamTableJoin() throws Exception {
+
+    int count = 10;
+    PageView[] pageViews = TestTableData.generatePageViews(count);
+    Profile[] profiles = TestTableData.generateProfiles(count);
+
+    int partitionCount = 4;
+    Map<String, String> configs = getBaseJobConfig(bootstrapUrl(), zkConnect());
+
+    configs.put("streams.Profile1.samza.system", "test");
+    configs.put("streams.Profile1.source", Base64Serializer.serialize(profiles));
+    configs.put("streams.Profile1.samza.bootstrap", "true");
+    configs.put("streams.Profile1.partitionCount", String.valueOf(partitionCount));
+
+    configs.put("streams.Profile2.samza.system", "test");
+    configs.put("streams.Profile2.source", Base64Serializer.serialize(profiles));
+    configs.put("streams.Profile2.samza.bootstrap", "true");
+    configs.put("streams.Profile2.partitionCount", String.valueOf(partitionCount));
+
+    configs.put("streams.PageView1.samza.system", "test");
+    configs.put("streams.PageView1.source", Base64Serializer.serialize(pageViews));
+    configs.put("streams.PageView1.partitionCount", String.valueOf(partitionCount));
+
+    configs.put("streams.PageView2.samza.system", "test");
+    configs.put("streams.PageView2.source", Base64Serializer.serialize(pageViews));
+    configs.put("streams.PageView2.partitionCount", String.valueOf(partitionCount));
+
+    Config config = new MapConfig(configs);
+    final LocalApplicationRunner runner = new LocalApplicationRunner(new DualStreamTableJoinApp(), config);
+    executeRun(runner, config);
+    runner.waitForFinish();
+
+    assertEquals(count * partitionCount, DualStreamTableJoinApp.sentToProfileTable1.size());
+    assertEquals(count * partitionCount, DualStreamTableJoinApp.sentToProfileTable2.size());
+
+    assertEquals(count * partitionCount, DualStreamTableJoinApp.joinedPageViews1.size());
+    assertEquals(count * partitionCount, DualStreamTableJoinApp.joinedPageViews2.size());
+    assertTrue(DualStreamTableJoinApp.joinedPageViews1.get(0) instanceof EnrichedPageView);
+    assertTrue(DualStreamTableJoinApp.joinedPageViews2.get(0) instanceof EnrichedPageView);
+  }
+
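+  // Base config shared by the tests: ArraySystemFactory-backed "test" system for inputs,
+  // passthrough job coordinator, single-container grouping, and Kafka for intermediate streams.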
+  static Map<String, String> getBaseJobConfig(String bootstrapUrl, String zkConnect) {
+    Map<String, String> configs = new HashMap<>();
+    configs.put("systems.test.samza.factory", ArraySystemFactory.class.getName());
+
+    configs.put(JobConfig.JOB_NAME(), "test-table-job");
+    configs.put(JobConfig.PROCESSOR_ID(), "1");
+    configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, PassthroughJobCoordinatorFactory.class.getName());
+    configs.put(TaskConfig.GROUPER_FACTORY(), SingleContainerGrouperFactory.class.getName());
+
+    // For intermediate streams
+    configs.put("systems.kafka.samza.factory", "org.apache.samza.system.kafka.KafkaSystemFactory");
+    configs.put("systems.kafka.producer.bootstrap.servers", bootstrapUrl);
+    configs.put("systems.kafka.consumer.zookeeper.connect", zkConnect);
+    configs.put("systems.kafka.samza.key.serde", "int");
+    configs.put("systems.kafka.samza.msg.serde", "json");
+    configs.put("systems.kafka.default.stream.replication.factor", "1");
+    configs.put("job.default.system", "kafka");
+
+    configs.put("serializers.registry.int.class", "org.apache.samza.serializers.IntegerSerdeFactory");
+    configs.put("serializers.registry.json.class", PageViewJsonSerdeFactory.class.getName());
+
+    return configs;
+  }
+
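+  // Map function used by testSendTo(): records the profiles it sees, keeps a handle to table "t1",
+  // and registers itself by task name so the test can inspect per-task state after the run.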
+  private static class MyMapFunction implements MapFunction<Profile, KV<Integer, Profile>> {
+
+    private static Map<String, MyMapFunction> taskToMapFunctionMap = new HashMap<>();
+
+    private transient List<Profile> received;
+    private transient ReadWriteTable<Integer, Profile> table;
+
+    @Override
+    public void init(Context context) {
+      table = context.getTaskContext().getTable("t1");
+      this.received = new ArrayList<>();
+
+      taskToMapFunctionMap.put(context.getTaskContext().getTaskModel().getTaskName().getTaskName(), this);
+    }
+
+    @Override
+    public KV<Integer, Profile> apply(Profile profile) {
+      received.add(profile);
+      return new KV<>(profile.getMemberId(), profile);
+    }
+
+    public static MyMapFunction getMapFunctionByTask(String taskName) {
+      return taskToMapFunctionMap.get(taskName);
+    }
+  }
+
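+  // Exercises a local table through the low-level API by running MyTaskApplication.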
+  @Test
+  public void testWithLowLevelApi() throws Exception {
+
+    Map<String, String> configs = getBaseJobConfig(bootstrapUrl(), zkConnect());
+    configs.put("streams.PageView.samza.system", "test");
+    configs.put("streams.PageView.source", Base64Serializer.serialize(TestTableData.generatePageViews(10)));
+    configs.put("streams.PageView.partitionCount", String.valueOf(4));
+    configs.put("task.inputs", "test.PageView");
+
+    Config config = new MapConfig(configs);
+    final LocalApplicationRunner runner = new LocalApplicationRunner(new MyTaskApplication(), config);
+    executeRun(runner, config);
+    runner.waitForFinish();
+  }
+
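+  // Low-level TaskApplication wiring the PageView input, the in-memory table "t1",
+  // and a StreamTaskFactory that creates MyStreamTask.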
+  public static class MyTaskApplication implements TaskApplication {
+    @Override
+    public void describe(TaskApplicationDescriptor appDescriptor) {
+      DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor("test");
+      GenericInputDescriptor<PageView> pageViewISD = ksd.getInputDescriptor("PageView", new NoOpSerde<>());
+      appDescriptor
+          .withInputStream(pageViewISD)
+          .withTable(new InMemoryTableDescriptor("t1", KVSerde.of(new IntegerSerde(), new PageViewJsonSerde())))
+          .withTaskFactory((StreamTaskFactory) () -> new MyStreamTask());
+    }
+  }
+
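+  // Writes each incoming page view to the table and immediately reads it back to verify the round trip.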
+  public static class MyStreamTask implements StreamTask, InitableTask {
+    private ReadWriteTable<Integer, PageView> pageViewTable;
+    @Override
+    public void init(Context context) throws Exception {
+      pageViewTable = context.getTaskContext().getTable("t1");
+    }
+    @Override
+    public void process(IncomingMessageEnvelope message, MessageCollector collector, TaskCoordinator coordinator) {
+      PageView pv = (PageView) message.getMessage();
+      pageViewTable.put(pv.getMemberId(), pv);
+      PageView pv2 = pageViewTable.get(pv.getMemberId());
+      Assert.assertEquals(pv.getMemberId(), pv2.getMemberId());
+      Assert.assertEquals(pv.getPageKey(), pv2.getPageKey());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java b/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java
deleted file mode 100644
index 3de8300..0000000
--- a/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java
+++ /dev/null
@@ -1,288 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.test.table;
-
-import com.google.common.cache.CacheBuilder;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-
-import org.apache.samza.SamzaException;
-import org.apache.samza.application.StreamApplication;
-import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.MapConfig;
-import org.apache.samza.context.Context;
-import org.apache.samza.context.MockContext;
-import org.apache.samza.system.descriptors.GenericInputDescriptor;
-import org.apache.samza.metrics.Counter;
-import org.apache.samza.metrics.MetricsRegistry;
-import org.apache.samza.metrics.Timer;
-import org.apache.samza.operators.KV;
-import org.apache.samza.table.descriptors.TableDescriptor;
-import org.apache.samza.system.descriptors.DelegatingSystemDescriptor;
-import org.apache.samza.runtime.LocalApplicationRunner;
-import org.apache.samza.serializers.NoOpSerde;
-import org.apache.samza.table.Table;
-import org.apache.samza.table.descriptors.CachingTableDescriptor;
-import org.apache.samza.table.descriptors.GuavaCacheTableDescriptor;
-import org.apache.samza.table.remote.RemoteReadWriteTable;
-import org.apache.samza.table.remote.RemoteReadableTable;
-import org.apache.samza.table.descriptors.RemoteTableDescriptor;
-import org.apache.samza.table.remote.TableRateLimiter;
-import org.apache.samza.table.remote.TableReadFunction;
-import org.apache.samza.table.remote.TableWriteFunction;
-import org.apache.samza.test.harness.AbstractIntegrationTestHarness;
-import org.apache.samza.test.util.Base64Serializer;
-import org.apache.samza.util.RateLimiter;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import static org.apache.samza.test.table.TestTableData.EnrichedPageView;
-import static org.apache.samza.test.table.TestTableData.PageView;
-import static org.apache.samza.test.table.TestTableData.Profile;
-import static org.apache.samza.test.table.TestTableData.generatePageViews;
-import static org.apache.samza.test.table.TestTableData.generateProfiles;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.withSettings;
-
-
-public class TestRemoteTable extends AbstractIntegrationTestHarness {
-
-  static Map<String, List<EnrichedPageView>> writtenRecords = new HashMap<>();
-
-  static class InMemoryReadFunction implements TableReadFunction<Integer, Profile> {
-    private final String serializedProfiles;
-    private transient Map<Integer, Profile> profileMap;
-
-    private InMemoryReadFunction(String profiles) {
-      this.serializedProfiles = profiles;
-    }
-
-    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
-      in.defaultReadObject();
-      Profile[] profiles = Base64Serializer.deserialize(this.serializedProfiles, Profile[].class);
-      this.profileMap = Arrays.stream(profiles).collect(Collectors.toMap(p -> p.getMemberId(), Function.identity()));
-    }
-
-    @Override
-    public CompletableFuture<Profile> getAsync(Integer key) {
-      return CompletableFuture.completedFuture(profileMap.get(key));
-    }
-
-    @Override
-    public boolean isRetriable(Throwable exception) {
-      return false;
-    }
-
-    static InMemoryReadFunction getInMemoryReadFunction(String serializedProfiles) {
-      return new InMemoryReadFunction(serializedProfiles);
-    }
-  }
-
-  static class InMemoryWriteFunction implements TableWriteFunction<Integer, EnrichedPageView> {
-    private transient List<EnrichedPageView> records;
-    private String testName;
-
-    public InMemoryWriteFunction(String testName) {
-      this.testName = testName;
-    }
-
-    // Verify serializable functionality
-    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
-      in.defaultReadObject();
-
-      // Write to the global list for verification
-      records = writtenRecords.get(testName);
-    }
-
-    @Override
-    public CompletableFuture<Void> putAsync(Integer key, EnrichedPageView record) {
-      records.add(record);
-      return CompletableFuture.completedFuture(null);
-    }
-
-    @Override
-    public CompletableFuture<Void> deleteAsync(Integer key) {
-      records.remove(key);
-      return CompletableFuture.completedFuture(null);
-    }
-
-    @Override
-    public boolean isRetriable(Throwable exception) {
-      return false;
-    }
-  }
-
-  private <K, V> Table<KV<K, V>> getCachingTable(TableDescriptor<K, V, ?> actualTableDesc, boolean defaultCache, String id, StreamApplicationDescriptor appDesc) {
-    CachingTableDescriptor<K, V> cachingDesc;
-    if (defaultCache) {
-      cachingDesc = new CachingTableDescriptor<>("caching-table-" + id, actualTableDesc);
-      cachingDesc.withReadTtl(Duration.ofMinutes(5));
-      cachingDesc.withWriteTtl(Duration.ofMinutes(5));
-    } else {
-      GuavaCacheTableDescriptor<K, V> guavaTableDesc = new GuavaCacheTableDescriptor<>("guava-table-" + id);
-      guavaTableDesc.withCache(CacheBuilder.newBuilder().expireAfterAccess(5, TimeUnit.MINUTES).build());
-      cachingDesc = new CachingTableDescriptor<>("caching-table-" + id, actualTableDesc, guavaTableDesc);
-    }
-
-    return appDesc.getTable(cachingDesc);
-  }
-
-  static class MyReadFunction implements TableReadFunction {
-    @Override
-    public CompletableFuture getAsync(Object key) {
-      return null;
-    }
-
-    @Override
-    public boolean isRetriable(Throwable exception) {
-      return false;
-    }
-  }
-
-  private void doTestStreamTableJoinRemoteTable(boolean withCache, boolean defaultCache, String testName) throws Exception {
-    final InMemoryWriteFunction writer = new InMemoryWriteFunction(testName);
-
-    writtenRecords.put(testName, new ArrayList<>());
-
-    int count = 10;
-    PageView[] pageViews = generatePageViews(count);
-    String profiles = Base64Serializer.serialize(generateProfiles(count));
-
-    int partitionCount = 4;
-    Map<String, String> configs = TestLocalTable.getBaseJobConfig(bootstrapUrl(), zkConnect());
-
-    configs.put("streams.PageView.samza.system", "test");
-    configs.put("streams.PageView.source", Base64Serializer.serialize(pageViews));
-    configs.put("streams.PageView.partitionCount", String.valueOf(partitionCount));
-
-    final RateLimiter readRateLimiter = mock(RateLimiter.class, withSettings().serializable());
-    final RateLimiter writeRateLimiter = mock(RateLimiter.class, withSettings().serializable());
-    final StreamApplication app = appDesc -> {
-      RemoteTableDescriptor<Integer, TestTableData.Profile> inputTableDesc = new RemoteTableDescriptor<>("profile-table-1");
-      inputTableDesc
-          .withReadFunction(InMemoryReadFunction.getInMemoryReadFunction(profiles))
-          .withRateLimiter(readRateLimiter, null, null);
-
-      // dummy reader
-      TableReadFunction readFn = new MyReadFunction();
-
-      RemoteTableDescriptor<Integer, EnrichedPageView> outputTableDesc = new RemoteTableDescriptor<>("enriched-page-view-table-1");
-      outputTableDesc
-          .withReadFunction(readFn)
-          .withWriteFunction(writer)
-          .withRateLimiter(writeRateLimiter, null, null);
-
-      Table<KV<Integer, EnrichedPageView>> outputTable = withCache
-          ? getCachingTable(outputTableDesc, defaultCache, "output", appDesc)
-          : appDesc.getTable(outputTableDesc);
-
-      Table<KV<Integer, Profile>> inputTable = withCache
-          ? getCachingTable(inputTableDesc, defaultCache, "input", appDesc)
-          : appDesc.getTable(inputTableDesc);
-
-      DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor("test");
-      GenericInputDescriptor<PageView> isd = ksd.getInputDescriptor("PageView", new NoOpSerde<>());
-      appDesc.getInputStream(isd)
-          .map(pv -> new KV<>(pv.getMemberId(), pv))
-          .join(inputTable, new PageViewToProfileJoinFunction())
-          .map(m -> new KV(m.getMemberId(), m))
-          .sendTo(outputTable);
-    };
-
-    Config config = new MapConfig(configs);
-    final LocalApplicationRunner runner = new LocalApplicationRunner(app, config);
-    executeRun(runner, config);
-    runner.waitForFinish();
-
-    int numExpected = count * partitionCount;
-    Assert.assertEquals(numExpected, writtenRecords.get(testName).size());
-    Assert.assertTrue(writtenRecords.get(testName).get(0) instanceof EnrichedPageView);
-  }
-
-  @Test
-  public void testStreamTableJoinRemoteTable() throws Exception {
-    doTestStreamTableJoinRemoteTable(false, false, "testStreamTableJoinRemoteTable");
-  }
-
-  @Test
-  public void testStreamTableJoinRemoteTableWithCache() throws Exception {
-    doTestStreamTableJoinRemoteTable(true, false, "testStreamTableJoinRemoteTableWithCache");
-  }
-
-  @Test
-  public void testStreamTableJoinRemoteTableWithDefaultCache() throws Exception {
-    doTestStreamTableJoinRemoteTable(true, true, "testStreamTableJoinRemoteTableWithDefaultCache");
-  }
-
-  private Context createMockContext() {
-    MetricsRegistry metricsRegistry = mock(MetricsRegistry.class);
-    doReturn(new Counter("")).when(metricsRegistry).newCounter(anyString(), anyString());
-    doReturn(new Timer("")).when(metricsRegistry).newTimer(anyString(), anyString());
-    Context context = new MockContext();
-    doReturn(new MapConfig()).when(context.getJobContext()).getConfig();
-    doReturn(metricsRegistry).when(context.getContainerContext()).getContainerMetricsRegistry();
-    return context;
-  }
-
-  @Test(expected = SamzaException.class)
-  public void testCatchReaderException() {
-    TableReadFunction<String, ?> reader = mock(TableReadFunction.class);
-    CompletableFuture<String> future = new CompletableFuture<>();
-    future.completeExceptionally(new RuntimeException("Expected test exception"));
-    doReturn(future).when(reader).getAsync(anyString());
-    TableRateLimiter rateLimitHelper = mock(TableRateLimiter.class);
-    RemoteReadableTable<String, ?> table = new RemoteReadableTable<>(
-        "table1", reader, rateLimitHelper, Executors.newSingleThreadExecutor(), null);
-    table.init(createMockContext());
-    table.get("abc");
-  }
-
-  @Test(expected = SamzaException.class)
-  public void testCatchWriterException() {
-    TableReadFunction<String, String> reader = mock(TableReadFunction.class);
-    TableWriteFunction<String, String> writer = mock(TableWriteFunction.class);
-    CompletableFuture<String> future = new CompletableFuture<>();
-    future.completeExceptionally(new RuntimeException("Expected test exception"));
-    doReturn(future).when(writer).putAsync(anyString(), any());
-    TableRateLimiter rateLimitHelper = mock(TableRateLimiter.class);
-    RemoteReadWriteTable<String, String> table = new RemoteReadWriteTable<String, String>(
-        "table1", reader, writer, rateLimitHelper, rateLimitHelper, Executors.newSingleThreadExecutor(), null);
-    table.init(createMockContext());
-    table.put("abc", "efg");
-  }
-}