You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2019/01/25 00:26:02 UTC

samza git commit: SAMZA-2085: Add table serde generation

Repository: samza
Updated Branches:
  refs/heads/master 8f338a8d3 -> 1b5fec1d5


SAMZA-2085: Add table serde generation

Currently, serde configuration generation is consolidated in JobNodeConfigurationGenerator, which runs during the planning stage; it is therefore not executed when tables are generated dynamically at an earlier stage, e.g. in a config rewriter. In such situations, we need to generate table serdes separately.

Author: Wei Song <ws...@linkedin.com>

Reviewers: Prateek M <pm...@linkedin.com>

Closes #893 from weisong44/SAMZA-2085


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/1b5fec1d
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/1b5fec1d
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/1b5fec1d

Branch: refs/heads/master
Commit: 1b5fec1d594e54d9d1e21833d281bf7bbb24ddc8
Parents: 8f338a8
Author: Wei Song <ws...@linkedin.com>
Authored: Thu Jan 24 16:25:53 2019 -0800
Committer: Jagadish <jv...@linkedin.com>
Committed: Thu Jan 24 16:25:53 2019 -0800

----------------------------------------------------------------------
 .../samza/table/TableConfigGenerator.java       |  66 +++++++-
 .../samza/table/TestTableConfigGenerator.java   |  70 ++++++++
 .../test/table/TestLocalTableEndToEnd.java      |  50 ------
 ...estLocalTableWithConfigRewriterEndToEnd.java |  92 +++++++++++
 .../TestLocalTableWithLowLevelApiEndToEnd.java  |  95 +++++++++++
 .../table/TestLocalTableWithSideInputs.java     | 165 -------------------
 .../TestLocalTableWithSideInputsEndToEnd.java   | 165 +++++++++++++++++++
 7 files changed, 487 insertions(+), 216 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/1b5fec1d/samza-core/src/main/java/org/apache/samza/table/TableConfigGenerator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/TableConfigGenerator.java b/samza-core/src/main/java/org/apache/samza/table/TableConfigGenerator.java
index 05cfacd..5d41ad6 100644
--- a/samza-core/src/main/java/org/apache/samza/table/TableConfigGenerator.java
+++ b/samza-core/src/main/java/org/apache/samza/table/TableConfigGenerator.java
@@ -19,14 +19,25 @@
 
 package org.apache.samza.table;
 
+import java.util.Base64;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
+
 import org.apache.samza.config.Config;
+import org.apache.samza.config.JavaTableConfig;
+import org.apache.samza.config.SerializerConfig;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.serializers.SerializableSerde;
+import org.apache.samza.table.descriptors.LocalTableDescriptor;
 import org.apache.samza.table.descriptors.TableDescriptor;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
 /**
  * Helper class to generate table configs.
  */
@@ -34,11 +45,64 @@ public class TableConfigGenerator {
 
   private static final Logger LOG = LoggerFactory.getLogger(TableConfigGenerator.class);
 
-  static public Map<String, String> generate(Config jobConfig, List<TableDescriptor> tableDescriptors) {
+  /**
+   * Generate configuration for provided tables
+   *
+   * @param jobConfig existing job config
+   * @param tableDescriptors table descriptors, for which configuration to be generated
+   * @return table configuration
+   */
+  public static Map<String, String> generate(Config jobConfig, List<TableDescriptor> tableDescriptors) {
     Map<String, String> tableConfig = new HashMap<>();
     tableDescriptors.forEach(tableDescriptor -> tableConfig.putAll(tableDescriptor.toConfig(jobConfig)));
     LOG.info("TableConfigGenerator has generated configs {}", tableConfig);
     return tableConfig;
   }
 
+  /**
+   * Generate serde configuration for provided tables
+   *
+   * @param tableDescriptors table descriptors, for which serde configuration to be generated
+   * @return serde configuration for tables
+   */
+  public static Map<String, String> generateSerdeConfig(List<TableDescriptor> tableDescriptors) {
+
+    Map<String, String> serdeConfigs = new HashMap<>();
+
+    // Collect key and msg serde instances for all the tables
+    Map<String, Serde> tableKeySerdes = new HashMap<>();
+    Map<String, Serde> tableValueSerdes = new HashMap<>();
+    HashSet<Serde> serdes = new HashSet<>();
+    tableDescriptors.stream()
+        .filter(d -> d instanceof LocalTableDescriptor)
+        .forEach(d -> {
+            LocalTableDescriptor ld = (LocalTableDescriptor) d;
+            tableKeySerdes.put(ld.getTableId(), ld.getSerde().getKeySerde());
+            tableValueSerdes.put(ld.getTableId(), ld.getSerde().getValueSerde());
+          });
+    serdes.addAll(tableKeySerdes.values());
+    serdes.addAll(tableValueSerdes.values());
+
+    // Generate serde names
+    SerializableSerde<Serde> serializableSerde = new SerializableSerde<>();
+    Base64.Encoder base64Encoder = Base64.getEncoder();
+    Map<Serde, String> serdeUUIDs = new HashMap<>();
+    serdes.forEach(serde -> {
+        String serdeName = serdeUUIDs.computeIfAbsent(serde,
+            s -> serde.getClass().getSimpleName() + "-" + UUID.randomUUID().toString());
+        serdeConfigs.putIfAbsent(String.format(SerializerConfig.SERDE_SERIALIZED_INSTANCE(), serdeName),
+            base64Encoder.encodeToString(serializableSerde.toBytes(serde)));
+      });
+
+    // Set key and msg serdes for tables to the serde names generated above
+    tableKeySerdes.forEach((tableId, serde) -> {
+        String keySerdeConfigKey = String.format(JavaTableConfig.STORE_KEY_SERDE, tableId);
+        serdeConfigs.put(keySerdeConfigKey, serdeUUIDs.get(serde));
+      });
+    tableValueSerdes.forEach((tableId, serde) -> {
+        String valueSerdeConfigKey = String.format(JavaTableConfig.STORE_MSG_SERDE, tableId);
+        serdeConfigs.put(valueSerdeConfigKey, serdeUUIDs.get(serde));
+      });
+    return serdeConfigs;
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/1b5fec1d/samza-core/src/test/java/org/apache/samza/table/TestTableConfigGenerator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/table/TestTableConfigGenerator.java b/samza-core/src/test/java/org/apache/samza/table/TestTableConfigGenerator.java
new file mode 100644
index 0000000..6e10a62
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/table/TestTableConfigGenerator.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JavaTableConfig;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.serializers.IntegerSerde;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.table.descriptors.LocalTableDescriptor;
+import org.apache.samza.table.descriptors.TableDescriptor;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertNotNull;
+
+
+public class TestTableConfigGenerator {
+  @Test
+  public void testWithSerdes() {
+    List<TableDescriptor> descriptors = Arrays.asList(
+        new MockLocalTableDescriptor("t1", KVSerde.of(new StringSerde(), new IntegerSerde())),
+        new MockLocalTableDescriptor("t2", KVSerde.of(new StringSerde(), new IntegerSerde()))
+    );
+    Config jobConfig = new MapConfig(TableConfigGenerator.generateSerdeConfig(descriptors));
+    JavaTableConfig javaTableConfig = new JavaTableConfig(jobConfig);
+    assertNotNull(javaTableConfig.getKeySerde("t1"));
+    assertNotNull(javaTableConfig.getMsgSerde("t1"));
+    assertNotNull(javaTableConfig.getKeySerde("t2"));
+    assertNotNull(javaTableConfig.getMsgSerde("t2"));
+
+    MapConfig tableConfig = new MapConfig(TableConfigGenerator.generate(jobConfig, descriptors));
+    javaTableConfig = new JavaTableConfig(tableConfig);
+    assertNotNull(javaTableConfig.getTableProviderFactory("t1"));
+    assertNotNull(javaTableConfig.getTableProviderFactory("t2"));
+  }
+
+  public static class MockLocalTableDescriptor<K, V> extends LocalTableDescriptor<K, V, MockLocalTableDescriptor<K, V>> {
+
+    public MockLocalTableDescriptor(String tableId, KVSerde<K, V> serde) {
+      super(tableId, serde);
+    }
+
+    public String getProviderFactoryClassName() {
+      return "some.class";
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/1b5fec1d/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableEndToEnd.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableEndToEnd.java b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableEndToEnd.java
index 0303c26..e2b2ad0 100644
--- a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableEndToEnd.java
+++ b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableEndToEnd.java
@@ -26,9 +26,7 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.samza.application.StreamApplication;
-import org.apache.samza.application.TaskApplication;
 import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
-import org.apache.samza.application.descriptors.TaskApplicationDescriptor;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.JobCoordinatorConfig;
@@ -47,14 +45,8 @@ import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.NoOpSerde;
 import org.apache.samza.standalone.PassthroughJobCoordinatorFactory;
 import org.apache.samza.storage.kv.inmemory.descriptors.InMemoryTableDescriptor;
-import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.table.ReadWriteTable;
 import org.apache.samza.table.Table;
-import org.apache.samza.task.InitableTask;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.StreamTask;
-import org.apache.samza.task.StreamTaskFactory;
-import org.apache.samza.task.TaskCoordinator;
 import org.apache.samza.test.harness.AbstractIntegrationTestHarness;
 import org.apache.samza.test.util.ArraySystemFactory;
 import org.apache.samza.test.util.Base64Serializer;
@@ -316,46 +308,4 @@ public class TestLocalTableEndToEnd extends AbstractIntegrationTestHarness {
     }
   }
 
-  @Test
-  public void testWithLowLevelApi() throws Exception {
-
-    Map<String, String> configs = getBaseJobConfig(bootstrapUrl(), zkConnect());
-    configs.put("streams.PageView.samza.system", "test");
-    configs.put("streams.PageView.source", Base64Serializer.serialize(TestTableData.generatePageViews(10)));
-    configs.put("streams.PageView.partitionCount", String.valueOf(4));
-    configs.put("task.inputs", "test.PageView");
-
-    Config config = new MapConfig(configs);
-    final LocalApplicationRunner runner = new LocalApplicationRunner(new MyTaskApplication(), config);
-    executeRun(runner, config);
-    runner.waitForFinish();
-  }
-
-  static public class MyTaskApplication implements TaskApplication {
-    @Override
-    public void describe(TaskApplicationDescriptor appDescriptor) {
-      DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor("test");
-      GenericInputDescriptor<PageView> pageViewISD = ksd.getInputDescriptor("PageView", new NoOpSerde<>());
-      appDescriptor
-          .withInputStream(pageViewISD)
-          .withTable(new InMemoryTableDescriptor("t1", KVSerde.of(new IntegerSerde(), new PageViewJsonSerde())))
-          .withTaskFactory((StreamTaskFactory) () -> new MyStreamTask());
-    }
-  }
-
-  static public class MyStreamTask implements StreamTask, InitableTask {
-    private ReadWriteTable<Integer, PageView> pageViewTable;
-    @Override
-    public void init(Context context) throws Exception {
-      pageViewTable = context.getTaskContext().getTable("t1");
-    }
-    @Override
-    public void process(IncomingMessageEnvelope message, MessageCollector collector, TaskCoordinator coordinator) {
-      PageView pv = (PageView) message.getMessage();
-      pageViewTable.put(pv.getMemberId(), pv);
-      PageView pv2 = pageViewTable.get(pv.getMemberId());
-      Assert.assertEquals(pv.getMemberId(), pv2.getMemberId());
-      Assert.assertEquals(pv.getPageKey(), pv2.getPageKey());
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/1b5fec1d/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithConfigRewriterEndToEnd.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithConfigRewriterEndToEnd.java b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithConfigRewriterEndToEnd.java
new file mode 100644
index 0000000..126edc8
--- /dev/null
+++ b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithConfigRewriterEndToEnd.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.test.table;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.samza.application.TaskApplication;
+import org.apache.samza.application.descriptors.TaskApplicationDescriptor;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.ConfigRewriter;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.runtime.LocalApplicationRunner;
+import org.apache.samza.serializers.IntegerSerde;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.NoOpSerde;
+import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.storage.kv.inmemory.descriptors.InMemoryTableDescriptor;
+import org.apache.samza.system.descriptors.DelegatingSystemDescriptor;
+import org.apache.samza.system.descriptors.GenericInputDescriptor;
+import org.apache.samza.table.TableConfigGenerator;
+import org.apache.samza.table.descriptors.TableDescriptor;
+import org.apache.samza.task.StreamTaskFactory;
+import org.apache.samza.test.harness.AbstractIntegrationTestHarness;
+import org.apache.samza.test.util.Base64Serializer;
+
+import org.junit.Test;
+
+import static org.apache.samza.test.table.TestLocalTableEndToEnd.getBaseJobConfig;
+import static org.apache.samza.test.table.TestLocalTableWithLowLevelApiEndToEnd.MyStreamTask;
+
+
+public class TestLocalTableWithConfigRewriterEndToEnd extends AbstractIntegrationTestHarness {
+
+  @Test
+  public void testWithConfigRewriter() throws Exception {
+    Map<String, String> configs = getBaseJobConfig(bootstrapUrl(), zkConnect());
+    configs.put("streams.PageView.samza.system", "test");
+    configs.put("streams.PageView.source", Base64Serializer.serialize(TestTableData.generatePageViews(10)));
+    configs.put("streams.PageView.partitionCount", String.valueOf(4));
+    configs.put("task.inputs", "test.PageView");
+    configs.put("job.config.rewriter.my-rewriter.class", MyConfigRewriter.class.getName());
+    configs.put("job.config.rewriters", "my-rewriter");
+
+    Config config = new MapConfig(configs);
+    final LocalApplicationRunner runner = new LocalApplicationRunner(new MyTaskApplication(), config);
+    executeRun(runner, config);
+    runner.waitForFinish();
+  }
+
+  static public class MyConfigRewriter implements ConfigRewriter {
+    @Override
+    public Config rewrite(String name, Config config) {
+      List<TableDescriptor> descriptors = Arrays.asList(
+          new InMemoryTableDescriptor("t1", KVSerde.of(new IntegerSerde(), new TestTableData.PageViewJsonSerde())),
+          new InMemoryTableDescriptor("t2", KVSerde.of(new IntegerSerde(), new StringSerde())));
+      Map<String, String> serdeConfig = TableConfigGenerator.generateSerdeConfig(descriptors);
+      Map<String, String> tableConfig = TableConfigGenerator.generate(new MapConfig(config, serdeConfig), descriptors);
+      return new MapConfig(config, serdeConfig, tableConfig);
+    }
+  }
+
+  static public class MyTaskApplication implements TaskApplication {
+    @Override
+    public void describe(TaskApplicationDescriptor appDescriptor) {
+      DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor("test");
+      GenericInputDescriptor<TestTableData.PageView> pageViewISD = ksd.getInputDescriptor("PageView", new NoOpSerde<>());
+      appDescriptor
+          .withInputStream(pageViewISD)
+          .withTaskFactory((StreamTaskFactory) () -> new MyStreamTask());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/1b5fec1d/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithLowLevelApiEndToEnd.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithLowLevelApiEndToEnd.java b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithLowLevelApiEndToEnd.java
new file mode 100644
index 0000000..6cefe2b
--- /dev/null
+++ b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithLowLevelApiEndToEnd.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.test.table;
+
+import java.util.Map;
+
+import org.apache.samza.application.TaskApplication;
+import org.apache.samza.application.descriptors.TaskApplicationDescriptor;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.context.Context;
+import org.apache.samza.runtime.LocalApplicationRunner;
+import org.apache.samza.serializers.IntegerSerde;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.NoOpSerde;
+import org.apache.samza.storage.kv.inmemory.descriptors.InMemoryTableDescriptor;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.descriptors.DelegatingSystemDescriptor;
+import org.apache.samza.system.descriptors.GenericInputDescriptor;
+import org.apache.samza.table.ReadWriteTable;
+import org.apache.samza.task.InitableTask;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.StreamTask;
+import org.apache.samza.task.StreamTaskFactory;
+import org.apache.samza.task.TaskCoordinator;
+import org.apache.samza.test.harness.AbstractIntegrationTestHarness;
+import org.apache.samza.test.util.Base64Serializer;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.apache.samza.test.table.TestLocalTableEndToEnd.getBaseJobConfig;
+
+
+public class TestLocalTableWithLowLevelApiEndToEnd extends AbstractIntegrationTestHarness {
+
+  @Test
+  public void testTableWithLowLevelApi() throws Exception {
+    Map<String, String> configs = getBaseJobConfig(bootstrapUrl(), zkConnect());
+    configs.put("streams.PageView.samza.system", "test");
+    configs.put("streams.PageView.source", Base64Serializer.serialize(TestTableData.generatePageViews(10)));
+    configs.put("streams.PageView.partitionCount", String.valueOf(4));
+    configs.put("task.inputs", "test.PageView");
+
+    Config config = new MapConfig(configs);
+    final LocalApplicationRunner runner = new LocalApplicationRunner(new MyTaskApplication(), config);
+    executeRun(runner, config);
+    runner.waitForFinish();
+  }
+
+  static public class MyTaskApplication implements TaskApplication {
+    @Override
+    public void describe(TaskApplicationDescriptor appDescriptor) {
+      DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor("test");
+      GenericInputDescriptor<TestTableData.PageView> pageViewISD = ksd.getInputDescriptor("PageView", new NoOpSerde<>());
+      appDescriptor
+          .withInputStream(pageViewISD)
+          .withTable(new InMemoryTableDescriptor("t1", KVSerde.of(new IntegerSerde(), new TestTableData.PageViewJsonSerde())))
+          .withTaskFactory((StreamTaskFactory) () -> new MyStreamTask());
+    }
+  }
+
+  static public class MyStreamTask implements StreamTask, InitableTask {
+    private ReadWriteTable<Integer, TestTableData.PageView> pageViewTable;
+    @Override
+    public void init(Context context) {
+      pageViewTable = context.getTaskContext().getTable("t1");
+    }
+    @Override
+    public void process(IncomingMessageEnvelope message, MessageCollector collector, TaskCoordinator coordinator) {
+      TestTableData.PageView pv = (TestTableData.PageView) message.getMessage();
+      pageViewTable.put(pv.getMemberId(), pv);
+      TestTableData.PageView pv2 = pageViewTable.get(pv.getMemberId());
+      Assert.assertEquals(pv.getMemberId(), pv2.getMemberId());
+      Assert.assertEquals(pv.getPageKey(), pv2.getPageKey());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/1b5fec1d/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java
deleted file mode 100644
index ce55dd0..0000000
--- a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.test.table;
-
-import com.google.common.collect.ImmutableList;
-import java.time.Duration;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-import org.apache.samza.SamzaException;
-import org.apache.samza.application.StreamApplication;
-import org.apache.samza.config.MapConfig;
-import org.apache.samza.config.StreamConfig;
-import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
-import org.apache.samza.operators.KV;
-import org.apache.samza.table.descriptors.TableDescriptor;
-import org.apache.samza.serializers.IntegerSerde;
-import org.apache.samza.serializers.KVSerde;
-import org.apache.samza.serializers.NoOpSerde;
-import org.apache.samza.storage.kv.Entry;
-import org.apache.samza.storage.kv.descriptors.RocksDbTableDescriptor;
-import org.apache.samza.storage.kv.inmemory.descriptors.InMemoryTableDescriptor;
-import org.apache.samza.system.kafka.descriptors.KafkaSystemDescriptor;
-import org.apache.samza.table.Table;
-import org.apache.samza.test.framework.TestRunner;
-import org.apache.samza.test.framework.system.descriptors.InMemoryInputDescriptor;
-import org.apache.samza.test.framework.system.descriptors.InMemoryOutputDescriptor;
-import org.apache.samza.test.framework.system.descriptors.InMemorySystemDescriptor;
-import org.apache.samza.test.harness.AbstractIntegrationTestHarness;
-import org.junit.Test;
-
-import static org.apache.samza.test.table.TestTableData.EnrichedPageView;
-import static org.apache.samza.test.table.TestTableData.PageView;
-import static org.apache.samza.test.table.TestTableData.Profile;
-import static org.apache.samza.test.table.TestTableData.ProfileJsonSerde;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-
-public class TestLocalTableWithSideInputs extends AbstractIntegrationTestHarness {
-  private static final String PAGEVIEW_STREAM = "pageview";
-  private static final String PROFILE_STREAM = "profile";
-  private static final String ENRICHED_PAGEVIEW_STREAM = "enrichedpageview";
-
-  @Test
-  public void testJoinWithSideInputsTable() {
-    runTest(
-        "test",
-        new PageViewProfileJoin(),
-        Arrays.asList(TestTableData.generatePageViews(10)),
-        Arrays.asList(TestTableData.generateProfiles(10)));
-  }
-
-  @Test
-  public void testJoinWithDurableSideInputTable() {
-    runTest(
-        "test",
-        new DurablePageViewProfileJoin(),
-        Arrays.asList(TestTableData.generatePageViews(5)),
-        Arrays.asList(TestTableData.generateProfiles(5)));
-  }
-
-  private void runTest(String systemName, StreamApplication app, List<PageView> pageViews,
-      List<Profile> profiles) {
-    Map<String, String> configs = new HashMap<>();
-    configs.put(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), PAGEVIEW_STREAM), systemName);
-    configs.put(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), PROFILE_STREAM), systemName);
-    configs.put(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), ENRICHED_PAGEVIEW_STREAM), systemName);
-
-    InMemorySystemDescriptor isd = new InMemorySystemDescriptor(systemName);
-
-    InMemoryInputDescriptor<PageView> pageViewStreamDesc = isd
-        .getInputDescriptor(PAGEVIEW_STREAM, new NoOpSerde<PageView>());
-
-    InMemoryInputDescriptor<Profile> profileStreamDesc = isd
-        .getInputDescriptor(PROFILE_STREAM, new NoOpSerde<Profile>());
-
-    InMemoryOutputDescriptor<EnrichedPageView> outputStreamDesc = isd
-        .getOutputDescriptor(ENRICHED_PAGEVIEW_STREAM, new NoOpSerde<EnrichedPageView>());
-
-    TestRunner
-        .of(app)
-        .addInputStream(pageViewStreamDesc, pageViews)
-        .addInputStream(profileStreamDesc, profiles)
-        .addOutputStream(outputStreamDesc, 1)
-        .addConfig(new MapConfig(configs))
-        .run(Duration.ofMillis(100000));
-
-    try {
-      Map<Integer, List<EnrichedPageView>> result = TestRunner.consumeStream(outputStreamDesc, Duration.ofMillis(1000));
-      List<EnrichedPageView> results = result.values().stream()
-          .flatMap(List::stream)
-          .collect(Collectors.toList());
-
-      List<EnrichedPageView> expectedEnrichedPageviews = pageViews.stream()
-          .flatMap(pv -> profiles.stream()
-              .filter(profile -> pv.memberId == profile.memberId)
-              .map(profile -> new EnrichedPageView(pv.pageKey, profile.memberId, profile.company)))
-          .collect(Collectors.toList());
-
-      boolean successfulJoin = results.stream().allMatch(expectedEnrichedPageviews::contains);
-      assertEquals("Mismatch between the expected and actual join count", expectedEnrichedPageviews.size(), results.size());
-      assertTrue("Pageview profile join did not succeed for all inputs", successfulJoin);
-    } catch (SamzaException e) {
-      e.printStackTrace();
-    }
-  }
-
-  static class PageViewProfileJoin implements StreamApplication {
-    static final String PROFILE_TABLE = "profile-table";
-
-    @Override
-    public void describe(StreamApplicationDescriptor appDescriptor) {
-      Table<KV<Integer, TestTableData.Profile>> table = appDescriptor.getTable(getTableDescriptor());
-      KafkaSystemDescriptor sd =
-          new KafkaSystemDescriptor("test");
-      appDescriptor.getInputStream(sd.getInputDescriptor(PAGEVIEW_STREAM, new NoOpSerde<TestTableData.PageView>()))
-          .partitionBy(TestTableData.PageView::getMemberId, v -> v, KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>()), "partition-page-view")
-          .join(table, new PageViewToProfileJoinFunction())
-          .sendTo(appDescriptor.getOutputStream(sd.getOutputDescriptor(ENRICHED_PAGEVIEW_STREAM, new NoOpSerde<>())));
-    }
-
-    protected TableDescriptor<Integer, Profile, ?> getTableDescriptor() {
-      return new InMemoryTableDescriptor(PROFILE_TABLE, KVSerde.of(new IntegerSerde(), new ProfileJsonSerde()))
-          .withSideInputs(ImmutableList.of(PROFILE_STREAM))
-          .withSideInputsProcessor((msg, store) -> {
-              Profile profile = (Profile) msg.getMessage();
-              int key = profile.getMemberId();
-              return ImmutableList.of(new Entry<>(key, profile));
-            });
-    }
-  }
-
-  static class DurablePageViewProfileJoin extends PageViewProfileJoin {
-    @Override
-    protected TableDescriptor<Integer, Profile, ?> getTableDescriptor() {
-      return new RocksDbTableDescriptor(PROFILE_TABLE, KVSerde.of(new IntegerSerde(), new ProfileJsonSerde()))
-          .withSideInputs(ImmutableList.of(PROFILE_STREAM))
-          .withSideInputsProcessor((msg, store) -> {
-              TestTableData.Profile profile = (TestTableData.Profile) msg.getMessage();
-              int key = profile.getMemberId();
-              return ImmutableList.of(new Entry<>(key, profile));
-            });
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/1b5fec1d/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputsEndToEnd.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputsEndToEnd.java b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputsEndToEnd.java
new file mode 100644
index 0000000..74e2105
--- /dev/null
+++ b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputsEndToEnd.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.test.table;
+
+import com.google.common.collect.ImmutableList;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.samza.SamzaException;
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.StreamConfig;
+import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
+import org.apache.samza.operators.KV;
+import org.apache.samza.table.descriptors.TableDescriptor;
+import org.apache.samza.serializers.IntegerSerde;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.NoOpSerde;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.storage.kv.descriptors.RocksDbTableDescriptor;
+import org.apache.samza.storage.kv.inmemory.descriptors.InMemoryTableDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaSystemDescriptor;
+import org.apache.samza.table.Table;
+import org.apache.samza.test.framework.TestRunner;
+import org.apache.samza.test.framework.system.descriptors.InMemoryInputDescriptor;
+import org.apache.samza.test.framework.system.descriptors.InMemoryOutputDescriptor;
+import org.apache.samza.test.framework.system.descriptors.InMemorySystemDescriptor;
+import org.apache.samza.test.harness.AbstractIntegrationTestHarness;
+import org.junit.Test;
+
+import static org.apache.samza.test.table.TestTableData.EnrichedPageView;
+import static org.apache.samza.test.table.TestTableData.PageView;
+import static org.apache.samza.test.table.TestTableData.Profile;
+import static org.apache.samza.test.table.TestTableData.ProfileJsonSerde;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+
+public class TestLocalTableWithSideInputsEndToEnd extends AbstractIntegrationTestHarness {
+  private static final String PAGEVIEW_STREAM = "pageview";
+  private static final String PROFILE_STREAM = "profile";
+  private static final String ENRICHED_PAGEVIEW_STREAM = "enrichedpageview";
+
+  /** Runs {@link PageViewProfileJoin} over the arrays from generatePageViews(10) and generateProfiles(10). */
+  @Test
+  public void testJoinWithSideInputsTable() {
+    StreamApplication joinApp = new PageViewProfileJoin();
+    List<PageView> generatedPageViews = Arrays.asList(TestTableData.generatePageViews(10));
+    List<Profile> generatedProfiles = Arrays.asList(TestTableData.generateProfiles(10));
+    runTest("test", joinApp, generatedPageViews, generatedProfiles);
+  }
+
+  /** Runs {@link DurablePageViewProfileJoin} over the arrays from generatePageViews(5) and generateProfiles(5). */
+  @Test
+  public void testJoinWithDurableSideInputTable() {
+    StreamApplication durableJoinApp = new DurablePageViewProfileJoin();
+    List<PageView> generatedPageViews = Arrays.asList(TestTableData.generatePageViews(5));
+    List<Profile> generatedProfiles = Arrays.asList(TestTableData.generateProfiles(5));
+    runTest("test", durableJoinApp, generatedPageViews, generatedProfiles);
+  }
+
+  /**
+   * Runs {@code app} through {@link TestRunner} on in-memory streams and checks the enriched page views it emits
+   * against the member-id join of {@code pageViews} and {@code profiles}; a {@link SamzaException} fails the test.
+   */
+  private void runTest(String systemName, StreamApplication app, List<PageView> pageViews, List<Profile> profiles) {
+    Map<String, String> configs = new HashMap<>();
+    configs.put(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), PAGEVIEW_STREAM), systemName);
+    configs.put(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), PROFILE_STREAM), systemName);
+    configs.put(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), ENRICHED_PAGEVIEW_STREAM), systemName);
+
+    InMemorySystemDescriptor isd = new InMemorySystemDescriptor(systemName);
+    InMemoryInputDescriptor<PageView> pageViewStreamDesc = isd
+        .getInputDescriptor(PAGEVIEW_STREAM, new NoOpSerde<PageView>());
+    InMemoryInputDescriptor<Profile> profileStreamDesc = isd
+        .getInputDescriptor(PROFILE_STREAM, new NoOpSerde<Profile>());
+    InMemoryOutputDescriptor<EnrichedPageView> outputStreamDesc = isd
+        .getOutputDescriptor(ENRICHED_PAGEVIEW_STREAM, new NoOpSerde<EnrichedPageView>());
+
+    TestRunner
+        .of(app)
+        .addInputStream(pageViewStreamDesc, pageViews)
+        .addInputStream(profileStreamDesc, profiles)
+        .addOutputStream(outputStreamDesc, 1)
+        .addConfig(new MapConfig(configs))
+        .run(Duration.ofMillis(100000));
+
+    try {
+      Map<Integer, List<EnrichedPageView>> result = TestRunner.consumeStream(outputStreamDesc, Duration.ofMillis(1000));
+      List<EnrichedPageView> results = result.values().stream()
+          .flatMap(List::stream)
+          .collect(Collectors.toList());
+
+      List<EnrichedPageView> expectedEnrichedPageviews = pageViews.stream()
+          .flatMap(pv -> profiles.stream()
+              .filter(profile -> pv.memberId == profile.memberId)
+              .map(profile -> new EnrichedPageView(pv.pageKey, profile.memberId, profile.company)))
+          .collect(Collectors.toList());
+
+      boolean successfulJoin = results.stream().allMatch(expectedEnrichedPageviews::contains);
+      assertEquals("Mismatch between the expected and actual join count", expectedEnrichedPageviews.size(), results.size());
+      assertTrue("Pageview profile join did not succeed for all inputs", successfulJoin);
+    } catch (SamzaException e) {
+      throw new AssertionError("Failed to consume the enriched pageview output stream", e);
+    }
+  }
+
+  /** Joins page views partitioned by member id against a profile table; subclasses pick the table descriptor. */
+  static class PageViewProfileJoin implements StreamApplication {
+    static final String PROFILE_TABLE = "profile-table";
+
+    @Override
+    public void describe(StreamApplicationDescriptor appDescriptor) {
+      Table<KV<Integer, Profile>> profileTable = appDescriptor.getTable(getTableDescriptor());
+      KafkaSystemDescriptor kafkaSystem = new KafkaSystemDescriptor("test");
+      appDescriptor.getInputStream(kafkaSystem.getInputDescriptor(PAGEVIEW_STREAM, new NoOpSerde<PageView>()))
+          .partitionBy(PageView::getMemberId, pageView -> pageView, KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>()), "partition-page-view")
+          .join(profileTable, new PageViewToProfileJoinFunction())
+          .sendTo(appDescriptor.getOutputStream(kafkaSystem.getOutputDescriptor(ENRICHED_PAGEVIEW_STREAM, new NoOpSerde<>())));
+    }
+
+    /** Describes an in-memory profile table fed by {@code PROFILE_STREAM} side inputs and keyed by member id. */
+    protected TableDescriptor<Integer, Profile, ?> getTableDescriptor() {
+      return new InMemoryTableDescriptor(PROFILE_TABLE, KVSerde.of(new IntegerSerde(), new ProfileJsonSerde()))
+          .withSideInputs(ImmutableList.of(PROFILE_STREAM))
+          .withSideInputsProcessor((envelope, store) -> {
+              Profile sideInputProfile = (Profile) envelope.getMessage();
+              return ImmutableList.of(new Entry<>(sideInputProfile.getMemberId(), sideInputProfile));
+            });
+    }
+  }
+
+  /** Variant of {@link PageViewProfileJoin} whose profile table is described by a {@link RocksDbTableDescriptor}. */
+  static class DurablePageViewProfileJoin extends PageViewProfileJoin {
+    @Override
+    protected TableDescriptor<Integer, Profile, ?> getTableDescriptor() {
+      return new RocksDbTableDescriptor(PROFILE_TABLE, KVSerde.of(new IntegerSerde(), new ProfileJsonSerde()))
+          .withSideInputs(ImmutableList.of(PROFILE_STREAM))
+          .withSideInputsProcessor((envelope, store) -> {
+              Profile sideInputProfile = (Profile) envelope.getMessage();
+              return ImmutableList.of(new Entry<>(sideInputProfile.getMemberId(), sideInputProfile));
+            });
+    }
+  }
+}
\ No newline at end of file