You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by pm...@apache.org on 2017/10/04 22:37:53 UTC

[1/2] samza git commit: SAMZA-1056: Added wiring for High Level API state stores, their serdes and changelogs.

Repository: samza
Updated Branches:
  refs/heads/master ad80cf9f1 -> a671288e1


http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestTimeSeriesStoreImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestTimeSeriesStoreImpl.java b/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestTimeSeriesStoreImpl.java
index 62304f3..7677826 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestTimeSeriesStoreImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestTimeSeriesStoreImpl.java
@@ -54,30 +54,30 @@ public class TestTimeSeriesStoreImpl {
 
     // read from time-range
     List<TimestampedValue<byte[]>> values = readStore(timeSeriesStore, "hello", 0L, 1L);
-    Assert.assertEquals(values.size(), 0);
+    Assert.assertEquals(0, values.size());
 
     // read from time-range [1,2) should return one entry
     values = readStore(timeSeriesStore, "hello", 1L, 2L);
-    Assert.assertEquals(values.size(), 1);
-    Assert.assertEquals(new String(values.get(0).getValue()), "world-1");
+    Assert.assertEquals(1, values.size());
+    Assert.assertEquals("world-1", new String(values.get(0).getValue()));
 
     // read from time-range [2,3) should return two entries
     values = readStore(timeSeriesStore, "hello", 2L, 3L);
-    Assert.assertEquals(values.size(), 2);
-    Assert.assertEquals(new String(values.get(0).getValue()), "world-1");
-    Assert.assertEquals(values.get(0).getTimestamp(), new Long(2));
+    Assert.assertEquals(2, values.size());
+    Assert.assertEquals("world-1", new String(values.get(0).getValue()));
+    Assert.assertEquals(2L, values.get(0).getTimestamp());
 
     // read from time-range [0,3) should return three entries
     values = readStore(timeSeriesStore, "hello", 0L, 3L);
-    Assert.assertEquals(values.size(), 3);
+    Assert.assertEquals(3, values.size());
 
     // read from time-range [2,999999) should return two entries
     values = readStore(timeSeriesStore, "hello", 2L, 999999L);
-    Assert.assertEquals(values.size(), 2);
+    Assert.assertEquals(2, values.size());
 
     // read from time-range [3,4) should return no entries
     values = readStore(timeSeriesStore, "hello", 3L, 4L);
-    Assert.assertEquals(values.size(), 0);
+    Assert.assertEquals(0, values.size());
   }
 
   @Test
@@ -87,11 +87,11 @@ public class TestTimeSeriesStoreImpl {
 
     // read from a non-existent key
     List<TimestampedValue<byte[]>> values = readStore(timeSeriesStore, "non-existent-key", 0, Integer.MAX_VALUE);
-    Assert.assertEquals(values.size(), 0);
+    Assert.assertEquals(0, values.size());
 
     // read from an existing key but out of range timestamp
     values = readStore(timeSeriesStore, "hello", 2, Integer.MAX_VALUE);
-    Assert.assertEquals(values.size(), 0);
+    Assert.assertEquals(0, values.size());
   }
 
   @Test
@@ -106,21 +106,21 @@ public class TestTimeSeriesStoreImpl {
 
     // read from time-range [0,2) should return 100 entries
     List<TimestampedValue<byte[]>> values = readStore(timeSeriesStore, "hello", 0L, 2L);
-    Assert.assertEquals(values.size(), 100);
+    Assert.assertEquals(100, values.size());
     values.forEach(timeSeriesValue -> {
-        Assert.assertEquals(new String(timeSeriesValue.getValue()), "world-1");
+        Assert.assertEquals("world-1", new String(timeSeriesValue.getValue()));
       });
 
     // read from time-range [2,4) should return 100 entries
     values = readStore(timeSeriesStore, "hello", 2L, 4L);
-    Assert.assertEquals(values.size(), 100);
+    Assert.assertEquals(100, values.size());
     values.forEach(timeSeriesValue -> {
-        Assert.assertEquals(new String(timeSeriesValue.getValue()), "world-2");
+        Assert.assertEquals("world-2", new String(timeSeriesValue.getValue()));
       });
 
     // read all entries in the store
     values = readStore(timeSeriesStore, "hello", 0L, Integer.MAX_VALUE);
-    Assert.assertEquals(values.size(), 200);
+    Assert.assertEquals(200, values.size());
   }
 
   @Test
@@ -135,30 +135,30 @@ public class TestTimeSeriesStoreImpl {
 
     // read from time-range
     List<TimestampedValue<byte[]>> values = readStore(timeSeriesStore, "hello", 0L, 1L);
-    Assert.assertEquals(values.size(), 0);
+    Assert.assertEquals(0, values.size());
 
     // read from time-range [1,2) should return one entry
     values = readStore(timeSeriesStore, "hello", 1L, 2L);
-    Assert.assertEquals(values.size(), 1);
-    Assert.assertEquals(new String(values.get(0).getValue()), "world-1");
+    Assert.assertEquals(1, values.size());
+    Assert.assertEquals("world-1", new String(values.get(0).getValue()));
 
     // read from time-range [2,3) should return the most recent entry
     values = readStore(timeSeriesStore, "hello", 2L, 3L);
-    Assert.assertEquals(values.size(), 1);
-    Assert.assertEquals(new String(values.get(0).getValue()), "world-2");
-    Assert.assertEquals(values.get(0).getTimestamp(), new Long(2));
+    Assert.assertEquals(1, values.size());
+    Assert.assertEquals("world-2", new String(values.get(0).getValue()));
+    Assert.assertEquals(2L, values.get(0).getTimestamp());
 
     // read from time-range [0,3) should return two entries
     values = readStore(timeSeriesStore, "hello", 0L, 3L);
-    Assert.assertEquals(values.size(), 2);
+    Assert.assertEquals(2, values.size());
 
     // read from time-range [2,999999) should return one entry
     values = readStore(timeSeriesStore, "hello", 2L, 999999L);
-    Assert.assertEquals(values.size(), 1);
+    Assert.assertEquals(1, values.size());
 
     // read from time-range [3,4) should return no entries
     values = readStore(timeSeriesStore, "hello", 3L, 4L);
-    Assert.assertEquals(values.size(), 0);
+    Assert.assertEquals(0, values.size());
   }
 
   @Test
@@ -172,11 +172,11 @@ public class TestTimeSeriesStoreImpl {
     timeSeriesStore.put("hello", "world-2".getBytes(), 2L);
 
     List<TimestampedValue<byte[]>> values = readStore(timeSeriesStore, "hello", 1L, 3L);
-    Assert.assertEquals(values.size(), 2);
+    Assert.assertEquals(2, values.size());
 
     timeSeriesStore.remove("hello", 0L, 3L);
     values = readStore(timeSeriesStore, "hello", 1L, 3L);
-    Assert.assertEquals(values.size(), 0);
+    Assert.assertEquals(0, values.size());
   }
 
   private static <K, V> List<TimestampedValue<V>> readStore(TimeSeriesStore<K, V> store, K key, long startTimestamp, long endTimestamp) {

http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestTimestampedValueSerde.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestTimestampedValueSerde.java b/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestTimestampedValueSerde.java
new file mode 100644
index 0000000..40015ec
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestTimestampedValueSerde.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl.store;
+
+import org.apache.samza.serializers.ByteSerde;
+import org.apache.samza.serializers.IntegerSerde;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertTrue;
+
+public class TestTimestampedValueSerde {
+
+  @Test
+  public void testEmptyValueDeserialization() {
+    byte[] bytesWithNoValue = new byte[8];
+    ByteBuffer.wrap(bytesWithNoValue).putLong(1234L);
+    TimestampedValueSerde<byte[]> timestampedValueSerde = new TimestampedValueSerde<>(new ByteSerde());
+    TimestampedValue<byte[]> timestampedValue = timestampedValueSerde.fromBytes(bytesWithNoValue);
+    assertEquals(1234L, timestampedValue.getTimestamp());
+    assertEquals(0, timestampedValue.getValue().length);
+  }
+
+  @Test
+  public void testEmptyValueSerialization() {
+    byte[] expectedBytes = new byte[8];
+    ByteBuffer.wrap(expectedBytes).putLong(1234L);
+
+    TimestampedValueSerde<Integer> timestampedValueSerde = new TimestampedValueSerde<>(new IntegerSerde());
+    TimestampedValue<Integer> timestampedValue = new TimestampedValue<>(null, 1234L);
+    assertTrue(Arrays.equals(expectedBytes, timestampedValueSerde.toBytes(timestampedValue)));
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-test/src/test/java/org/apache/samza/test/operator/PageView.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/PageView.java b/samza-test/src/test/java/org/apache/samza/test/operator/PageView.java
deleted file mode 100644
index 2a8f039..0000000
--- a/samza-test/src/test/java/org/apache/samza/test/operator/PageView.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.test.operator;
-
-
-class PageView {
-  private String userId;
-  private String country;
-  private String url;
-
-  public String getUserId() {
-    return userId;
-  }
-
-  public String getCountry() {
-    return country;
-  }
-
-  public String getUrl() {
-    return url;
-  }
-
-  public void setUserId(String userId) {
-    this.userId = userId;
-  }
-
-  public void setCountry(String country) {
-    this.country = country;
-  }
-
-  public void setUrl(String url) {
-    this.url = url;
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java
new file mode 100644
index 0000000..517d81f
--- /dev/null
+++ b/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.test.operator;
+
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.KV;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.functions.JoinFunction;
+import org.apache.samza.operators.windows.Windows;
+import org.apache.samza.serializers.JsonSerdeV2;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.test.operator.data.AdClick;
+import org.apache.samza.test.operator.data.PageView;
+import org.apache.samza.test.operator.data.UserPageAdClick;
+
+import java.time.Duration;
+
+/**
+ * A {@link StreamApplication} that demonstrates a partitionBy, stream-stream join and a windowed count.
+ */
+public class RepartitionJoinWindowApp implements StreamApplication {
+  static final String PAGE_VIEWS = "page-views";
+  static final String AD_CLICKS = "ad-clicks";
+  static final String OUTPUT_TOPIC = "user-ad-click-counts";
+
+  @Override
+  public void init(StreamGraph graph, Config config) {
+    MessageStream<PageView> pageViews = graph.getInputStream(PAGE_VIEWS, new JsonSerdeV2<>(PageView.class));
+    MessageStream<AdClick> adClicks = graph.getInputStream(AD_CLICKS, new JsonSerdeV2<>(AdClick.class));
+    OutputStream<KV<String, String>> outputStream =
+        graph.getOutputStream(OUTPUT_TOPIC, new KVSerde<>(new StringSerde(), new StringSerde()));
+
+    MessageStream<PageView> pageViewsRepartitionedByViewId = pageViews
+        .partitionBy(PageView::getViewId, pv -> pv, new KVSerde<>(new StringSerde(), new JsonSerdeV2<>(PageView.class)))
+        .map(KV::getValue);
+
+    MessageStream<AdClick> adClicksRepartitionedByViewId = adClicks
+        .partitionBy(AdClick::getViewId, ac -> ac, new KVSerde<>(new StringSerde(), new JsonSerdeV2<>(AdClick.class)))
+        .map(KV::getValue);
+
+    MessageStream<UserPageAdClick> userPageAdClicks = pageViewsRepartitionedByViewId
+        .join(adClicksRepartitionedByViewId, new UserPageViewAdClicksJoiner(),
+            new StringSerde(), new JsonSerdeV2<>(PageView.class), new JsonSerdeV2<>(AdClick.class),
+            Duration.ofMinutes(1));
+
+    userPageAdClicks
+        .partitionBy(UserPageAdClick::getUserId, upac -> upac,
+            KVSerde.of(new StringSerde(), new JsonSerdeV2<>(UserPageAdClick.class)))
+        .map(KV::getValue)
+        .window(Windows.keyedSessionWindow(UserPageAdClick::getUserId, Duration.ofSeconds(3)))
+        .map(windowPane -> KV.of(windowPane.getKey().getKey(), String.valueOf(windowPane.getMessage().size())))
+        .sendTo(outputStream);
+  }
+
+  private static class UserPageViewAdClicksJoiner implements JoinFunction<String, PageView, AdClick, UserPageAdClick> {
+    @Override
+    public UserPageAdClick apply(PageView pv, AdClick ac) {
+      return new UserPageAdClick(pv.getUserId(), pv.getPageId(), ac.getAdId());
+    }
+
+    @Override
+    public String getFirstKey(PageView pv) {
+      return pv.getViewId();
+    }
+
+    @Override
+    public String getSecondKey(AdClick ac) {
+      return ac.getViewId();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionWindowApp.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionWindowApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionWindowApp.java
deleted file mode 100644
index 261b954..0000000
--- a/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionWindowApp.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.test.operator;
-
-import org.apache.samza.application.StreamApplication;
-import org.apache.samza.config.Config;
-import org.apache.samza.operators.KV;
-import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.OutputStream;
-import org.apache.samza.operators.StreamGraph;
-import org.apache.samza.operators.windows.Windows;
-import org.apache.samza.serializers.JsonSerdeV2;
-import org.apache.samza.serializers.KVSerde;
-import org.apache.samza.serializers.StringSerde;
-
-import java.time.Duration;
-
-/**
- * A {@link StreamApplication} that demonstrates a partitionBy followed by a windowed count.
- */
-public class RepartitionWindowApp implements StreamApplication {
-  static final String INPUT_TOPIC = "page-views";
-  static final String OUTPUT_TOPIC = "page-view-counts";
-
-  @Override
-  public void init(StreamGraph graph, Config config) {
-    MessageStream<PageView> pageViews = graph.getInputStream(INPUT_TOPIC, new JsonSerdeV2<>(PageView.class));
-
-    OutputStream<KV<String, String>> outputStream =
-        graph.getOutputStream(OUTPUT_TOPIC, new KVSerde<>(new StringSerde(), new StringSerde()));
-
-    pageViews
-        .partitionBy(PageView::getUserId, pv -> pv, new KVSerde<>(new StringSerde(), new JsonSerdeV2<>(PageView.class)))
-        .window(Windows.keyedSessionWindow(KV::getKey, Duration.ofSeconds(3)))
-        .map(windowPane -> KV.of(windowPane.getKey().getKey(), String.valueOf(windowPane.getMessage().size())))
-        .sendTo(outputStream);
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-test/src/test/java/org/apache/samza/test/operator/SessionWindowApp.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/SessionWindowApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/SessionWindowApp.java
index 4c83960..974cafc 100644
--- a/samza-test/src/test/java/org/apache/samza/test/operator/SessionWindowApp.java
+++ b/samza-test/src/test/java/org/apache/samza/test/operator/SessionWindowApp.java
@@ -30,6 +30,7 @@ import org.apache.samza.serializers.IntegerSerde;
 import org.apache.samza.serializers.JsonSerdeV2;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.test.operator.data.PageView;
 
 import java.time.Duration;
 

http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-test/src/test/java/org/apache/samza/test/operator/StreamApplicationIntegrationTestHarness.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/StreamApplicationIntegrationTestHarness.java b/samza-test/src/test/java/org/apache/samza/test/operator/StreamApplicationIntegrationTestHarness.java
index 9bb66ad..db46982 100644
--- a/samza-test/src/test/java/org/apache/samza/test/operator/StreamApplicationIntegrationTestHarness.java
+++ b/samza-test/src/test/java/org/apache/samza/test/operator/StreamApplicationIntegrationTestHarness.java
@@ -213,7 +213,6 @@ public class StreamApplicationIntegrationTestHarness extends AbstractIntegration
    * @param overriddenConfigs configs to override
    */
   public void runApplication(StreamApplication streamApplication, String appName, Config overriddenConfigs) {
-
     Map<String, String> configs = new HashMap<>();
     configs.put("job.factory.class", "org.apache.samza.job.local.ThreadJobFactory");
     configs.put("job.name", appName);
@@ -231,6 +230,17 @@ public class StreamApplicationIntegrationTestHarness extends AbstractIntegration
     configs.put("job.coordinator.replication.factor", "1");
     configs.put("task.window.ms", "1000");
 
+    // This is to prevent tests from taking a long time to stop after they're done. The issue is that
+    // tearDown currently doesn't call runner.kill(app), and shuts down the Kafka and ZK servers immediately.
+    // The test process then exits, triggering the SamzaContainer shutdown hook, which in turn tries to flush any
+    // store changelogs, which then get stuck trying to produce to the stopped Kafka server.
+    // Calling runner.kill doesn't work since RemoteApplicationRunner creates a new ThreadJob instance when
+    // kill is called. We can't use LocalApplicationRunner since ZkJobCoordinator doesn't currently create
+    // changelog streams. Hence we just force an unclean shutdown here. This _should be_ OK
+    // since the test method has already executed by the time the shutdown hook is called. The side effect is
+    // that buffered state (e.g. changelog contents) might not be flushed correctly after the test run.
+    configs.put("task.shutdown.ms", "1");
+
     if (overriddenConfigs != null) {
       configs.putAll(overriddenConfigs);
     }

http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java
new file mode 100644
index 0000000..117f97b
--- /dev/null
+++ b/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.test.operator;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.samza.test.operator.RepartitionJoinWindowApp.AD_CLICKS;
+import static org.apache.samza.test.operator.RepartitionJoinWindowApp.PAGE_VIEWS;
+import static org.apache.samza.test.operator.RepartitionJoinWindowApp.OUTPUT_TOPIC;
+
+/**
+ * Test driver for {@link RepartitionJoinWindowApp}.
+ */
+public class TestRepartitionJoinWindowApp extends StreamApplicationIntegrationTestHarness {
+  private static final String APP_NAME = "UserPageAdClickCounter";
+
+  @Test
+  public void testRepartitionJoinWindowApp() throws Exception {
+    // create topics
+    createTopic(PAGE_VIEWS, 2);
+    createTopic(AD_CLICKS, 2);
+    createTopic(OUTPUT_TOPIC, 1);
+
+    // create events for the following user activity.
+    // userId: (viewId, pageId, (adIds))
+    // u1: (v1, p1, (a1, a2)), (v2, p2, (a3))
+    // u2: (v3, p1, (a1, a2, a4)), (v4, p3, (a5))
+    produceMessage(PAGE_VIEWS, 0, "p1", "{\"viewId\":\"v1\",\"pageId\":\"p1\",\"userId\":\"u1\"}");
+    produceMessage(PAGE_VIEWS, 1, "p2", "{\"viewId\":\"v2\",\"pageId\":\"p2\",\"userId\":\"u1\"}");
+    produceMessage(PAGE_VIEWS, 0, "p1", "{\"viewId\":\"v3\",\"pageId\":\"p1\",\"userId\":\"u2\"}");
+    produceMessage(PAGE_VIEWS, 1, "p3", "{\"viewId\":\"v4\",\"pageId\":\"p3\",\"userId\":\"u2\"}");
+
+    produceMessage(AD_CLICKS, 0, "a1", "{\"viewId\":\"v1\",\"adId\":\"a1\"}");
+    produceMessage(AD_CLICKS, 1, "a2", "{\"viewId\":\"v1\",\"adId\":\"a2\"}");
+    produceMessage(AD_CLICKS, 0, "a3", "{\"viewId\":\"v2\",\"adId\":\"a3\"}");
+    produceMessage(AD_CLICKS, 0, "a1", "{\"viewId\":\"v3\",\"adId\":\"a1\"}");
+    produceMessage(AD_CLICKS, 1, "a2", "{\"viewId\":\"v3\",\"adId\":\"a2\"}");
+    produceMessage(AD_CLICKS, 1, "a4", "{\"viewId\":\"v3\",\"adId\":\"a4\"}");
+    produceMessage(AD_CLICKS, 0, "a5", "{\"viewId\":\"v4\",\"adId\":\"a5\"}");
+
+    // run the application
+    RepartitionJoinWindowApp app = new RepartitionJoinWindowApp();
+    runApplication(app, APP_NAME, null);
+
+    // consume and validate result
+    List<ConsumerRecord<String, String>> messages = consumeMessages(Collections.singletonList(OUTPUT_TOPIC), 2);
+    Assert.assertEquals(2, messages.size());
+
+    for (ConsumerRecord<String, String> message : messages) {
+      String key = message.key();
+      String value = message.value();
+      Assert.assertTrue(key.equals("u1") || key.equals("u2"));
+      if ("u1".equals(key)) {
+        Assert.assertEquals("3", value);
+      } else {
+        Assert.assertEquals("4", value);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionWindowApp.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionWindowApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionWindowApp.java
deleted file mode 100644
index 3745541..0000000
--- a/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionWindowApp.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.test.operator;
-
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.Collections;
-import java.util.List;
-
-import static org.apache.samza.test.operator.RepartitionWindowApp.INPUT_TOPIC;
-import static org.apache.samza.test.operator.RepartitionWindowApp.OUTPUT_TOPIC;
-
-/**
- * Test driver for {@link RepartitionWindowApp}.
- */
-public class TestRepartitionWindowApp extends StreamApplicationIntegrationTestHarness {
-  private static final String APP_NAME = "RepartitionedSessionizer";
-
-  @Test
-  public void testRepartitionedSessionWindowCounter() throws Exception {
-    // create topics
-    createTopic(INPUT_TOPIC, 3);
-    createTopic(OUTPUT_TOPIC, 1);
-
-    // produce messages to different partitions.
-    produceMessage(INPUT_TOPIC, 0, "userId1", "{\"userId\":\"userId1\", \"country\":\"india\",\"url\":\"5.com\"}");
-    produceMessage(INPUT_TOPIC, 1, "userId2", "{\"userId\":\"userId2\", \"country\":\"china\",\"url\":\"4.com\"}");
-    produceMessage(INPUT_TOPIC, 2, "userId1", "{\"userId\":\"userId1\", \"country\":\"india\",\"url\":\"1.com\"}");
-    produceMessage(INPUT_TOPIC, 0, "userId1", "{\"userId\":\"userId1\", \"country\":\"india\",\"url\":\"2.com\"}");
-    produceMessage(INPUT_TOPIC, 1, "userId1", "{\"userId\":\"userId1\", \"country\":\"india\",\"url\":\"3.com\"}");
-
-    // run the application
-    RepartitionWindowApp app = new RepartitionWindowApp();
-    runApplication(app, APP_NAME, null);
-
-    // consume and validate result
-    List<ConsumerRecord<String, String>> messages = consumeMessages(Collections.singletonList(OUTPUT_TOPIC), 2);
-    Assert.assertEquals(messages.size(), 2);
-
-    for (ConsumerRecord<String, String> message : messages) {
-      String key = message.key();
-      String value = message.value();
-      // Assert that there are 4 messages for userId1 and 1 message for userId2.
-      Assert.assertTrue(key.equals("userId1") || key.equals("userId2"));
-      if ("userId1".equals(key)) {
-        Assert.assertEquals("4", value);
-      } else {
-        Assert.assertEquals("1", value);
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-test/src/test/java/org/apache/samza/test/operator/TumblingWindowApp.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/TumblingWindowApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/TumblingWindowApp.java
index 3f3e615..151c9d1 100644
--- a/samza-test/src/test/java/org/apache/samza/test/operator/TumblingWindowApp.java
+++ b/samza-test/src/test/java/org/apache/samza/test/operator/TumblingWindowApp.java
@@ -30,6 +30,7 @@ import org.apache.samza.serializers.IntegerSerde;
 import org.apache.samza.serializers.JsonSerdeV2;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.test.operator.data.PageView;
 
 import java.time.Duration;
 

http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-test/src/test/java/org/apache/samza/test/operator/data/AdClick.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/data/AdClick.java b/samza-test/src/test/java/org/apache/samza/test/operator/data/AdClick.java
new file mode 100644
index 0000000..ee699ae
--- /dev/null
+++ b/samza-test/src/test/java/org/apache/samza/test/operator/data/AdClick.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.test.operator.data;
+
+
+public class AdClick {
+  private String viewId;
+  private String adId;
+
+  public String getViewId() {
+    return viewId;
+  }
+
+  public void setViewId(String viewId) {
+    this.viewId = viewId;
+  }
+
+  public String getAdId() {
+    return adId;
+  }
+
+  public void setAdId(String adId) {
+    this.adId = adId;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-test/src/test/java/org/apache/samza/test/operator/data/PageView.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/data/PageView.java b/samza-test/src/test/java/org/apache/samza/test/operator/data/PageView.java
new file mode 100644
index 0000000..d2cebf9
--- /dev/null
+++ b/samza-test/src/test/java/org/apache/samza/test/operator/data/PageView.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.test.operator.data;
+
+
+public class PageView {
+  private String viewId;
+  private String pageId;
+  private String userId;
+
+  public String getViewId() {
+    return viewId;
+  }
+
+  public void setViewId(String viewId) {
+    this.viewId = viewId;
+  }
+
+  public String getPageId() {
+    return pageId;
+  }
+
+  public void setPageId(String pageId) {
+    this.pageId = pageId;
+  }
+
+  public String getUserId() {
+    return userId;
+  }
+
+  public void setUserId(String userId) {
+    this.userId = userId;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-test/src/test/java/org/apache/samza/test/operator/data/UserPageAdClick.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/data/UserPageAdClick.java b/samza-test/src/test/java/org/apache/samza/test/operator/data/UserPageAdClick.java
new file mode 100644
index 0000000..e5f7b53
--- /dev/null
+++ b/samza-test/src/test/java/org/apache/samza/test/operator/data/UserPageAdClick.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.test.operator.data;
+
+import org.codehaus.jackson.annotate.JsonCreator;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+public class UserPageAdClick {
+  private String userId;
+  private String pageId;
+  private String adId;
+
+  @JsonCreator
+  public UserPageAdClick(
+      @JsonProperty("userId") String userId,
+      @JsonProperty("pageId") String pageId,
+      @JsonProperty("adId") String adId) {
+    this.userId = userId;
+    this.pageId = pageId;
+    this.adId = adId;
+  }
+
+  public String getUserId() {
+    return userId;
+  }
+
+  public void setUserId(String userId) {
+    this.userId = userId;
+  }
+
+  public String getPageId() {
+    return pageId;
+  }
+
+  public void setPageId(String pageId) {
+    this.pageId = pageId;
+  }
+
+  public String getAdId() {
+    return adId;
+  }
+
+  public void setAdId(String adId) {
+    this.adId = adId;
+  }
+}


[2/2] samza git commit: SAMZA-1056: Added wiring for High Level API state stores, their serdes and changelogs.

Posted by pm...@apache.org.
SAMZA-1056: Added wiring for High Level API state stores, their serdes and changelogs.

Provided join operator access to durable state stores.

Author: Prateek Maheshwari <pm...@linkedin.com>

Reviewers: Jagadish Venkatraman <ja...@apache.org>

Closes #309 from prateekm/operator-store-wiring


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/a671288e
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/a671288e
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/a671288e

Branch: refs/heads/master
Commit: a671288e18dea3336fe1c625233bb7427ed8b5b2
Parents: ad80cf9
Author: Prateek Maheshwari <pm...@linkedin.com>
Authored: Wed Oct 4 15:37:50 2017 -0700
Committer: Prateek Maheshwari <pm...@linkedin.com>
Committed: Wed Oct 4 15:37:50 2017 -0700

----------------------------------------------------------------------
 .gitignore                                      |   1 +
 .../apache/samza/operators/MessageStream.java   |  13 ++-
 .../operators/functions/WatermarkFunction.java  |  31 +++---
 .../apache/samza/container/TaskContextImpl.java |   6 +-
 .../coordinator/DistributedLockWithState.java   |   4 +-
 .../org/apache/samza/execution/JobNode.java     |  63 ++++++++---
 .../samza/operators/MessageStreamImpl.java      |  16 +--
 .../functions/PartialJoinFunction.java          |  26 +----
 .../samza/operators/impl/OperatorImplGraph.java |  38 +++++--
 .../operators/impl/PartialJoinOperatorImpl.java |  72 ++++--------
 .../operators/impl/store/TimeSeriesStore.java   |   1 +
 .../operators/impl/store/TimestampedValue.java  |  16 +--
 .../impl/store/TimestampedValueSerde.java       |  55 +++++++++
 .../samza/operators/spec/JoinOperatorSpec.java  |  70 +++++++++---
 .../samza/operators/spec/OperatorSpecs.java     |  17 ++-
 .../operators/spec/StatefulOperatorSpec.java    |  37 +++++++
 .../samza/operators/spec/StoreDescriptor.java   |  81 ++++++++++++++
 .../samza/runtime/LocalApplicationRunner.java   |   2 +-
 .../org/apache/samza/zk/ZkJobCoordinator.java   |   5 +-
 .../apache/samza/container/SamzaContainer.scala |   8 +-
 .../samza/example/OrderShipmentJoinExample.java |   4 +-
 .../samza/execution/TestExecutionPlanner.java   |  42 ++++---
 .../execution/TestJobGraphJsonGenerator.java    |  19 +++-
 .../org/apache/samza/execution/TestJobNode.java | 111 ++++++++++++++-----
 .../samza/operators/TestJoinOperator.java       |  12 +-
 .../samza/operators/TestMessageStreamImpl.java  |   3 +-
 .../operators/impl/TestOperatorImplGraph.java   |  31 ++++--
 .../impl/store/TestTimeSeriesStoreImpl.java     |  54 ++++-----
 .../impl/store/TestTimestampedValueSerde.java   |  52 +++++++++
 .../apache/samza/test/operator/PageView.java    |  50 ---------
 .../test/operator/RepartitionJoinWindowApp.java |  92 +++++++++++++++
 .../test/operator/RepartitionWindowApp.java     |  55 ---------
 .../samza/test/operator/SessionWindowApp.java   |   1 +
 ...StreamApplicationIntegrationTestHarness.java |  12 +-
 .../operator/TestRepartitionJoinWindowApp.java  |  81 ++++++++++++++
 .../test/operator/TestRepartitionWindowApp.java |  70 ------------
 .../samza/test/operator/TumblingWindowApp.java  |   1 +
 .../samza/test/operator/data/AdClick.java       |  41 +++++++
 .../samza/test/operator/data/PageView.java      |  50 +++++++++
 .../test/operator/data/UserPageAdClick.java     |  62 +++++++++++
 40 files changed, 974 insertions(+), 431 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index 7cbffe7..c2e9180 100644
--- a/.gitignore
+++ b/.gitignore
@@ -22,6 +22,7 @@ docs/_site
 build
 **/bin
 samza-test/state
+state/
 docs/learn/documentation/*/api/javadocs
 docs/learn/documentation/*/rest/javadocs
 .DS_Store

http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java b/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
index 2a1045d..c36fe1f 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
@@ -27,6 +27,7 @@ import org.apache.samza.operators.functions.SinkFunction;
 import org.apache.samza.operators.windows.Window;
 import org.apache.samza.operators.windows.WindowPane;
 import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.Serde;
 
 import java.time.Duration;
 import java.util.ArrayList;
@@ -128,13 +129,17 @@ public interface MessageStream<M> {
    * @param otherStream the other {@link MessageStream} to be joined with
    * @param joinFn the function to join messages from this and the other {@link MessageStream}
    * @param ttl the ttl for messages in each stream
+   * @param keySerde the serde for the join key
+   * @param messageSerde the serde for messages in this stream
+   * @param otherMessageSerde the serde for messages in the other stream
    * @param <K> the type of join key
-   * @param <JM> the type of messages in the other stream
-   * @param <OM> the type of messages resulting from the {@code joinFn}
+   * @param <OM> the type of messages in the other stream
+   * @param <JM> the type of messages resulting from the {@code joinFn}
    * @return the joined {@link MessageStream}
    */
-  <K, JM, OM> MessageStream<OM> join(MessageStream<JM> otherStream,
-      JoinFunction<? extends K, ? super M, ? super JM, ? extends OM> joinFn, Duration ttl);
+  <K, OM, JM> MessageStream<JM> join(MessageStream<OM> otherStream,
+      JoinFunction<? extends K, ? super M, ? super OM, ? extends JM> joinFn,
+      Serde<K> keySerde, Serde<M> messageSerde, Serde<OM> otherMessageSerde, Duration ttl);
 
   /**
    * Merges all {@code otherStreams} with this {@link MessageStream}.

http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-api/src/main/java/org/apache/samza/operators/functions/WatermarkFunction.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/WatermarkFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/WatermarkFunction.java
index 9c4b9bf..3be293e 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/functions/WatermarkFunction.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/functions/WatermarkFunction.java
@@ -20,14 +20,14 @@
 package org.apache.samza.operators.functions;
 
 /**
- * Allows user-specific handling of Watermark
+ * Allows handling of watermarks.
  */
 public interface WatermarkFunction {
 
   /**
    * Processes the input watermark coming from upstream operators.
-   * This allows user-defined watermark handling, such as trigger events
-   * or propagate it to downstream.
+   * This allows custom watermark handling, such as triggering events or propagating it downstream.
+   *
    * @param watermark input watermark
    */
   void processWatermark(long watermark);
@@ -35,24 +35,19 @@ public interface WatermarkFunction {
   /**
    * Returns the output watermark. This function will be invoked immediately after either
    * of the following events:
-   *
    * <ol>
-   *
-   * <li> Return of the transform function, e.g. {@link FlatMapFunction}.
-   *
-   * <li> Return of the processWatermark function.
-   *
+   *  <li> Return from the transform function, e.g. {@link FlatMapFunction}.
+   *  <li> Return from the {@link #processWatermark} function.
    * </ol>
+   *
+   * Note: If the transform function returns a collection of messages, the output watermark
+   * will be emitted after the output collection has been propagated to downstream operators.
+   * This might delay the watermark propagation, which will cause more buffering and might
+   * have a performance impact.
    *
-   *
-   *
-   * Note: If the transform function returns a collection of output, the output watermark
-   * will be emitted after the output collection is propagated to downstream operators. So
-   * it might delay the watermark propagation. The delay will cause more buffering and might
-   * have performance impact.
-   *
-   * @return output watermark, or null if the output watermark should not be updated. Samza
-   * guarantees that the same watermark value will be only emitted once.
+   * @return output watermark, or null if the output watermark should not be updated.
+   *         Samza guarantees that the same watermark value will only be emitted once.
    */
   Long getOutputWatermark();
+
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-core/src/main/java/org/apache/samza/container/TaskContextImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/TaskContextImpl.java b/samza-core/src/main/java/org/apache/samza/container/TaskContextImpl.java
index 7990d2b..aa622a3 100644
--- a/samza-core/src/main/java/org/apache/samza/container/TaskContextImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/container/TaskContextImpl.java
@@ -23,8 +23,8 @@ import com.google.common.collect.ImmutableSet;
 import org.apache.samza.checkpoint.OffsetManager;
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.metrics.ReadableMetricsRegistry;
-import org.apache.samza.storage.StorageEngine;
 import org.apache.samza.storage.TaskStorageManager;
+import org.apache.samza.storage.kv.KeyValueStore;
 import org.apache.samza.system.StreamMetadataCache;
 import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.task.TaskContext;
@@ -79,9 +79,9 @@ public class TaskContextImpl implements TaskContext {
   }
 
   @Override
-  public StorageEngine getStore(String storeName) {
+  public KeyValueStore getStore(String storeName) {
     if (storageManager != null) {
-      return storageManager.apply(storeName);
+      return (KeyValueStore) storageManager.apply(storeName);
     } else {
       LOG.warn("No store found for name: {}", storeName);
       return null;

http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-core/src/main/java/org/apache/samza/coordinator/DistributedLockWithState.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/DistributedLockWithState.java b/samza-core/src/main/java/org/apache/samza/coordinator/DistributedLockWithState.java
index 0de7813..c8e9033 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/DistributedLockWithState.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/DistributedLockWithState.java
@@ -26,12 +26,12 @@ import java.util.concurrent.TimeoutException;
 public interface DistributedLockWithState {
 
   /**
-   * Trie to acquire the lock, but first check if the state flag is set. If it is set, return false.
+   * Try to acquire the lock, but first check if the state flag is set. If it is set, return false.
    * If the flag is not set, and lock is acquired - return true.
-   * Throw TimeOutException if could not acquire the lock.
    * @param timeout Duration of lock acquiring timeout.
    * @param unit Time Unit of the timeout defined above.
    * @return true if lock is acquired successfully, false if state is already set.
+   * @throws TimeoutException if the lock could not be acquired.
    */
   boolean lockIfNotSet(long timeout, TimeUnit unit) throws TimeoutException;
 

http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobNode.java b/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
index 0368829..2e89292 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
@@ -33,6 +33,7 @@ import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.SerializerConfig;
+import org.apache.samza.config.StorageConfig;
 import org.apache.samza.config.StreamConfig;
 import org.apache.samza.config.TaskConfig;
 import org.apache.samza.operators.StreamGraphImpl;
@@ -40,6 +41,7 @@ import org.apache.samza.operators.spec.InputOperatorSpec;
 import org.apache.samza.operators.spec.JoinOperatorSpec;
 import org.apache.samza.operators.spec.OperatorSpec;
 import org.apache.samza.operators.spec.OutputStreamImpl;
+import org.apache.samza.operators.spec.StatefulOperatorSpec;
 import org.apache.samza.operators.spec.WindowOperatorSpec;
 import org.apache.samza.operators.util.MathUtils;
 import org.apache.samza.serializers.Serde;
@@ -129,6 +131,14 @@ public class JobNode {
       }
     }
 
+    streamGraph.getAllOperatorSpecs().forEach(opSpec -> {
+        if (opSpec instanceof StatefulOperatorSpec) {
+          ((StatefulOperatorSpec) opSpec).getStoreDescriptors()
+              .forEach(sd -> configs.putAll(sd.getStorageConfigs()));
+          // store key and message serdes are configured separately in #addSerdeConfigs
+        }
+      });
+
     configs.put(CONFIG_INTERNAL_EXECUTION_PLAN, executionPlanJson);
 
     // write input/output streams to configs
@@ -168,32 +178,42 @@ public class JobNode {
    *
    * @param configs the configs to add serialized serde instances and stream serde configs to
    */
-  protected void addSerdeConfigs(Map<String, String> configs) {
+  void addSerdeConfigs(Map<String, String> configs) {
     // collect all key and msg serde instances for streams
-    Map<String, Serde> keySerdes = new HashMap<>();
-    Map<String, Serde> msgSerdes = new HashMap<>();
+    Map<String, Serde> streamKeySerdes = new HashMap<>();
+    Map<String, Serde> streamMsgSerdes = new HashMap<>();
     Map<StreamSpec, InputOperatorSpec> inputOperators = streamGraph.getInputOperators();
     inEdges.forEach(edge -> {
         String streamId = edge.getStreamSpec().getId();
         InputOperatorSpec inputOperatorSpec = inputOperators.get(edge.getStreamSpec());
-        Serde keySerde = inputOperatorSpec.getKeySerde();
-        Serde valueSerde = inputOperatorSpec.getValueSerde();
-        keySerdes.put(streamId, keySerde);
-        msgSerdes.put(streamId, valueSerde);
+        streamKeySerdes.put(streamId, inputOperatorSpec.getKeySerde());
+        streamMsgSerdes.put(streamId, inputOperatorSpec.getValueSerde());
       });
     Map<StreamSpec, OutputStreamImpl> outputStreams = streamGraph.getOutputStreams();
     outEdges.forEach(edge -> {
         String streamId = edge.getStreamSpec().getId();
         OutputStreamImpl outputStream = outputStreams.get(edge.getStreamSpec());
-        Serde keySerde = outputStream.getKeySerde();
-        Serde valueSerde = outputStream.getValueSerde();
-        keySerdes.put(streamId, keySerde);
-        msgSerdes.put(streamId, valueSerde);
+        streamKeySerdes.put(streamId, outputStream.getKeySerde());
+        streamMsgSerdes.put(streamId, outputStream.getValueSerde());
+      });
+
+    // collect all key and msg serde instances for stores
+    Map<String, Serde> storeKeySerdes = new HashMap<>();
+    Map<String, Serde> storeMsgSerdes = new HashMap<>();
+    streamGraph.getAllOperatorSpecs().forEach(opSpec -> {
+        if (opSpec instanceof StatefulOperatorSpec) {
+          ((StatefulOperatorSpec) opSpec).getStoreDescriptors().forEach(storeDescriptor -> {
+              storeKeySerdes.put(storeDescriptor.getStoreName(), storeDescriptor.getKeySerde());
+              storeMsgSerdes.put(storeDescriptor.getStoreName(), storeDescriptor.getMsgSerde());
+            });
+        }
       });
 
-    // for each unique serde instance, generate a unique name and serialize to config
-    HashSet<Serde> serdes = new HashSet<>(keySerdes.values());
-    serdes.addAll(msgSerdes.values());
+    // for each unique stream or store serde instance, generate a unique name and serialize to config
+    HashSet<Serde> serdes = new HashSet<>(streamKeySerdes.values());
+    serdes.addAll(streamMsgSerdes.values());
+    serdes.addAll(storeKeySerdes.values());
+    serdes.addAll(storeMsgSerdes.values());
     SerializableSerde<Serde> serializableSerde = new SerializableSerde<>();
     Base64.Encoder base64Encoder = Base64.getEncoder();
     Map<Serde, String> serdeUUIDs = new HashMap<>();
@@ -205,17 +225,28 @@ public class JobNode {
       });
 
     // set key and msg serdes for streams to the serde names generated above
-    keySerdes.forEach((streamId, serde) -> {
+    streamKeySerdes.forEach((streamId, serde) -> {
         String streamIdPrefix = String.format(StreamConfig.STREAM_ID_PREFIX(), streamId);
         String keySerdeConfigKey = streamIdPrefix + StreamConfig.KEY_SERDE();
         configs.put(keySerdeConfigKey, serdeUUIDs.get(serde));
       });
 
-    msgSerdes.forEach((streamId, serde) -> {
+    streamMsgSerdes.forEach((streamId, serde) -> {
         String streamIdPrefix = String.format(StreamConfig.STREAM_ID_PREFIX(), streamId);
         String valueSerdeConfigKey = streamIdPrefix + StreamConfig.MSG_SERDE();
         configs.put(valueSerdeConfigKey, serdeUUIDs.get(serde));
       });
+
+    // set key and msg serdes for stores to the serde names generated above
+    storeKeySerdes.forEach((storeName, serde) -> {
+        String keySerdeConfigKey = String.format(StorageConfig.KEY_SERDE(), storeName);
+        configs.put(keySerdeConfigKey, serdeUUIDs.get(serde));
+      });
+
+    storeMsgSerdes.forEach((storeName, serde) -> {
+        String msgSerdeConfigKey = String.format(StorageConfig.MSG_SERDE(), storeName);
+        configs.put(msgSerdeConfigKey, serdeUUIDs.get(serde));
+      });
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java b/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
index 7b93a9e..8460ada 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
@@ -37,6 +37,7 @@ import org.apache.samza.operators.windows.Window;
 import org.apache.samza.operators.windows.WindowPane;
 import org.apache.samza.operators.windows.internal.WindowInternal;
 import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.Serde;
 
 import java.time.Duration;
 import java.util.Collection;
@@ -112,15 +113,16 @@ public class MessageStreamImpl<M> implements MessageStream<M> {
   }
 
   @Override
-  public <K, JM, TM> MessageStream<TM> join(MessageStream<JM> otherStream,
-      JoinFunction<? extends K, ? super M, ? super JM, ? extends TM> joinFn, Duration ttl) {
-    OperatorSpec<?, JM> otherOpSpec = ((MessageStreamImpl<JM>) otherStream).getOperatorSpec();
-    JoinOperatorSpec<K, M, JM, TM> joinOpSpec =
-        OperatorSpecs.createJoinOperatorSpec(this.operatorSpec, otherOpSpec,
-            (JoinFunction<K, M, JM, TM>) joinFn, ttl.toMillis(), this.graph.getNextOpId());
+  public <K, OM, JM> MessageStream<JM> join(MessageStream<OM> otherStream,
+      JoinFunction<? extends K, ? super M, ? super OM, ? extends JM> joinFn,
+      Serde<K> keySerde, Serde<M> messageSerde, Serde<OM> otherMessageSerde, Duration ttl) {
+    OperatorSpec<?, OM> otherOpSpec = ((MessageStreamImpl<OM>) otherStream).getOperatorSpec();
+    JoinOperatorSpec<K, M, OM, JM> joinOpSpec =
+        OperatorSpecs.createJoinOperatorSpec(this.operatorSpec, otherOpSpec, (JoinFunction<K, M, OM, JM>) joinFn,
+            keySerde, messageSerde, otherMessageSerde, ttl.toMillis(), this.graph.getNextOpId());
 
     this.operatorSpec.registerNextOperatorSpec(joinOpSpec);
-    otherOpSpec.registerNextOperatorSpec((OperatorSpec<JM, ?>) joinOpSpec);
+    otherOpSpec.registerNextOperatorSpec((OperatorSpec<OM, ?>) joinOpSpec);
 
     return new MessageStreamImpl<>(this.graph, joinOpSpec);
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-core/src/main/java/org/apache/samza/operators/functions/PartialJoinFunction.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/functions/PartialJoinFunction.java b/samza-core/src/main/java/org/apache/samza/operators/functions/PartialJoinFunction.java
index 9b7956a..5ede5e8 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/functions/PartialJoinFunction.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/functions/PartialJoinFunction.java
@@ -18,21 +18,22 @@
  */
 package org.apache.samza.operators.functions;
 
+import org.apache.samza.operators.impl.store.TimestampedValue;
 import org.apache.samza.storage.kv.KeyValueStore;
 
 /**
  * An internal function that maintains state and join logic for one side of a two-way join.
  */
-public interface PartialJoinFunction<K, M, JM, RM> extends InitableFunction, ClosableFunction {
+public interface PartialJoinFunction<K, M, OM, JM> extends InitableFunction, ClosableFunction {
 
   /**
    * Joins a message in this stream with a message from another stream.
    *
    * @param m  message from this input stream
-   * @param jm  message from the other input stream
+   * @param om  message from the other input stream
    * @return  the joined message in the output stream
    */
-  RM apply(M m, JM jm);
+  JM apply(M m, OM om);
 
   /**
    * Gets the key for the input message.
@@ -47,23 +48,6 @@ public interface PartialJoinFunction<K, M, JM, RM> extends InitableFunction, Clo
    *
    * @return the key value store containing the state for this stream
    */
-  KeyValueStore<K, PartialJoinMessage<M>> getState();
+  KeyValueStore<K, TimestampedValue<M>> getState();
 
-  class PartialJoinMessage<M> {
-    private final M message;
-    private final long receivedTimeMs;
-
-    public PartialJoinMessage(M message, long receivedTimeMs) {
-      this.message = message;
-      this.receivedTimeMs = receivedTimeMs;
-    }
-
-    public M getMessage() {
-      return message;
-    }
-
-    public long getReceivedTimeMs() {
-      return receivedTimeMs;
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
index 1f86975..808ddbf 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
@@ -37,7 +37,7 @@ import org.apache.samza.operators.spec.PartitionByOperatorSpec;
 import org.apache.samza.operators.spec.SinkOperatorSpec;
 import org.apache.samza.operators.spec.StreamOperatorSpec;
 import org.apache.samza.operators.spec.WindowOperatorSpec;
-import org.apache.samza.operators.util.InternalInMemoryStore;
+import org.apache.samza.operators.impl.store.TimestampedValue;
 import org.apache.samza.storage.kv.KeyValueStore;
 import org.apache.samza.system.SystemStream;
 import org.apache.samza.task.TaskContext;
@@ -219,16 +219,17 @@ public class OperatorImplGraph {
 
   private KV<PartialJoinFunction, PartialJoinFunction> getOrCreatePartialJoinFunctions(JoinOperatorSpec joinOpSpec) {
     return joinFunctions.computeIfAbsent(joinOpSpec.getOpId(),
-        joinOpId -> KV.of(createLeftJoinFn(joinOpSpec.getJoinFn()), createRightJoinFn(joinOpSpec.getJoinFn())));
+        joinOpId -> KV.of(createLeftJoinFn(joinOpSpec), createRightJoinFn(joinOpSpec)));
   }
 
-  private PartialJoinFunction<Object, Object, Object, Object> createLeftJoinFn(JoinFunction joinFn) {
+  private PartialJoinFunction<Object, Object, Object, Object> createLeftJoinFn(JoinOperatorSpec joinOpSpec) {
     return new PartialJoinFunction<Object, Object, Object, Object>() {
-      private KeyValueStore<Object, PartialJoinMessage<Object>> leftStreamState = new InternalInMemoryStore<>();
+      private final JoinFunction joinFn = joinOpSpec.getJoinFn();
+      private KeyValueStore<Object, TimestampedValue<Object>> leftStreamState;
 
       @Override
-      public Object apply(Object m, Object jm) {
-        return joinFn.apply(m, jm);
+      public Object apply(Object m, Object om) {
+        return joinFn.apply(m, om);
       }
 
       @Override
@@ -237,12 +238,15 @@ public class OperatorImplGraph {
       }
 
       @Override
-      public KeyValueStore<Object, PartialJoinMessage<Object>> getState() {
+      public KeyValueStore<Object, TimestampedValue<Object>> getState() {
         return leftStreamState;
       }
 
       @Override
       public void init(Config config, TaskContext context) {
+        String leftStoreName = joinOpSpec.getLeftOpName();
+        leftStreamState = (KeyValueStore<Object, TimestampedValue<Object>>) context.getStore(leftStoreName);
+
         // user-defined joinFn should only be initialized once, so we do it only in left partial join function.
         joinFn.init(config, context);
       }
@@ -255,13 +259,14 @@ public class OperatorImplGraph {
     };
   }
 
-  private PartialJoinFunction<Object, Object, Object, Object> createRightJoinFn(JoinFunction joinFn) {
+  private PartialJoinFunction<Object, Object, Object, Object> createRightJoinFn(JoinOperatorSpec joinOpSpec) {
     return new PartialJoinFunction<Object, Object, Object, Object>() {
-      private KeyValueStore<Object, PartialJoinMessage<Object>> rightStreamState = new InternalInMemoryStore<>();
+      private final JoinFunction joinFn = joinOpSpec.getJoinFn();
+      private KeyValueStore<Object, TimestampedValue<Object>> rightStreamState;
 
       @Override
-      public Object apply(Object m, Object jm) {
-        return joinFn.apply(jm, m);
+      public Object apply(Object m, Object om) {
+        return joinFn.apply(om, m);
       }
 
       @Override
@@ -270,7 +275,16 @@ public class OperatorImplGraph {
       }
 
       @Override
-      public KeyValueStore<Object, PartialJoinMessage<Object>> getState() {
+      public void init(Config config, TaskContext context) {
+        String rightStoreName = joinOpSpec.getRightOpName();
+        rightStreamState = (KeyValueStore<Object, TimestampedValue<Object>>) context.getStore(rightStoreName);
+
+        // user-defined joinFn should only be initialized once,
+        // so we do it only in left partial join function and not here again.
+      }
+
+      @Override
+      public KeyValueStore<Object, TimestampedValue<Object>> getState() {
         return rightStreamState;
       }
     };

http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java
index ad66962..e976a43 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java
@@ -19,47 +19,39 @@
 package org.apache.samza.operators.impl;
 
 import org.apache.samza.config.Config;
-import org.apache.samza.metrics.Counter;
 import org.apache.samza.operators.functions.PartialJoinFunction;
-import org.apache.samza.operators.functions.PartialJoinFunction.PartialJoinMessage;
+import org.apache.samza.operators.impl.store.TimestampedValue;
 import org.apache.samza.operators.spec.JoinOperatorSpec;
 import org.apache.samza.operators.spec.OperatorSpec;
-import org.apache.samza.storage.kv.Entry;
-import org.apache.samza.storage.kv.KeyValueIterator;
-import org.apache.samza.storage.kv.KeyValueStore;
 import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.TaskContext;
 import org.apache.samza.task.TaskCoordinator;
 import org.apache.samza.util.Clock;
 
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.List;
 
 /**
  * Implementation of one side of a {@link JoinOperatorSpec} that buffers and joins its input messages of
- * type {@code M} with buffered input messages of type {@code JM} in the paired {@link PartialJoinOperatorImpl}.
+ * type {@code M} with buffered input messages of type {@code OM} in the paired {@link PartialJoinOperatorImpl}.
  *
  * @param <K> the type of join key
  * @param <M> the type of input messages on this side of the join
- * @param <JM> the type of input message on the other side of the join
- * @param <RM> the type of join result
+ * @param <OM> the type of input message on the other side of the join
+ * @param <JM> the type of join result
  */
-class PartialJoinOperatorImpl<K, M, JM, RM> extends OperatorImpl<M, RM> {
+class PartialJoinOperatorImpl<K, M, OM, JM> extends OperatorImpl<M, JM> {
 
-  private final JoinOperatorSpec<K, M, JM, RM> joinOpSpec;
+  private final JoinOperatorSpec<K, M, OM, JM> joinOpSpec;
   private final boolean isLeftSide; // whether this operator impl is for the left side of the join
-  private final PartialJoinFunction<K, M, JM, RM> thisPartialJoinFn;
-  private final PartialJoinFunction<K, JM, M, RM> otherPartialJoinFn;
+  private final PartialJoinFunction<K, M, OM, JM> thisPartialJoinFn;
+  private final PartialJoinFunction<K, OM, M, JM> otherPartialJoinFn;
   private final long ttlMs;
   private final Clock clock;
 
-  private Counter keysRemoved;
-
-  PartialJoinOperatorImpl(JoinOperatorSpec<K, M, JM, RM> joinOpSpec, boolean isLeftSide,
-      PartialJoinFunction<K, M, JM, RM> thisPartialJoinFn,
-      PartialJoinFunction<K, JM, M, RM> otherPartialJoinFn,
+  PartialJoinOperatorImpl(JoinOperatorSpec<K, M, OM, JM> joinOpSpec, boolean isLeftSide,
+      PartialJoinFunction<K, M, OM, JM> thisPartialJoinFn,
+      PartialJoinFunction<K, OM, M, JM> otherPartialJoinFn,
       Config config, TaskContext context, Clock clock) {
     this.joinOpSpec = joinOpSpec;
     this.isLeftSide = isLeftSide;
@@ -71,54 +63,29 @@ class PartialJoinOperatorImpl<K, M, JM, RM> extends OperatorImpl<M, RM> {
 
   @Override
   protected void handleInit(Config config, TaskContext context) {
-    keysRemoved = context.getMetricsRegistry()
-        .newCounter(OperatorImpl.class.getName(), getOperatorName() + "-keys-removed");
     this.thisPartialJoinFn.init(config, context);
   }
 
   @Override
-  public Collection<RM> handleMessage(M message, MessageCollector collector, TaskCoordinator coordinator) {
+  public Collection<JM> handleMessage(M message, MessageCollector collector, TaskCoordinator coordinator) {
     K key = thisPartialJoinFn.getKey(message);
-    thisPartialJoinFn.getState().put(key, new PartialJoinMessage<>(message, clock.currentTimeMillis()));
-    PartialJoinMessage<JM> otherMessage = otherPartialJoinFn.getState().get(key);
+    thisPartialJoinFn.getState().put(key, new TimestampedValue<>(message, clock.currentTimeMillis()));
+    TimestampedValue<OM> otherMessage = otherPartialJoinFn.getState().get(key);
     long now = clock.currentTimeMillis();
-    if (otherMessage != null && otherMessage.getReceivedTimeMs() > now - ttlMs) {
-      RM joinResult = thisPartialJoinFn.apply(message, otherMessage.getMessage());
+    if (otherMessage != null && otherMessage.getTimestamp() > now - ttlMs) {
+      JM joinResult = thisPartialJoinFn.apply(message, otherMessage.getValue());
       return Collections.singletonList(joinResult);
     }
     return Collections.emptyList();
   }
 
   @Override
-  public Collection<RM> handleTimer(MessageCollector collector, TaskCoordinator coordinator) {
-    long now = clock.currentTimeMillis();
-
-    KeyValueStore<K, PartialJoinMessage<M>> thisState = thisPartialJoinFn.getState();
-    KeyValueIterator<K, PartialJoinMessage<M>> iterator = thisState.all();
-    List<K> keysToRemove = new ArrayList<>();
-
-    while (iterator.hasNext()) {
-      Entry<K, PartialJoinMessage<M>> entry = iterator.next();
-      if (entry.getValue().getReceivedTimeMs() < now - ttlMs) {
-        keysToRemove.add(entry.getKey());
-      } else {
-        break; // InternalInMemoryStore uses a LinkedHashMap and will return entries in insertion order
-      }
-    }
-
-    iterator.close();
-    thisState.deleteAll(keysToRemove);
-    keysRemoved.inc(keysToRemove.size());
-    return Collections.emptyList();
-  }
-
-  @Override
   protected void handleClose() {
     this.thisPartialJoinFn.close();
   }
 
-  protected OperatorSpec<M, RM> getOperatorSpec() {
-    return (OperatorSpec<M, RM>) joinOpSpec;
+  protected OperatorSpec<M, JM> getOperatorSpec() {
+    return (OperatorSpec<M, JM>) joinOpSpec;
   }
 
   /**
@@ -129,7 +96,6 @@ class PartialJoinOperatorImpl<K, M, JM, RM> extends OperatorImpl<M, RM> {
    */
   @Override
   protected String getOperatorName() {
-    String side = isLeftSide ? "L" : "R";
-    return this.joinOpSpec.getOpName() + "-" + side;
+    return isLeftSide ? joinOpSpec.getLeftOpName() : joinOpSpec.getRightOpName();
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStore.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStore.java b/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStore.java
index e544e2e..56d839e 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStore.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStore.java
@@ -52,6 +52,7 @@ public interface TimeSeriesStore<K, V> {
    * @param key the key to look up in the store
    * @param startTimestamp the start timestamp of the range, inclusive
    * @param endTimestamp the end timestamp of the range, exclusive
+   * @return an iterator over the values for the given key in the provided time-range that must be closed after use
    * @throws IllegalArgumentException when startTimeStamp &gt; endTimestamp, or when either of them is negative
    */
   ClosableIterator<TimestampedValue<V>> get(K key, long startTimestamp, long endTimestamp);

http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimestampedValue.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimestampedValue.java b/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimestampedValue.java
index ad5e844..5e45148 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimestampedValue.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimestampedValue.java
@@ -26,18 +26,18 @@ package org.apache.samza.operators.impl.store;
  */
 public class TimestampedValue<V> {
   private final V value;
-  private final Long timestamp;
+  private final long timestamp;
 
-  public TimestampedValue(V v, Long time) {
-    value = v;
-    timestamp = time;
+  public TimestampedValue(V v, long timestamp) {
+    this.value = v;
+    this.timestamp = timestamp;
   }
 
   public V getValue() {
     return value;
   }
 
-  public Long getTimestamp() {
+  public long getTimestamp() {
     return timestamp;
   }
 
@@ -48,14 +48,14 @@ public class TimestampedValue<V> {
 
     TimestampedValue<?> that = (TimestampedValue<?>) o;
 
-    if (value != null ? !value.equals(that.value) : that.value != null) return false;
-    return timestamp.equals(that.timestamp);
+    if (timestamp != that.timestamp) return false;
+    return value != null ? value.equals(that.value) : (that.value == null);
   }
 
   @Override
   public int hashCode() {
     int result = value != null ? value.hashCode() : 0;
-    result = 31 * result + timestamp.hashCode();
+    result = 31 * result + (int) (timestamp ^ (timestamp >>> 32));
     return result;
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimestampedValueSerde.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimestampedValueSerde.java b/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimestampedValueSerde.java
new file mode 100644
index 0000000..b14f8a4
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimestampedValueSerde.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl.store;
+
+import org.apache.samza.serializers.Serde;
+
+import java.nio.ByteBuffer;
+
+
+public class TimestampedValueSerde<V> implements Serde<TimestampedValue<V>> {
+  private static final int TIMESTAMP_BYTES = 8;
+  private final Serde<V> vSerde;
+
+  public TimestampedValueSerde(Serde<V> vSerde) {
+    this.vSerde = vSerde;
+  }
+
+  @Override
+  public TimestampedValue<V> fromBytes(byte[] bytes) {
+    ByteBuffer bb = ByteBuffer.wrap(bytes);
+    byte[] vBytes = new byte[bytes.length - TIMESTAMP_BYTES];
+    bb.get(vBytes, 0, vBytes.length);
+    V v = vSerde.fromBytes(vBytes);
+    long ts = bb.getLong();
+    return new TimestampedValue<>(v, ts);
+  }
+
+  @Override
+  public byte[] toBytes(TimestampedValue<V> tv) {
+    byte[] vBytes = vSerde.toBytes(tv.getValue());
+    int vBytesLength = vBytes != null ? vBytes.length : 0;
+    ByteBuffer bb = ByteBuffer.allocate(vBytesLength + TIMESTAMP_BYTES);
+    if (vBytes != null) {
+      bb.put(vBytes);
+    }
+    bb.putLong(tv.getTimestamp());
+    return bb.array();
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java
index f4fe0fd..3f99280 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java
@@ -18,8 +18,16 @@
  */
 package org.apache.samza.operators.spec;
 
+import com.google.common.collect.ImmutableMap;
 import org.apache.samza.operators.functions.JoinFunction;
 import org.apache.samza.operators.functions.WatermarkFunction;
+import org.apache.samza.operators.impl.store.TimestampedValueSerde;
+import org.apache.samza.operators.impl.store.TimestampedValue;
+import org.apache.samza.serializers.Serde;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Map;
 
 
 /**
@@ -28,14 +36,17 @@ import org.apache.samza.operators.functions.WatermarkFunction;
  *
  * @param <K>  the type of join key
  * @param <M>  the type of message in this stream
- * @param <JM>  the type of message in the other stream
- * @param <RM>  the type of join result
+ * @param <OM>  the type of message in the other stream
+ * @param <JM>  the type of join result
  */
-public class JoinOperatorSpec<K, M, JM, RM> extends OperatorSpec<Object, RM> { // Object == M | JM
+public class JoinOperatorSpec<K, M, OM, JM> extends OperatorSpec<Object, JM> implements StatefulOperatorSpec { // Object == M | OM
 
   private final OperatorSpec<?, M> leftInputOpSpec;
-  private final OperatorSpec<?, JM> rightInputOpSpec;
-  private final JoinFunction<K, M, JM, RM> joinFn;
+  private final OperatorSpec<?, OM> rightInputOpSpec;
+  private final JoinFunction<K, M, OM, JM> joinFn;
+  private final Serde<K> keySerde;
+  private final Serde<TimestampedValue<M>> messageSerde;
+  private final Serde<TimestampedValue<OM>> otherMessageSerde;
   private final long ttlMs;
 
   /**
@@ -47,15 +58,45 @@ public class JoinOperatorSpec<K, M, JM, RM> extends OperatorSpec<Object, RM> { /
    * @param ttlMs  the ttl in ms for retaining messages in each stream
    * @param opId  the unique ID for this operator
    */
-  JoinOperatorSpec(OperatorSpec<?, M> leftInputOpSpec, OperatorSpec<?, JM> rightInputOpSpec,
-      JoinFunction<K, M, JM, RM> joinFn, long ttlMs, int opId) {
+  JoinOperatorSpec(OperatorSpec<?, M> leftInputOpSpec, OperatorSpec<?, OM> rightInputOpSpec,
+      JoinFunction<K, M, OM, JM> joinFn, Serde<K> keySerde, Serde<M> messageSerde, Serde<OM> otherMessageSerde,
+      long ttlMs, int opId) {
     super(OpCode.JOIN, opId);
     this.leftInputOpSpec = leftInputOpSpec;
     this.rightInputOpSpec = rightInputOpSpec;
     this.joinFn = joinFn;
+    this.keySerde = keySerde;
+    this.messageSerde = new TimestampedValueSerde<>(messageSerde);
+    this.otherMessageSerde = new TimestampedValueSerde<>(otherMessageSerde);
     this.ttlMs = ttlMs;
   }
 
+  @Override
+  public Collection<StoreDescriptor> getStoreDescriptors() {
+    String rocksDBStoreFactory = "org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory";
+    String leftStoreName = getLeftOpName();
+    String rightStoreName = getRightOpName();
+    Map<String, String> leftStoreCustomProps = ImmutableMap.of(
+        String.format("stores.%s.rocksdb.ttl.ms", leftStoreName), Long.toString(ttlMs),
+        String.format("stores.%s.changelog.kafka.cleanup.policy", leftStoreName), "delete",
+        String.format("stores.%s.changelog.kafka.retention.ms", leftStoreName), Long.toString(ttlMs));
+    Map<String, String> rightStoreCustomProps = ImmutableMap.of(
+        String.format("stores.%s.rocksdb.ttl.ms", rightStoreName), Long.toString(ttlMs),
+        String.format("stores.%s.changelog.kafka.cleanup.policy", rightStoreName), "delete",
+        String.format("stores.%s.changelog.kafka.retention.ms", rightStoreName), Long.toString(ttlMs));
+
+    return Arrays.asList(
+        new StoreDescriptor(leftStoreName, rocksDBStoreFactory, this.keySerde, this.messageSerde,
+            leftStoreName, leftStoreCustomProps),
+        new StoreDescriptor(rightStoreName, rocksDBStoreFactory, this.keySerde, this.otherMessageSerde,
+            rightStoreName, rightStoreCustomProps));
+  }
+
+  @Override
+  public WatermarkFunction getWatermarkFn() {
+    return joinFn instanceof WatermarkFunction ? (WatermarkFunction) joinFn : null;
+  }
+
   public OperatorSpec getLeftInputOpSpec() {
     return leftInputOpSpec;
   }
@@ -64,16 +105,19 @@ public class JoinOperatorSpec<K, M, JM, RM> extends OperatorSpec<Object, RM> { /
     return rightInputOpSpec;
   }
 
-  public JoinFunction<K, M, JM, RM> getJoinFn() {
+  public String getLeftOpName() {
+    return this.getOpName() + "-L";
+  }
+
+  public String getRightOpName() {
+    return this.getOpName() + "-R";
+  }
+
+  public JoinFunction<K, M, OM, JM> getJoinFn() {
     return this.joinFn;
   }
 
   public long getTtlMs() {
     return ttlMs;
   }
-
-  @Override
-  public WatermarkFunction getWatermarkFn() {
-    return joinFn instanceof WatermarkFunction ? (WatermarkFunction) joinFn : null;
-  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
index e67179e..8b2b177 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
@@ -27,6 +27,7 @@ import org.apache.samza.operators.functions.JoinFunction;
 import org.apache.samza.operators.functions.MapFunction;
 import org.apache.samza.operators.functions.SinkFunction;
 import org.apache.samza.operators.windows.internal.WindowInternal;
+import org.apache.samza.serializers.Serde;
 import org.apache.samza.task.TaskContext;
 
 import java.util.ArrayList;
@@ -189,18 +190,22 @@ public class OperatorSpecs {
    * @param leftInputOpSpec  the operator spec for the stream on the left side of the join
    * @param rightInputOpSpec  the operator spec for the stream on the right side of the join
    * @param joinFn  the user-defined join function to get join keys and results
+   * @param keySerde  the serde for the join key
+   * @param messageSerde  the serde for messages in the stream on the left side of the join
+   * @param otherMessageSerde  the serde for messages in the stream on the right side of the join
    * @param ttlMs  the ttl in ms for retaining messages in each stream
    * @param opId  the unique ID of the operator
    * @param <K>  the type of join key
    * @param <M>  the type of input message
-   * @param <JM>  the type of message in the other join stream
-   * @param <RM>  the type of join result
+   * @param <OM>  the type of message in the other stream
+   * @param <JM>  the type of join result
    * @return  the {@link JoinOperatorSpec}
    */
-  public static <K, M, JM, RM> JoinOperatorSpec<K, M, JM, RM> createJoinOperatorSpec(
-      OperatorSpec<?, M> leftInputOpSpec, OperatorSpec<?, JM> rightInputOpSpec,
-      JoinFunction<K, M, JM, RM> joinFn, long ttlMs, int opId) {
-    return new JoinOperatorSpec<>(leftInputOpSpec, rightInputOpSpec, joinFn, ttlMs, opId);
+  public static <K, M, OM, JM> JoinOperatorSpec<K, M, OM, JM> createJoinOperatorSpec(
+      OperatorSpec<?, M> leftInputOpSpec, OperatorSpec<?, OM> rightInputOpSpec, JoinFunction<K, M, OM, JM> joinFn,
+      Serde<K> keySerde, Serde<M> messageSerde, Serde<OM> otherMessageSerde, long ttlMs, int opId) {
+    return new JoinOperatorSpec<>(leftInputOpSpec, rightInputOpSpec, joinFn,
+        keySerde, messageSerde, otherMessageSerde, ttlMs, opId);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-core/src/main/java/org/apache/samza/operators/spec/StatefulOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/StatefulOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/StatefulOperatorSpec.java
new file mode 100644
index 0000000..90dfe59
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/StatefulOperatorSpec.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.operators.spec;
+
+import java.util.Collection;
+
+
+/**
+ * Spec for stateful operators.
+ */
+public interface StatefulOperatorSpec {
+
+  /**
+   * Get the store descriptors for stores required by this operator.
+   *
+   * @return store descriptors for this operator's stores
+   */
+  Collection<StoreDescriptor> getStoreDescriptors();
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-core/src/main/java/org/apache/samza/operators/spec/StoreDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/StoreDescriptor.java b/samza-core/src/main/java/org/apache/samza/operators/spec/StoreDescriptor.java
new file mode 100644
index 0000000..8aa2dd9
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/StoreDescriptor.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.spec;
+
+import org.apache.samza.config.JavaStorageConfig;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.StorageConfig;
+import org.apache.samza.serializers.Serde;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A descriptor for a store.
+ */
+public class StoreDescriptor {
+  private final String storeName;
+  private final String storeFactory;
+  private final Serde keySerde;
+  private final Serde msgSerde;
+  private final String changelogStream;
+  private final Map<String, String> otherProperties;
+
+  StoreDescriptor(String storeName, String storeFactory, Serde keySerde, Serde msgSerde,
+      String changelogStream, Map<String, String> otherProperties) {
+    this.storeName = storeName;
+    this.storeFactory = storeFactory;
+    this.keySerde = keySerde;
+    this.msgSerde = msgSerde;
+    this.changelogStream = changelogStream;
+    this.otherProperties = otherProperties;
+  }
+
+  public String getStoreName() {
+    return storeName;
+  }
+
+  public Serde getKeySerde() {
+    return keySerde;
+  }
+
+  public Serde getMsgSerde() {
+    return msgSerde;
+  }
+
+  public JavaStorageConfig getStorageConfigs() {
+    HashMap<String, String> configs = new HashMap<>();
+    configs.put(String.format(StorageConfig.FACTORY(), this.getStoreName()), this.getStoreFactory());
+    configs.put(String.format(StorageConfig.CHANGELOG_STREAM(), this.getStoreName()), this.getChangelogStream());
+    configs.putAll(this.getOtherProperties());
+    return new JavaStorageConfig(new MapConfig(configs));
+  }
+
+  private String getStoreFactory() {
+    return storeFactory;
+  }
+
+  private String getChangelogStream() {
+    return changelogStream;
+  }
+
+  private Map<String, String> getOtherProperties() {
+    return otherProperties;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
index 1caca26..ff0299d 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
@@ -154,7 +154,7 @@ public class LocalApplicationRunner extends AbstractApplicationRunner {
       writePlanJsonFile(executionPlanJson);
 
       // 2. create the necessary streams
-      // TODO: System generated intermediate streams should have robust naming scheme. Refer JIRA-1391
+      // TODO: System generated intermediate streams should have robust naming scheme. See SAMZA-1391
       String planId = String.valueOf(executionPlanJson.hashCode());
       createStreams(planId, plan.getIntermediateStreams());
 

http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
index 4c4a645..9d44ec1 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
@@ -216,13 +216,14 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
   public void onNewJobModelAvailable(final String version) {
     debounceTimer.scheduleAfterDebounceTime(JOB_MODEL_VERSION_CHANGE, 0, () ->
       {
-        LOG.info("pid=" + processorId + "new JobModel available");
+        LOG.info("pid=" + processorId + ": new JobModel available");
         // get the new job model from ZK
         newJobModel = zkUtils.getJobModel(version);
         LOG.info("pid=" + processorId + ": new JobModel available. ver=" + version + "; jm = " + newJobModel);
 
         if (!newJobModel.getContainers().containsKey(processorId)) {
-          LOG.info("JobModel: {} does not contain the processorId: {}. Stopping the processor.", newJobModel, processorId);
+          LOG.info("New JobModel does not contain pid={}. Stopping this processor. New JobModel: {}",
+              processorId, newJobModel);
           stop();
         } else {
           // stop current work

http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index c3e1be2..6071c1f 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -805,7 +805,7 @@ class SamzaContainer(
    */
   def shutdown(): Unit = {
     if (status == SamzaContainerStatus.STOPPED || status == SamzaContainerStatus.FAILED) {
-      throw new IllegalContainerStateException("Cannot shutdown a container with status - " + status)
+      throw new IllegalContainerStateException("Cannot shutdown a container with status " + status)
     }
     shutdownRunLoop()
   }
@@ -936,16 +936,18 @@ class SamzaContainer(
     val runLoopThread = Thread.currentThread()
     shutdownHookThread = new Thread("CONTAINER-SHUTDOWN-HOOK") {
       override def run() = {
-        info("Shutting down, will wait up to %s ms" format shutdownMs)
+        info("Shutting down, will wait up to %s ms." format shutdownMs)
         shutdownRunLoop()  //TODO: Pull out shutdown hook to LocalContainerRunner or SP
         try {
           runLoopThread.join(shutdownMs)
         } catch {
           case e: Throwable => // Ignore to avoid deadlock with uncaughtExceptionHandler. See SAMZA-1220
-            error("Did not shut down within %s ms, exiting" format shutdownMs, e)
+            error("Did not shut down within %s ms, exiting." format shutdownMs, e)
         }
         if (!runLoopThread.isAlive) {
           info("Shutdown complete")
+        } else {
+          error("Did not shut down within %s ms, exiting." format shutdownMs)
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java b/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java
index 95939c4..df393b0 100644
--- a/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java
+++ b/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java
@@ -49,7 +49,9 @@ public class OrderShipmentJoinExample implements StreamApplication {
             KVSerde.of(new StringSerde(), new JsonSerdeV2<>(FulfilledOrderRecord.class)));
 
     orders
-        .join(shipments, new MyJoinFunction(), Duration.ofMinutes(1))
+        .join(shipments, new MyJoinFunction(),
+            new StringSerde(), new JsonSerdeV2<>(OrderRecord.class), new JsonSerdeV2<>(ShipmentRecord.class),
+            Duration.ofMinutes(1))
         .map(fulFilledOrder -> KV.of(fulFilledOrder.orderId, fulFilledOrder))
         .sendTo(fulfilledOrders);
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
index 50b0a13..f6441dc 100644
--- a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
@@ -31,6 +31,7 @@ import org.apache.samza.operators.StreamGraphImpl;
 import org.apache.samza.operators.functions.JoinFunction;
 import org.apache.samza.operators.windows.Windows;
 import org.apache.samza.runtime.ApplicationRunner;
+import org.apache.samza.serializers.Serde;
 import org.apache.samza.system.StreamSpec;
 import org.apache.samza.system.SystemAdmin;
 import org.apache.samza.system.SystemStreamMetadata;
@@ -130,14 +131,14 @@ public class TestExecutionPlanner {
      */
 
     StreamGraphImpl streamGraph = new StreamGraphImpl(runner, config);
-    MessageStream<KV<Object, Object>> m1 =
+    MessageStream<KV<Object, Object>> messageStream1 =
         streamGraph.<KV<Object, Object>>getInputStream("input1")
             .map(m -> m);
-    MessageStream<KV<Object, Object>> m2 =
+    MessageStream<KV<Object, Object>> messageStream2 =
         streamGraph.<KV<Object, Object>>getInputStream("input2")
             .partitionBy(m -> m.key, m -> m.value)
             .filter(m -> true);
-    MessageStream<KV<Object, Object>> m3 =
+    MessageStream<KV<Object, Object>> messageStream3 =
         streamGraph.<KV<Object, Object>>getInputStream("input3")
             .filter(m -> true)
             .partitionBy(m -> m.key, m -> m.value)
@@ -145,8 +146,14 @@ public class TestExecutionPlanner {
     OutputStream<KV<Object, Object>> output1 = streamGraph.getOutputStream("output1");
     OutputStream<KV<Object, Object>> output2 = streamGraph.getOutputStream("output2");
 
-    m1.join(m2, mock(JoinFunction.class), Duration.ofHours(2)).sendTo(output1);
-    m3.join(m2, mock(JoinFunction.class), Duration.ofHours(1)).sendTo(output2);
+    messageStream1
+        .join(messageStream2, mock(JoinFunction.class),
+            mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofHours(2))
+        .sendTo(output1);
+    messageStream3
+        .join(messageStream2, mock(JoinFunction.class),
+            mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofHours(1))
+        .sendTo(output2);
 
     return streamGraph;
   }
@@ -154,14 +161,14 @@ public class TestExecutionPlanner {
   private StreamGraphImpl createStreamGraphWithJoinAndWindow() {
 
     StreamGraphImpl streamGraph = new StreamGraphImpl(runner, config);
-    MessageStream<KV<Object, Object>> m1 =
+    MessageStream<KV<Object, Object>> messageStream1 =
         streamGraph.<KV<Object, Object>>getInputStream("input1")
             .map(m -> m);
-    MessageStream<KV<Object, Object>> m2 =
+    MessageStream<KV<Object, Object>> messageStream2 =
         streamGraph.<KV<Object, Object>>getInputStream("input2")
             .partitionBy(m -> m.key, m -> m.value)
             .filter(m -> true);
-    MessageStream<KV<Object, Object>> m3 =
+    MessageStream<KV<Object, Object>> messageStream3 =
         streamGraph.<KV<Object, Object>>getInputStream("input3")
             .filter(m -> true)
             .partitionBy(m -> m.key, m -> m.value)
@@ -169,17 +176,26 @@ public class TestExecutionPlanner {
     OutputStream<KV<Object, Object>> output1 = streamGraph.getOutputStream("output1");
     OutputStream<KV<Object, Object>> output2 = streamGraph.getOutputStream("output2");
 
-    m1.map(m -> m)
+    messageStream1.map(m -> m)
         .filter(m->true)
         .window(Windows.keyedTumblingWindow(m -> m, Duration.ofMillis(8)));
 
-    m2.map(m -> m)
+    messageStream2.map(m -> m)
         .filter(m->true)
         .window(Windows.keyedTumblingWindow(m -> m, Duration.ofMillis(16)));
 
-    m1.join(m2, mock(JoinFunction.class), Duration.ofMillis(1600)).sendTo(output1);
-    m3.join(m2, mock(JoinFunction.class), Duration.ofMillis(100)).sendTo(output2);
-    m3.join(m2, mock(JoinFunction.class), Duration.ofMillis(252)).sendTo(output2);
+    messageStream1
+        .join(messageStream2, mock(JoinFunction.class),
+            mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofMillis(1600))
+        .sendTo(output1);
+    messageStream3
+        .join(messageStream2, mock(JoinFunction.class),
+            mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofMillis(100))
+        .sendTo(output2);
+    messageStream3
+        .join(messageStream2, mock(JoinFunction.class),
+            mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofMillis(252))
+        .sendTo(output2);
 
     return streamGraph;
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java b/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
index 095e407..10c4aa3 100644
--- a/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
@@ -30,6 +30,7 @@ import org.apache.samza.operators.functions.JoinFunction;
 import org.apache.samza.runtime.ApplicationRunner;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.NoOpSerde;
+import org.apache.samza.serializers.Serde;
 import org.apache.samza.system.StreamSpec;
 import org.apache.samza.system.SystemAdmin;
 import org.codehaus.jackson.map.ObjectMapper;
@@ -107,14 +108,14 @@ public class TestJobGraphJsonGenerator {
 
     StreamGraphImpl streamGraph = new StreamGraphImpl(runner, config);
     streamGraph.setDefaultSerde(KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>()));
-    MessageStream<KV<Object, Object>> m1 =
+    MessageStream<KV<Object, Object>> messageStream1 =
         streamGraph.<KV<Object, Object>>getInputStream("input1")
             .map(m -> m);
-    MessageStream<KV<Object, Object>> m2 =
+    MessageStream<KV<Object, Object>> messageStream2 =
         streamGraph.<KV<Object, Object>>getInputStream("input2")
             .partitionBy(m -> m.key, m -> m.value)
             .filter(m -> true);
-    MessageStream<KV<Object, Object>> m3 =
+    MessageStream<KV<Object, Object>> messageStream3 =
         streamGraph.<KV<Object, Object>>getInputStream("input3")
             .filter(m -> true)
             .partitionBy(m -> m.key, m -> m.value)
@@ -122,9 +123,15 @@ public class TestJobGraphJsonGenerator {
     OutputStream<KV<Object, Object>> outputStream1 = streamGraph.getOutputStream("output1");
     OutputStream<KV<Object, Object>> outputStream2 = streamGraph.getOutputStream("output2");
 
-    m1.join(m2, mock(JoinFunction.class), Duration.ofHours(2)).sendTo(outputStream1);
-    m2.sink((message, collector, coordinator) -> { });
-    m3.join(m2, mock(JoinFunction.class), Duration.ofHours(1)).sendTo(outputStream2);
+    messageStream1
+        .join(messageStream2, mock(JoinFunction.class),
+            mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofHours(2))
+        .sendTo(outputStream1);
+    messageStream2.sink((message, collector, coordinator) -> { });
+    messageStream3
+        .join(messageStream2, mock(JoinFunction.class),
+            mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofHours(1))
+        .sendTo(outputStream2);
 
     ExecutionPlanner planner = new ExecutionPlanner(config, streamManager);
     ExecutionPlan plan = planner.plan(streamGraph);

http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-core/src/test/java/org/apache/samza/execution/TestJobNode.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestJobNode.java b/samza-core/src/test/java/org/apache/samza/execution/TestJobNode.java
index 918da26..f6ebaf9 100644
--- a/samza-core/src/test/java/org/apache/samza/execution/TestJobNode.java
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestJobNode.java
@@ -26,6 +26,8 @@ import org.apache.samza.operators.KV;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.OutputStream;
 import org.apache.samza.operators.StreamGraphImpl;
+import org.apache.samza.operators.functions.JoinFunction;
+import org.apache.samza.operators.impl.store.TimestampedValueSerde;
 import org.apache.samza.runtime.ApplicationRunner;
 import org.apache.samza.serializers.JsonSerdeV2;
 import org.apache.samza.serializers.KVSerde;
@@ -35,6 +37,7 @@ import org.apache.samza.serializers.StringSerde;
 import org.apache.samza.system.StreamSpec;
 import org.junit.Test;
 
+import java.time.Duration;
 import java.util.Base64;
 import java.util.HashMap;
 import java.util.Map;
@@ -50,25 +53,35 @@ public class TestJobNode {
   @Test
   public void testAddSerdeConfigs() {
     ApplicationRunner mockRunner = mock(ApplicationRunner.class);
-    StreamSpec inputSpec = new StreamSpec("input", "input", "input-system");
+    StreamSpec input1Spec = new StreamSpec("input1", "input1", "input-system");
+    StreamSpec input2Spec = new StreamSpec("input2", "input2", "input-system");
     StreamSpec outputSpec = new StreamSpec("output", "output", "output-system");
-    StreamSpec partitionBySpec = new StreamSpec("null-null-partition_by-1", "partition_by-1", "intermediate-system");
-    doReturn(inputSpec).when(mockRunner).getStreamSpec("input");
+    StreamSpec partitionBySpec = new StreamSpec("null-null-partition_by-2", "partition_by-2", "intermediate-system");
+    doReturn(input1Spec).when(mockRunner).getStreamSpec("input1");
+    doReturn(input2Spec).when(mockRunner).getStreamSpec("input2");
     doReturn(outputSpec).when(mockRunner).getStreamSpec("output");
-    doReturn(partitionBySpec).when(mockRunner).getStreamSpec("null-null-partition_by-1");
+    doReturn(partitionBySpec).when(mockRunner).getStreamSpec("null-null-partition_by-2");
 
     StreamGraphImpl streamGraph = new StreamGraphImpl(mockRunner, mock(Config.class));
     streamGraph.setDefaultSerde(KVSerde.of(new StringSerde(), new JsonSerdeV2<>()));
-    MessageStream<KV<String, Object>> input = streamGraph.getInputStream("input");
+    MessageStream<KV<String, Object>> input1 = streamGraph.getInputStream("input1");
+    MessageStream<KV<String, Object>> input2 = streamGraph.getInputStream("input2");
     OutputStream<KV<String, Object>> output = streamGraph.getOutputStream("output");
-    input.partitionBy(KV::getKey, KV::getValue).sendTo(output);
+    JoinFunction<String, Object, Object, KV<String, Object>> mockJoinFn = mock(JoinFunction.class);
+    input1
+        .partitionBy(KV::getKey, KV::getValue).map(kv -> kv.value)
+        .join(input2.map(kv -> kv.value), mockJoinFn,
+            new StringSerde(), new JsonSerdeV2<>(Object.class), new JsonSerdeV2<>(Object.class), Duration.ofHours(1))
+        .sendTo(output);
 
     JobNode jobNode = new JobNode("jobName", "jobId", streamGraph, mock(Config.class));
     Config config = new MapConfig();
-    StreamEdge inputEdge = new StreamEdge(inputSpec, config);
+    StreamEdge input1Edge = new StreamEdge(input1Spec, config);
+    StreamEdge input2Edge = new StreamEdge(input2Spec, config);
     StreamEdge outputEdge = new StreamEdge(outputSpec, config);
     StreamEdge repartitionEdge = new StreamEdge(partitionBySpec, true, config);
-    jobNode.addInEdge(inputEdge);
+    jobNode.addInEdge(input1Edge);
+    jobNode.addInEdge(input2Edge);
     jobNode.addOutEdge(outputEdge);
     jobNode.addInEdge(repartitionEdge);
     jobNode.addOutEdge(repartitionEdge);
@@ -85,28 +98,74 @@ public class TestJobNode {
         e -> e.getKey().replace(SerializerConfig.SERIALIZED_INSTANCE_SUFFIX(), ""),
         e -> serializableSerde.fromBytes(Base64.getDecoder().decode(e.getValue().getBytes()))
     ));
-    assertEquals(2, serializers.size());
+    assertEquals(5, serializers.size()); // 2 default + 3 specific for join
 
-    String inputKeySerde = mapConfig.get("streams.input.samza.key.serde");
-    String inputMsgSerde = mapConfig.get("streams.input.samza.msg.serde");
-    assertTrue(deserializedSerdes.containsKey(inputKeySerde));
-    assertTrue(inputKeySerde.startsWith(StringSerde.class.getSimpleName()));
-    assertTrue(deserializedSerdes.containsKey(inputMsgSerde));
-    assertTrue(inputMsgSerde.startsWith(JsonSerdeV2.class.getSimpleName()));
+    String input1KeySerde = mapConfig.get("streams.input1.samza.key.serde");
+    String input1MsgSerde = mapConfig.get("streams.input1.samza.msg.serde");
+    assertTrue("Serialized serdes should contain input1 key serde",
+        deserializedSerdes.containsKey(input1KeySerde));
+    assertTrue("Serialized input1 key serde should be a StringSerde",
+        input1KeySerde.startsWith(StringSerde.class.getSimpleName()));
+    assertTrue("Serialized serdes should contain input1 msg serde",
+        deserializedSerdes.containsKey(input1MsgSerde));
+    assertTrue("Serialized input1 msg serde should be a JsonSerdeV2",
+        input1MsgSerde.startsWith(JsonSerdeV2.class.getSimpleName()));
+
+    String input2KeySerde = mapConfig.get("streams.input2.samza.key.serde");
+    String input2MsgSerde = mapConfig.get("streams.input2.samza.msg.serde");
+    assertTrue("Serialized serdes should contain input2 key serde",
+        deserializedSerdes.containsKey(input2KeySerde));
+    assertTrue("Serialized input2 key serde should be a StringSerde",
+        input2KeySerde.startsWith(StringSerde.class.getSimpleName()));
+    assertTrue("Serialized serdes should contain input2 msg serde",
+        deserializedSerdes.containsKey(input2MsgSerde));
+    assertTrue("Serialized input2 msg serde should be a JsonSerdeV2",
+        input2MsgSerde.startsWith(JsonSerdeV2.class.getSimpleName()));
 
     String outputKeySerde = mapConfig.get("streams.output.samza.key.serde");
     String outputMsgSerde = mapConfig.get("streams.output.samza.msg.serde");
-    assertTrue(deserializedSerdes.containsKey(outputKeySerde));
-    assertTrue(outputKeySerde.startsWith(StringSerde.class.getSimpleName()));
-    assertTrue(deserializedSerdes.containsKey(outputMsgSerde));
-    assertTrue(outputMsgSerde.startsWith(JsonSerdeV2.class.getSimpleName()));
-
-    String partitionByKeySerde = mapConfig.get("streams.null-null-partition_by-1.samza.key.serde");
-    String partitionByMsgSerde = mapConfig.get("streams.null-null-partition_by-1.samza.msg.serde");
-    assertTrue(deserializedSerdes.containsKey(partitionByKeySerde));
-    assertTrue(partitionByKeySerde.startsWith(StringSerde.class.getSimpleName()));
-    assertTrue(deserializedSerdes.containsKey(partitionByMsgSerde));
-    assertTrue(partitionByMsgSerde.startsWith(JsonSerdeV2.class.getSimpleName()));
+    assertTrue("Serialized serdes should contain output key serde",
+        deserializedSerdes.containsKey(outputKeySerde));
+    assertTrue("Serialized output key serde should be a StringSerde",
+        outputKeySerde.startsWith(StringSerde.class.getSimpleName()));
+    assertTrue("Serialized serdes should contain output msg serde",
+        deserializedSerdes.containsKey(outputMsgSerde));
+    assertTrue("Serialized output msg serde should be a JsonSerdeV2",
+        outputMsgSerde.startsWith(JsonSerdeV2.class.getSimpleName()));
+
+    String partitionByKeySerde = mapConfig.get("streams.null-null-partition_by-2.samza.key.serde");
+    String partitionByMsgSerde = mapConfig.get("streams.null-null-partition_by-2.samza.msg.serde");
+    assertTrue("Serialized serdes should contain intermediate stream key serde",
+        deserializedSerdes.containsKey(partitionByKeySerde));
+    assertTrue("Serialized intermediate stream key serde should be a StringSerde",
+        partitionByKeySerde.startsWith(StringSerde.class.getSimpleName()));
+    assertTrue("Serialized serdes should contain intermediate stream msg serde",
+        deserializedSerdes.containsKey(partitionByMsgSerde));
+    assertTrue(
+        "Serialized intermediate stream msg serde should be a JsonSerdeV2",
+        partitionByMsgSerde.startsWith(JsonSerdeV2.class.getSimpleName()));
+
+    String leftJoinStoreKeySerde = mapConfig.get("stores.join-6-L.key.serde");
+    String leftJoinStoreMsgSerde = mapConfig.get("stores.join-6-L.msg.serde");
+    assertTrue("Serialized serdes should contain left join store key serde",
+        deserializedSerdes.containsKey(leftJoinStoreKeySerde));
+    assertTrue("Serialized left join store key serde should be a StringSerde",
+        leftJoinStoreKeySerde.startsWith(StringSerde.class.getSimpleName()));
+    assertTrue("Serialized serdes should contain left join store msg serde",
+        deserializedSerdes.containsKey(leftJoinStoreMsgSerde));
+    assertTrue("Serialized left join store msg serde should be a TimestampedValueSerde",
+        leftJoinStoreMsgSerde.startsWith(TimestampedValueSerde.class.getSimpleName()));
+
+    String rightJoinStoreKeySerde = mapConfig.get("stores.join-6-R.key.serde");
+    String rightJoinStoreMsgSerde = mapConfig.get("stores.join-6-R.msg.serde");
+    assertTrue("Serialized serdes should contain right join store key serde",
+        deserializedSerdes.containsKey(rightJoinStoreKeySerde));
+    assertTrue("Serialized right join store key serde should be a StringSerde",
+        rightJoinStoreKeySerde.startsWith(StringSerde.class.getSimpleName()));
+    assertTrue("Serialized serdes should contain right join store msg serde",
+        deserializedSerdes.containsKey(rightJoinStoreMsgSerde));
+    assertTrue("Serialized right join store msg serde should be a TimestampedValueSerde",
+        rightJoinStoreMsgSerde.startsWith(TimestampedValueSerde.class.getSimpleName()));
   }
 
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java b/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
index 0df6721..1120c25 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
@@ -22,10 +22,13 @@ import com.google.common.collect.ImmutableSet;
 import org.apache.samza.Partition;
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.Config;
+import org.apache.samza.container.TaskContextImpl;
 import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.operators.functions.JoinFunction;
+import org.apache.samza.operators.util.InternalInMemoryStore;
 import org.apache.samza.runtime.ApplicationRunner;
 import org.apache.samza.serializers.IntegerSerde;
+import org.apache.samza.serializers.JsonSerdeV2;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.system.OutgoingMessageEnvelope;
@@ -34,7 +37,6 @@ import org.apache.samza.system.SystemStream;
 import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.StreamOperatorTask;
-import org.apache.samza.container.TaskContextImpl;
 import org.apache.samza.task.TaskContext;
 import org.apache.samza.task.TaskCoordinator;
 import org.apache.samza.testUtils.TestClock;
@@ -49,6 +51,7 @@ import java.util.Set;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -261,6 +264,9 @@ public class TestJoinOperator {
         .of(new SystemStreamPartition("insystem", "instream", new Partition(0)),
             new SystemStreamPartition("insystem2", "instream2", new Partition(0))));
     when(taskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
+    // need to return different stores for left and right side
+    when(taskContext.getStore(eq("join-4-L"))).thenReturn(new InternalInMemoryStore<>());
+    when(taskContext.getStore(eq("join-4-R"))).thenReturn(new InternalInMemoryStore<>());
 
     Config config = mock(Config.class);
 
@@ -287,7 +293,9 @@ public class TestJoinOperator {
 
       SystemStream outputSystemStream = new SystemStream("outputSystem", "outputStream");
       inStream
-          .join(inStream2, joinFn, JOIN_TTL)
+          .join(inStream2, joinFn,
+              new IntegerSerde(), new JsonSerdeV2<>(FirstStreamIME.class), new JsonSerdeV2<>(SecondStreamIME.class),
+              JOIN_TTL)
           .sink((message, messageCollector, taskCoordinator) -> {
               messageCollector.send(new OutgoingMessageEnvelope(outputSystemStream, message));
             });

http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java b/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
index c6554bc..f23bb14 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
@@ -41,6 +41,7 @@ import org.apache.samza.operators.windows.Window;
 import org.apache.samza.operators.windows.WindowPane;
 import org.apache.samza.operators.windows.Windows;
 import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.Serde;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 
@@ -288,7 +289,7 @@ public class TestMessageStreamImpl {
         mock(JoinFunction.class);
 
     Duration joinTtl = Duration.ofMinutes(1);
-    source1.join(source2, mockJoinFn, joinTtl);
+    source1.join(source2, mockJoinFn, mock(Serde.class), mock(Serde.class), mock(Serde.class), joinTtl);
 
     ArgumentCaptor<OperatorSpec> leftRegisteredOpCaptor = ArgumentCaptor.forClass(OperatorSpec.class);
     verify(leftInputOpSpec).registerNextOperatorSpec(leftRegisteredOpCaptor.capture());

http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
index d73c545..a759e52 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
@@ -44,6 +44,7 @@ import org.apache.samza.operators.functions.FilterFunction;
 import org.apache.samza.operators.functions.JoinFunction;
 import org.apache.samza.operators.functions.MapFunction;
 import org.apache.samza.operators.spec.OperatorSpec.OpCode;
+import org.apache.samza.operators.util.InternalInMemoryStore;
 import org.apache.samza.runtime.ApplicationRunner;
 import org.apache.samza.serializers.IntegerSerde;
 import org.apache.samza.serializers.KVSerde;
@@ -218,10 +219,13 @@ public class TestOperatorImplGraph {
     JoinFunction mockJoinFunction = mock(JoinFunction.class);
     MessageStream<Object> inputStream1 = streamGraph.getInputStream("input1", new NoOpSerde<>());
     MessageStream<Object> inputStream2 = streamGraph.getInputStream("input2", new NoOpSerde<>());
-    inputStream1.join(inputStream2, mockJoinFunction, Duration.ofHours(1));
+    inputStream1.join(inputStream2, mockJoinFunction,
+        mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofHours(1));
 
     TaskContextImpl mockTaskContext = mock(TaskContextImpl.class);
     when(mockTaskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
+    when(mockTaskContext.getStore(eq("join-2-L"))).thenReturn(new InternalInMemoryStore<>());
+    when(mockTaskContext.getStore(eq("join-2-R"))).thenReturn(new InternalInMemoryStore<>());
     OperatorImplGraph opImplGraph =
         new OperatorImplGraph(streamGraph, mock(Config.class), mockTaskContext, mock(Clock.class));
 
@@ -386,15 +390,22 @@ public class TestOperatorImplGraph {
         .thenReturn(int2);
 
     StreamGraphImpl streamGraph = new StreamGraphImpl(runner, config);
-    Serde inputSerde = new NoOpSerde<>();
-    MessageStream m1 = streamGraph.getInputStream("input1", inputSerde).map(m -> m);
-    MessageStream m2 = streamGraph.getInputStream("input2", inputSerde).filter(m -> true);
-    MessageStream m3 = streamGraph.getInputStream("input3", inputSerde).filter(m -> true).partitionBy(m -> "hehe", m -> m).map(m -> m);
-    OutputStream<Object> om1 = streamGraph.getOutputStream("output1");
-    OutputStream<Object> om2 = streamGraph.getOutputStream("output2");
-
-    m1.join(m2, mock(JoinFunction.class), Duration.ofHours(2)).partitionBy(m -> "haha", m -> m).sendTo(om1);
-    m3.join(m2, mock(JoinFunction.class), Duration.ofHours(1)).sendTo(om2);
+    MessageStream messageStream1 = streamGraph.getInputStream("input1").map(m -> m);
+    MessageStream messageStream2 = streamGraph.getInputStream("input2").filter(m -> true);
+    MessageStream messageStream3 =
+        streamGraph.getInputStream("input3").filter(m -> true).partitionBy(m -> "hehe", m -> m).map(m -> m);
+    OutputStream<Object> outputStream1 = streamGraph.getOutputStream("output1");
+    OutputStream<Object> outputStream2 = streamGraph.getOutputStream("output2");
+
+    messageStream1
+        .join(messageStream2, mock(JoinFunction.class),
+            mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofHours(2))
+        .partitionBy(m -> "haha", m -> m)
+        .sendTo(outputStream1);
+    messageStream3
+        .join(messageStream2, mock(JoinFunction.class),
+            mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofHours(1))
+        .sendTo(outputStream2);
 
     Multimap<SystemStream, SystemStream> outputToInput = OperatorImplGraph.getIntermediateToInputStreamsMap(streamGraph);
     Collection<SystemStream> inputs = outputToInput.get(int1.toSystemStream());