You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2017/09/28 00:08:08 UTC

[1/4] samza git commit: SAMZA-1109; Allow setting Serdes for fluent API operators in code

Repository: samza
Updated Branches:
  refs/heads/master 987180a17 -> f16ba2692


http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-test/src/test/java/org/apache/samza/test/operator/PageView.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/PageView.java b/samza-test/src/test/java/org/apache/samza/test/operator/PageView.java
index 40e5e30..2a8f039 100644
--- a/samza-test/src/test/java/org/apache/samza/test/operator/PageView.java
+++ b/samza-test/src/test/java/org/apache/samza/test/operator/PageView.java
@@ -18,32 +18,33 @@
  */
 package org.apache.samza.test.operator;
 
+
 class PageView {
-  private final String userId;
-  private final String country;
-  private final String url;
-
-  /**
-   * Constructs a {@link PageView} from the provided string.
-   *
-   * @param message in the following CSV format - userId,country,url
-   */
-  PageView(String message) {
-    String[] pageViewFields = message.split(",");
-    userId = pageViewFields[0];
-    country = pageViewFields[1];
-    url = pageViewFields[2];
-  }
+  private String userId;
+  private String country;
+  private String url;
 
-  String getUserId() {
+  public String getUserId() {
     return userId;
   }
 
-  String getCountry() {
+  public String getCountry() {
     return country;
   }
 
-  String getUrl() {
+  public String getUrl() {
     return url;
   }
+
+  public void setUserId(String userId) {
+    this.userId = userId;
+  }
+
+  public void setCountry(String country) {
+    this.country = country;
+  }
+
+  public void setUrl(String url) {
+    this.url = url;
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionWindowApp.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionWindowApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionWindowApp.java
index 1e2acb2..261b954 100644
--- a/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionWindowApp.java
+++ b/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionWindowApp.java
@@ -21,37 +21,35 @@ package org.apache.samza.test.operator;
 
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.Config;
+import org.apache.samza.operators.KV;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.OutputStream;
 import org.apache.samza.operators.StreamGraph;
-import org.apache.samza.operators.windows.WindowPane;
 import org.apache.samza.operators.windows.Windows;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.samza.serializers.JsonSerdeV2;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.StringSerde;
 
 import java.time.Duration;
-import java.util.Collection;
-import java.util.function.Function;
 
 /**
- * A {@link StreamApplication} that demonstrates a repartition followed by a windowed count.
+ * A {@link StreamApplication} that demonstrates a partitionBy followed by a windowed count.
  */
 public class RepartitionWindowApp implements StreamApplication {
-
-  private static final Logger LOG = LoggerFactory.getLogger(RepartitionWindowApp.class);
+  static final String INPUT_TOPIC = "page-views";
+  static final String OUTPUT_TOPIC = "page-view-counts";
 
   @Override
   public void init(StreamGraph graph, Config config) {
+    MessageStream<PageView> pageViews = graph.getInputStream(INPUT_TOPIC, new JsonSerdeV2<>(PageView.class));
 
-    MessageStream<String> pageViews = graph.<String, String, String>getInputStream("page-views", (k, v) -> v);
-    Function<String, String> keyFn = pageView -> new PageView(pageView).getUserId();
-
-    OutputStream<String, String, WindowPane<String, Collection<String>>> outputStream = graph
-        .getOutputStream(TestRepartitionWindowApp.OUTPUT_TOPIC, m -> m.getKey().getKey(), m -> new Integer(m.getMessage().size()).toString());
+    OutputStream<KV<String, String>> outputStream =
+        graph.getOutputStream(OUTPUT_TOPIC, new KVSerde<>(new StringSerde(), new StringSerde()));
 
     pageViews
-        .partitionBy(keyFn)
-        .window(Windows.keyedSessionWindow(keyFn, Duration.ofSeconds(3)))
+        .partitionBy(PageView::getUserId, pv -> pv, new KVSerde<>(new StringSerde(), new JsonSerdeV2<>(PageView.class)))
+        .window(Windows.keyedSessionWindow(KV::getKey, Duration.ofSeconds(3)))
+        .map(windowPane -> KV.of(windowPane.getKey().getKey(), String.valueOf(windowPane.getMessage().size())))
         .sendTo(outputStream);
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-test/src/test/java/org/apache/samza/test/operator/SessionWindowApp.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/SessionWindowApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/SessionWindowApp.java
index 65b48d3..4c83960 100644
--- a/samza-test/src/test/java/org/apache/samza/test/operator/SessionWindowApp.java
+++ b/samza-test/src/test/java/org/apache/samza/test/operator/SessionWindowApp.java
@@ -21,35 +21,36 @@ package org.apache.samza.test.operator;
 
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.Config;
+import org.apache.samza.operators.KV;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.OutputStream;
 import org.apache.samza.operators.StreamGraph;
-import org.apache.samza.operators.windows.WindowPane;
 import org.apache.samza.operators.windows.Windows;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.samza.serializers.IntegerSerde;
+import org.apache.samza.serializers.JsonSerdeV2;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.StringSerde;
 
 import java.time.Duration;
-import java.util.Collection;
 
 /**
  * A {@link StreamApplication} that demonstrates a filter followed by a session window.
  */
 public class SessionWindowApp implements StreamApplication {
-
-  private static final Logger LOG = LoggerFactory.getLogger(SessionWindowApp.class);
+  private static final String INPUT_TOPIC = "page-views";
+  private static final String OUTPUT_TOPIC = "page-view-counts";
   private static final String FILTER_KEY = "badKey";
-  private static final String OUTPUT_TOPIC = "Result";
 
   @Override
   public void init(StreamGraph graph, Config config) {
-    MessageStream<PageView> pageViews = graph.<String, String, PageView>getInputStream("page-views", (k, v) -> new PageView(v));
-    OutputStream<String, String, WindowPane<String, Collection<PageView>>> outputStream = graph
-        .getOutputStream(OUTPUT_TOPIC, m -> m.getKey().getKey(), m -> new Integer(m.getMessage().size()).toString());
+    MessageStream<PageView> pageViews = graph.getInputStream(INPUT_TOPIC, new JsonSerdeV2<>(PageView.class));
+    OutputStream<KV<String, Integer>> outputStream =
+        graph.getOutputStream(OUTPUT_TOPIC, new KVSerde<>(new StringSerde(), new IntegerSerde()));
 
     pageViews
         .filter(m -> !FILTER_KEY.equals(m.getUserId()))
-        .window(Windows.keyedSessionWindow(pageView -> pageView.getUserId(), Duration.ofSeconds(3)))
+        .window(Windows.keyedSessionWindow(PageView::getUserId, Duration.ofSeconds(3)))
+        .map(m -> KV.of(m.getKey().getKey(), m.getMessage().size()))
         .sendTo(outputStream);
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionWindowApp.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionWindowApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionWindowApp.java
index 57522eb..3745541 100644
--- a/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionWindowApp.java
+++ b/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionWindowApp.java
@@ -25,13 +25,13 @@ import org.junit.Test;
 import java.util.Collections;
 import java.util.List;
 
+import static org.apache.samza.test.operator.RepartitionWindowApp.INPUT_TOPIC;
+import static org.apache.samza.test.operator.RepartitionWindowApp.OUTPUT_TOPIC;
+
 /**
  * Test driver for {@link RepartitionWindowApp}.
  */
 public class TestRepartitionWindowApp extends StreamApplicationIntegrationTestHarness {
-
-  static final String INPUT_TOPIC = "page-views";
-  static final String OUTPUT_TOPIC = "Result";
   private static final String APP_NAME = "RepartitionedSessionizer";
 
   @Test
@@ -41,11 +41,11 @@ public class TestRepartitionWindowApp extends StreamApplicationIntegrationTestHa
     createTopic(OUTPUT_TOPIC, 1);
 
     // produce messages to different partitions.
-    produceMessage(INPUT_TOPIC, 0, "userId1", "userId1,india,5.com");
-    produceMessage(INPUT_TOPIC, 1, "userId2", "userId2,china,4.com");
-    produceMessage(INPUT_TOPIC, 2, "userId1", "userId1,india,1.com");
-    produceMessage(INPUT_TOPIC, 0, "userId1", "userId1,india,2.com");
-    produceMessage(INPUT_TOPIC, 1, "userId1", "userId1,india,3.com");
+    produceMessage(INPUT_TOPIC, 0, "userId1", "{\"userId\":\"userId1\", \"country\":\"india\",\"url\":\"5.com\"}");
+    produceMessage(INPUT_TOPIC, 1, "userId2", "{\"userId\":\"userId2\", \"country\":\"china\",\"url\":\"4.com\"}");
+    produceMessage(INPUT_TOPIC, 2, "userId1", "{\"userId\":\"userId1\", \"country\":\"india\",\"url\":\"1.com\"}");
+    produceMessage(INPUT_TOPIC, 0, "userId1", "{\"userId\":\"userId1\", \"country\":\"india\",\"url\":\"2.com\"}");
+    produceMessage(INPUT_TOPIC, 1, "userId1", "{\"userId\":\"userId1\", \"country\":\"india\",\"url\":\"3.com\"}");
 
     // run the application
     RepartitionWindowApp app = new RepartitionWindowApp();
@@ -61,9 +61,9 @@ public class TestRepartitionWindowApp extends StreamApplicationIntegrationTestHa
       // Assert that there are 4 messages for userId1 and 1 message for userId2.
       Assert.assertTrue(key.equals("userId1") || key.equals("userId2"));
       if ("userId1".equals(key)) {
-        Assert.assertEquals(value, "4");
+        Assert.assertEquals("4", value);
       } else {
-        Assert.assertEquals(value, "1");
+        Assert.assertEquals("1", value);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-test/src/test/java/org/apache/samza/test/operator/TumblingWindowApp.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/TumblingWindowApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/TumblingWindowApp.java
index ae2608d..3f3e615 100644
--- a/samza-test/src/test/java/org/apache/samza/test/operator/TumblingWindowApp.java
+++ b/samza-test/src/test/java/org/apache/samza/test/operator/TumblingWindowApp.java
@@ -21,35 +21,37 @@ package org.apache.samza.test.operator;
 
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.Config;
+import org.apache.samza.operators.KV;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.OutputStream;
 import org.apache.samza.operators.StreamGraph;
-import org.apache.samza.operators.windows.WindowPane;
 import org.apache.samza.operators.windows.Windows;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.samza.serializers.IntegerSerde;
+import org.apache.samza.serializers.JsonSerdeV2;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.StringSerde;
 
 import java.time.Duration;
-import java.util.Collection;
 
 /**
  * A {@link StreamApplication} that demonstrates a filter followed by a tumbling window.
  */
 public class TumblingWindowApp implements StreamApplication {
-
-  private static final Logger LOG = LoggerFactory.getLogger(TumblingWindowApp.class);
+  private static final String INPUT_TOPIC = "page-views";
+  private static final String OUTPUT_TOPIC = "page-view-counts";
   private static final String FILTER_KEY = "badKey";
-  private static final String OUTPUT_TOPIC = "Result";
 
   @Override
   public void init(StreamGraph graph, Config config) {
-    MessageStream<PageView> pageViews = graph.<String, String, PageView>getInputStream("page-views", (k, v) -> new PageView(v));
-    OutputStream<String, String, WindowPane<String, Collection<PageView>>> outputStream = graph
-        .getOutputStream(OUTPUT_TOPIC, m -> m.getKey().getKey(), m -> new Integer(m.getMessage().size()).toString());
+    MessageStream<PageView> pageViews =
+        graph.getInputStream(INPUT_TOPIC, new JsonSerdeV2<>(PageView.class));
+    OutputStream<KV<String, Integer>> outputStream =
+        graph.getOutputStream(OUTPUT_TOPIC, new KVSerde<>(new StringSerde(), new IntegerSerde()));
 
     pageViews
         .filter(m -> !FILTER_KEY.equals(m.getUserId()))
-        .window(Windows.keyedTumblingWindow(pageView -> pageView.getUserId(), Duration.ofSeconds(3)))
+        .window(Windows.keyedTumblingWindow(PageView::getUserId, Duration.ofSeconds(3)))
+        .map(m -> KV.of(m.getKey().getKey(), m.getMessage().size()))
         .sendTo(outputStream);
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
index fb8f17a..c550a3b 100644
--- a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
+++ b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
@@ -23,7 +23,6 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
-import java.util.ArrayList;
 import kafka.admin.AdminUtils;
 import kafka.server.KafkaServer;
 import kafka.utils.TestUtils;
@@ -47,6 +46,8 @@ import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.OutputStream;
 import org.apache.samza.operators.StreamGraph;
 import org.apache.samza.runtime.LocalApplicationRunner;
+import org.apache.samza.serializers.NoOpSerde;
+import org.apache.samza.serializers.StringSerde;
 import org.apache.samza.test.StandaloneIntegrationTestHarness;
 import org.apache.samza.test.StandaloneTestUtils;
 import org.apache.samza.util.NoOpMetricsRegistry;
@@ -62,6 +63,7 @@ import org.slf4j.LoggerFactory;
 import scala.collection.JavaConverters;
 
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
@@ -523,21 +525,23 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
 
     @Override
     public void init(StreamGraph graph, Config config) {
-      MessageStream<String> inputStream = graph.getInputStream(inputTopic,  (key, msg) -> {
-          TestKafkaEvent incomingMessage = TestKafkaEvent.fromString((String) msg);
-          if (streamApplicationCallback != null) {
-            streamApplicationCallback.onMessageReceived(incomingMessage);
-          }
-          if (processedMessagesLatch != null) {
-            processedMessagesLatch.countDown();
-          }
-          if (kafkaEventsConsumedLatch != null) {
-            kafkaEventsConsumedLatch.countDown();
-          }
-          return incomingMessage.toString();
-        });
-      OutputStream<String, String, String> outputStream = graph.getOutputStream(outputTopic, event -> null, event -> event);
-      inputStream.sendTo(outputStream);
+      MessageStream<String> inputStream = graph.getInputStream(inputTopic, new NoOpSerde<String>());
+      OutputStream<String> outputStream = graph.getOutputStream(outputTopic, new StringSerde());
+      inputStream
+          .map(msg -> {
+              TestKafkaEvent incomingMessage = TestKafkaEvent.fromString((String) msg);
+              if (streamApplicationCallback != null) {
+                streamApplicationCallback.onMessageReceived(incomingMessage);
+              }
+              if (processedMessagesLatch != null) {
+                processedMessagesLatch.countDown();
+              }
+              if (kafkaEventsConsumedLatch != null) {
+                kafkaEventsConsumedLatch.countDown();
+              }
+              return incomingMessage.toString();
+            })
+          .sendTo(outputStream);
     }
   }
 }


[4/4] samza git commit: SAMZA-1109; Allow setting Serdes for fluent API operators in code

Posted by ja...@apache.org.
SAMZA-1109; Allow setting Serdes for fluent API operators in code

Author: Prateek Maheshwari <pm...@linkedin.com>

Reviewers: Jagadish <ja...@apache.org>, Jacob Maes <jm...@apache.org>

Closes #293 from prateekm/serde-instance


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/f16ba269
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/f16ba269
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/f16ba269

Branch: refs/heads/master
Commit: f16ba2692ef85a624ce5f9050ddb6b9bce3dbf6c
Parents: 987180a
Author: Prateek Maheshwari <pm...@linkedin.com>
Authored: Wed Sep 27 17:08:04 2017 -0700
Committer: Jagadish <ja...@apache.org>
Committed: Wed Sep 27 17:08:04 2017 -0700

----------------------------------------------------------------------
 build.gradle                                    |  13 +-
 .../versioned/jobs/configuration-table.html     |   9 +-
 .../java/org/apache/samza/operators/KV.java     |  48 +++
 .../apache/samza/operators/MessageStream.java   |  34 +-
 .../apache/samza/operators/OutputStream.java    |   6 +-
 .../org/apache/samza/operators/StreamGraph.java |  67 ++-
 .../org/apache/samza/serializers/Serde.java     |   6 +-
 .../ByteBufferSerde.scala                       |  48 +++
 .../ByteSerde.scala                             |  36 ++
 .../DoubleSerde.scala                           |  45 ++
 .../IntegerSerde.scala                          |  45 ++
 .../JsonSerdeV2.scala                           |  91 +++++
 .../org.apache.samza.serializers/KVSerde.scala  |  55 +++
 .../LongSerde.scala                             |  45 ++
 .../NoOpSerde.scala                             |  37 ++
 .../SerializableSerde.scala                     |  67 +++
 .../StringSerde.scala                           |  49 +++
 .../UUIDSerde.scala                             |  47 +++
 .../TestByteBufferSerde.scala                   |  53 +++
 .../TestByteSerde.scala                         |  38 ++
 .../TestDoubleSerde.scala                       |  40 ++
 .../TestIntegerSerde.scala                      |  37 ++
 .../TestJsonSerdeV2.scala                       |  45 ++
 .../TestLongSerde.scala                         |  40 ++
 .../TestSerializableSerde.scala                 |  45 ++
 .../TestStringSerde.scala                       |  37 ++
 .../TestUUIDSerde.scala                         |  53 +++
 .../org/apache/samza/execution/JobNode.java     |  90 +++-
 .../samza/operators/MessageStreamImpl.java      |  25 +-
 .../apache/samza/operators/StreamGraphImpl.java | 127 +++---
 .../samza/operators/impl/InputOperatorImpl.java |  16 +-
 .../samza/operators/impl/OperatorImpl.java      |  18 +-
 .../samza/operators/impl/OperatorImplGraph.java |  19 +-
 .../operators/impl/OutputOperatorImpl.java      |  22 +-
 .../operators/impl/PartitionByOperatorImpl.java |  82 ++++
 .../operators/impl/WindowOperatorImpl.java      |   3 +-
 .../samza/operators/spec/InputOperatorSpec.java |  35 +-
 .../samza/operators/spec/OperatorSpec.java      |  20 +-
 .../samza/operators/spec/OperatorSpecs.java     |  26 +-
 .../operators/spec/OutputOperatorSpec.java      |  11 +-
 .../samza/operators/spec/OutputStreamImpl.java  |  28 +-
 .../operators/spec/PartitionByOperatorSpec.java |  76 ++++
 .../stream/IntermediateMessageStreamImpl.java   |  17 +-
 .../serializers/IntermediateMessageSerde.java   |  39 +-
 .../apache/samza/task/StreamOperatorTask.java   |   4 +-
 .../apache/samza/config/SerializerConfig.scala  |   6 +-
 .../apache/samza/container/SamzaContainer.scala |  55 ++-
 .../samza/serializers/ByteBufferSerde.scala     |  48 ---
 .../apache/samza/serializers/ByteSerde.scala    |  36 --
 .../apache/samza/serializers/DoubleSerde.scala  |  45 --
 .../apache/samza/serializers/IntegerSerde.scala |  45 --
 .../apache/samza/serializers/JsonSerde.scala    |  32 +-
 .../apache/samza/serializers/LongSerde.scala    |  45 --
 .../serializers/MetricsSnapshotSerde.scala      |   4 +-
 .../samza/serializers/SerializableSerde.scala   |  67 ---
 .../apache/samza/serializers/StringSerde.scala  |  44 --
 .../apache/samza/serializers/UUIDSerde.scala    |  47 ---
 .../apache/samza/example/BroadcastExample.java  |  18 +-
 .../samza/example/KeyValueStoreExample.java     |  26 +-
 .../org/apache/samza/example/MergeExample.java  |  20 +-
 .../samza/example/OrderShipmentJoinExample.java |  28 +-
 .../samza/example/PageViewCounterExample.java   |  13 +-
 .../samza/example/RepartitionExample.java       |  22 +-
 .../org/apache/samza/example/WindowExample.java |   8 +-
 .../samza/execution/TestExecutionPlanner.java   |  59 +--
 .../execution/TestJobGraphJsonGenerator.java    |  35 +-
 .../org/apache/samza/execution/TestJobNode.java | 111 +++++
 .../samza/operators/TestJoinOperator.java       |  21 +-
 .../samza/operators/TestMessageStreamImpl.java  |  69 +++-
 .../samza/operators/TestStreamGraphImpl.java    | 407 +++++++++++++++++--
 .../samza/operators/TestWindowOperator.java     |  17 +-
 .../operators/impl/TestOperatorImplGraph.java   |  66 ++-
 .../operators/impl/TestStreamOperatorImpl.java  |   4 +-
 .../samza/serializers/TestByteBufferSerde.scala |  53 ---
 .../samza/serializers/TestByteSerde.scala       |  38 --
 .../samza/serializers/TestDoubleSerde.scala     |  40 --
 .../samza/serializers/TestIntegerSerde.scala    |  37 --
 .../samza/serializers/TestLongSerde.scala       |  40 --
 .../serializers/TestSerializableSerde.scala     |  45 --
 .../samza/serializers/TestStringSerde.scala     |  37 --
 .../samza/serializers/TestUUIDSerde.scala       |  53 ---
 .../apache/samza/config/Log4jSystemConfig.java  |   2 +-
 .../samza/logging/log4j/StreamAppender.java     |   2 +-
 samza-test/src/main/resources/log4j.xml         |  44 +-
 .../apache/samza/test/operator/PageView.java    |  37 +-
 .../test/operator/RepartitionWindowApp.java     |  28 +-
 .../samza/test/operator/SessionWindowApp.java   |  23 +-
 .../test/operator/TestRepartitionWindowApp.java |  20 +-
 .../samza/test/operator/TumblingWindowApp.java  |  24 +-
 .../processor/TestZkLocalApplicationRunner.java |  36 +-
 90 files changed, 2529 insertions(+), 1222 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 319e51f..16091ae 100644
--- a/build.gradle
+++ b/build.gradle
@@ -120,11 +120,22 @@ subprojects {
 }
 
 project(':samza-api') {
-  apply plugin: 'java'
+  apply plugin: 'scala'
   apply plugin: 'checkstyle'
 
+  // Force scala joint compilation
+  sourceSets.main.scala.srcDir "src/main/java"
+  sourceSets.test.scala.srcDir "src/test/java"
+
+  // Disable the Javac compiler by forcing joint compilation by scalac. This is equivalent to setting
+  // tasks.compileTestJava.enabled = false
+  sourceSets.main.java.srcDirs = []
+  sourceSets.test.java.srcDirs = []
+
   dependencies {
     compile "org.slf4j:slf4j-api:$slf4jVersion"
+    compile "org.codehaus.jackson:jackson-mapper-asl:$jacksonVersion"
+    compile "org.scala-lang:scala-library:$scalaLibVersion"
     testCompile "junit:junit:$junitVersion"
     testCompile "org.mockito:mockito-all:$mockitoVersion"
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/docs/learn/documentation/versioned/jobs/configuration-table.html
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html b/docs/learn/documentation/versioned/jobs/configuration-table.html
index d736d6b..b4a1d56 100644
--- a/docs/learn/documentation/versioned/jobs/configuration-table.html
+++ b/docs/learn/documentation/versioned/jobs/configuration-table.html
@@ -1138,7 +1138,14 @@
                             <dt><code>org.apache.samza.serializers.StringSerdeFactory</code></dt>
                             <dd>Encodes <code>java.lang.String</code> objects as UTF-8.</dd>
                             <dt><code>org.apache.samza.serializers.JsonSerdeFactory</code></dt>
-                            <dd>Encodes nested structures of <code>java.util.Map</code>, <code>java.util.List</code> etc. as JSON.</dd>
+                            <dd>Encodes nested structures of <code>java.util.Map</code>, <code>java.util.List</code> etc. as JSON.<br/>
+                                Note: This Serde enforces a dash-separated property naming convention, while JsonSerdeV2 doesn't.
+                                This serde is primarily meant for Samza's internal usage, and is publicly available for backwards compatibility.</dd>
+                            <dt><code>org.apache.samza.serializers.JsonSerdeV2Factory</code></dt>
+                            <dd>Encodes nested structures of <code>java.util.Map</code>, <code>java.util.List</code> etc. as JSON.<br/>
+                                Note: This Serde uses Jackson's default (camelCase) property naming convention. This serde should be <br/>
+                                preferred over JsonSerde, especially in High Level API, unless the dasherized naming convention is required <br/>
+                                (e.g., for backwards compatibility).</dd>
                             <dt><code>org.apache.samza.serializers.LongSerdeFactory</code></dt>
                             <dd>Encodes <code>java.lang.Long</code> as binary (8 bytes fixed-length big-endian encoding).</dd>
                             <dt><code>org.apache.samza.serializers.DoubleSerdeFactory</code></dt>

http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-api/src/main/java/org/apache/samza/operators/KV.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/KV.java b/samza-api/src/main/java/org/apache/samza/operators/KV.java
new file mode 100644
index 0000000..0bed3b9
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/operators/KV.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators;
+
+
+/**
+ * A key and value pair.
+ *
+ * @param <K> type of the key
+ * @param <V> type of the value
+ */
+public class KV<K, V> {
+  public final K key;
+  public final V value;
+
+  public static <K, V> KV<K, V> of(K key, V value) {
+    return new KV<>(key, value);
+  }
+
+  public KV(K key, V value) {
+    this.key = key;
+    this.value = value;
+  }
+
+  public K getKey() {
+    return key;
+  }
+
+  public V getValue() {
+    return value;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java b/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
index bef1d3f..2a1045d 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
@@ -26,6 +26,7 @@ import org.apache.samza.operators.functions.MapFunction;
 import org.apache.samza.operators.functions.SinkFunction;
 import org.apache.samza.operators.windows.Window;
 import org.apache.samza.operators.windows.WindowPane;
+import org.apache.samza.serializers.KVSerde;
 
 import java.time.Duration;
 import java.util.ArrayList;
@@ -67,7 +68,7 @@ public interface MessageStream<M> {
    * Applies the provided function to messages in this {@link MessageStream} and returns the
    * filtered {@link MessageStream}.
    * <p>
-   * The {@link Function} is a predicate which determines whether a message in this {@link MessageStream}
+   * The {@link FilterFunction} is a predicate which determines whether a message in this {@link MessageStream}
    * should be retained in the filtered {@link MessageStream}.
    *
    * @param filterFn the predicate to filter messages from this {@link MessageStream}.
@@ -93,10 +94,8 @@ public interface MessageStream<M> {
    * Allows sending messages in this {@link MessageStream} to an {@link OutputStream}.
    *
    * @param outputStream the output stream to send messages to
-   * @param <K> the type of key in the outgoing message
-   * @param <V> the type of message in the outgoing message
    */
-  <K, V> void sendTo(OutputStream<K, V, M> outputStream);
+  void sendTo(OutputStream<M> outputStream);
 
   /**
    * Groups the messages in this {@link MessageStream} according to the provided {@link Window} semantics
@@ -171,8 +170,9 @@ public interface MessageStream<M> {
    * intermediate stream on the {@code job.default.system}. This intermediate stream is both an output and
    * input to the job.
    * <p>
-   * The key and message Serdes configured for the default system must be able to serialize and deserialize
-   * types K and M respectively.
+   * Uses the provided {@link KVSerde} for serialization of keys and values. If the provided {@code serde} is null,
+   * uses the default serde provided via {@link StreamGraph#setDefaultSerde}, which must be a KVSerde.
+   * If no default serde has been provided <b>before</b> calling this method, no-op serdes are used for keys and values.
    * <p>
    * The number of partitions for this intermediate stream is determined as follows:
    * If the stream is an eventual input to a {@link #join}, and the number of partitions for the other stream is known,
@@ -182,11 +182,25 @@ public interface MessageStream<M> {
    * Else, the number of partitions is set to to the max of number of partitions for all input and output streams
    * (excluding intermediate streams).
    *
-   * @param keyExtractor the {@link Function} to extract the output message key and partition key from
-   *                     the input message
-   * @param <K> the type of output message key and partition key
+   * @param <K> the type of output key
+   * @param <V> the type of output value
+   * @param keyExtractor the {@link Function} to extract the message and partition key from the input message
+   * @param valueExtractor the {@link Function} to extract the value from the input message
+   * @param serde the {@link KVSerde} to use for (de)serializing the key and value.
    * @return the repartitioned {@link MessageStream}
    */
-  <K> MessageStream<M> partitionBy(Function<? super M, ? extends K> keyExtractor);
+  <K, V> MessageStream<KV<K, V>> partitionBy(Function<? super M, ? extends K> keyExtractor,
+      Function<? super M, ? extends V> valueExtractor, KVSerde<K, V> serde);
 
+  /**
+   * Same as calling {@link #partitionBy(Function, Function, KVSerde)} with a null KVSerde.
+   *
+   * @param keyExtractor the {@link Function} to extract the message and partition key from the input message
+   * @param valueExtractor the {@link Function} to extract the value from the input message
+   * @param <K> the type of output key
+   * @param <V> the type of output value
+   * @return the repartitioned {@link MessageStream}
+   */
+  <K, V> MessageStream<KV<K, V>> partitionBy(Function<? super M, ? extends K> keyExtractor,
+      Function<? super M, ? extends V> valueExtractor);
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-api/src/main/java/org/apache/samza/operators/OutputStream.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/OutputStream.java b/samza-api/src/main/java/org/apache/samza/operators/OutputStream.java
index 7335d56..1ebb535 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/OutputStream.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/OutputStream.java
@@ -23,11 +23,9 @@ import org.apache.samza.annotation.InterfaceStability;
 /**
  * An output stream to send messages to.
  *
- * @param <K> the type of key in the outgoing message
- * @param <V> the type of message in the outgoing message
- * @param <M> the type of message in this {@link OutputStream}
+ * @param <M> the type of message being sent to this {@link OutputStream}
  */
 @InterfaceStability.Unstable
-public interface OutputStream<K, V, M> {
+public interface OutputStream<M> {
 
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java b/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java
index ea6721b..17223b1 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java
@@ -19,31 +19,56 @@
 package org.apache.samza.operators;
 
 import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.serializers.Serde;
 
-import java.util.function.BiFunction;
-import java.util.function.Function;
 
 /**
- * Provides access to {@link MessageStream}s and {@link OutputStream}s used to describe the processing logic.
+ * Provides access to {@link MessageStream}s and {@link OutputStream}s used to describe application logic.
  */
 @InterfaceStability.Unstable
 public interface StreamGraph {
 
   /**
+   * Sets the default {@link Serde} to use for (de)serializing messages.
+   * <p>.
+   * If the default serde is set, it must be set <b>before</b> creating any input or output streams.
+   * If no explicit or default serdes are provided, a NoOpSerde is used for keys and values. This means that any
+   * streams created without explicit or default serdes should be cast to MessageStream&lt;KV&lt;Object, Object&gt;&gt;.
+   * Providing an incompatible message type for the input/output streams that use the default serde will result in
+   * {@link ClassCastException}s at runtime.
+   *
+   * @param serde the default message {@link Serde} to use
+   */
+  void setDefaultSerde(Serde<?> serde);
+
+  /**
    * Gets the input {@link MessageStream} corresponding to the {@code streamId}.
    * <p>
    * Multiple invocations of this method with the same {@code streamId} will throw an {@link IllegalStateException}.
    *
    * @param streamId the unique ID for the stream
-   * @param msgBuilder the {@link BiFunction} to convert the incoming key and message to a message
-   *                   in the input {@link MessageStream}
-   * @param <K> the type of key in the incoming message
-   * @param <V> the type of message in the incoming message
+   * @param serde the {@link Serde} to use for deserializing incoming messages
+   * @param <M> the type of messages in the input {@link MessageStream}
+   * @return the input {@link MessageStream}
+   * @throws IllegalStateException when invoked multiple times with the same {@code streamId}
+   */
+  <M> MessageStream<M> getInputStream(String streamId, Serde<M> serde);
+
+  /**
+   * Same as {@link #getInputStream(String, Serde)}, but uses the default {@link Serde} provided via
+   * {@link #setDefaultSerde(Serde)} for deserializing input messages.
+   * <p>
+   * If no default serde has been provided <b>before</b> calling this method, a no-op serde is used.
+   * Providing a message type {@code M} that is incompatible with the default Serde will result in
+   * {@link ClassCastException}s at runtime.
+   * Multiple invocations of this method with the same {@code streamId} will throw an {@link IllegalStateException}.
+   *
+   * @param streamId the unique ID for the stream
    * @param <M> the type of message in the input {@link MessageStream}
    * @return the input {@link MessageStream}
    * @throws IllegalStateException when invoked multiple times with the same {@code streamId}
    */
-  <K, V, M> MessageStream<M> getInputStream(String streamId, BiFunction<? super K, ? super V, ? extends M> msgBuilder);
+  <M> MessageStream<M> getInputStream(String streamId);
 
   /**
    * Gets the {@link OutputStream} corresponding to the {@code streamId}.
@@ -51,16 +76,28 @@ public interface StreamGraph {
    * Multiple invocations of this method with the same {@code streamId} will throw an {@link IllegalStateException}.
    *
    * @param streamId the unique ID for the stream
-   * @param keyExtractor the {@link Function} to extract the outgoing key from the output message
-   * @param msgExtractor the {@link Function} to extract the outgoing message from the output message
-   * @param <K> the type of key in the outgoing message
-   * @param <V> the type of message in the outgoing message
-   * @param <M> the type of message in the {@link OutputStream}
+   * @param serde the {@link Serde} to use for serializing outgoing messages
+   * @param <M> the type of messages in the {@link OutputStream}
+   * @return the output {@link MessageStream}
+   * @throws IllegalStateException when invoked multiple times with the same {@code streamId}
+   */
+  <M> OutputStream<M> getOutputStream(String streamId, Serde<M> serde);
+
+  /**
+   * Same as {@link #getOutputStream(String, Serde)}, but uses the default {@link Serde} provided via
+   * {@link #setDefaultSerde(Serde)} for serializing output messages.
+   * <p>
+   * If no default serde has been provided <b>before</b> calling this method, a no-op serde is used.
+   * Providing a message type {@code M} that is incompatible with the default Serde will result in
+   * {@link ClassCastException}s at runtime.
+   * Multiple invocations of this method with the same {@code streamId} will throw an {@link IllegalStateException}.
+   *
+   * @param streamId the unique ID for the stream
+   * @param <M> the type of messages in the {@link OutputStream}
    * @return the output {@link MessageStream}
    * @throws IllegalStateException when invoked multiple times with the same {@code streamId}
    */
-  <K, V, M> OutputStream<K, V, M> getOutputStream(String streamId,
-      Function<? super M, ? extends K> keyExtractor, Function<? super M, ? extends V> msgExtractor);
+  <M> OutputStream<M> getOutputStream(String streamId);
 
   /**
    * Sets the {@link ContextManager} for this {@link StreamGraph}.

http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-api/src/main/java/org/apache/samza/serializers/Serde.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/serializers/Serde.java b/samza-api/src/main/java/org/apache/samza/serializers/Serde.java
index a59b8c2..ac33c2d 100644
--- a/samza-api/src/main/java/org/apache/samza/serializers/Serde.java
+++ b/samza-api/src/main/java/org/apache/samza/serializers/Serde.java
@@ -19,12 +19,16 @@
 
 package org.apache.samza.serializers;
 
+import java.io.Serializable;
+
 /**
  * A Serde is a convenience type that implements both the {@link org.apache.samza.serializers.Serializer} and
  * {@link org.apache.samza.serializers.Deserializer} interfaces, allowing it to both read and write data
  * in its value type, T.
  *
+ * A serde instance itself must be {@link Serializable} using Java serialization.
+ *
  * @param <T> The type of serialized object implementations can both read and write
  */
-public interface Serde<T> extends Serializer<T>, Deserializer<T> {
+public interface Serde<T> extends Serializer<T>, Deserializer<T>, Serializable {
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-api/src/main/scala/org.apache.samza.serializers/ByteBufferSerde.scala
----------------------------------------------------------------------
diff --git a/samza-api/src/main/scala/org.apache.samza.serializers/ByteBufferSerde.scala b/samza-api/src/main/scala/org.apache.samza.serializers/ByteBufferSerde.scala
new file mode 100644
index 0000000..adb8781
--- /dev/null
+++ b/samza-api/src/main/scala/org.apache.samza.serializers/ByteBufferSerde.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.serializers
+
+import org.apache.samza.config.Config
+import java.nio.ByteBuffer
+
+/**
+ * A serializer for ByteBuffers.
+ */
+class ByteBufferSerdeFactory extends SerdeFactory[ByteBuffer] {
+  def getSerde(name: String, config: Config): Serde[ByteBuffer] = new ByteBufferSerde
+}
+
+class ByteBufferSerde extends Serde[ByteBuffer] {
+  def toBytes(byteBuffer: ByteBuffer) = {
+    if (byteBuffer != null) {
+      val bytes = new Array[Byte](byteBuffer.remaining())
+      byteBuffer.duplicate().get(bytes)
+      bytes
+    } else {
+      null
+    }
+  }
+
+  def fromBytes(bytes: Array[Byte]) = if (bytes != null) {
+    ByteBuffer.wrap(bytes)
+  } else {
+    null
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-api/src/main/scala/org.apache.samza.serializers/ByteSerde.scala
----------------------------------------------------------------------
diff --git a/samza-api/src/main/scala/org.apache.samza.serializers/ByteSerde.scala b/samza-api/src/main/scala/org.apache.samza.serializers/ByteSerde.scala
new file mode 100644
index 0000000..968da26
--- /dev/null
+++ b/samza-api/src/main/scala/org.apache.samza.serializers/ByteSerde.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.serializers
+
+import org.apache.samza.config.Config
+
+/**
+ * A serializer for bytes that is effectively a no-op but can be useful for 
+ * binary messages.
+ */
+class ByteSerdeFactory extends SerdeFactory[Array[Byte]] {
+  def getSerde(name: String, config: Config): Serde[Array[Byte]] = new ByteSerde
+}
+
+class ByteSerde extends Serde[Array[Byte]] {
+  def toBytes(bytes: Array[Byte]) = bytes
+
+  def fromBytes(bytes: Array[Byte]) = bytes
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-api/src/main/scala/org.apache.samza.serializers/DoubleSerde.scala
----------------------------------------------------------------------
diff --git a/samza-api/src/main/scala/org.apache.samza.serializers/DoubleSerde.scala b/samza-api/src/main/scala/org.apache.samza.serializers/DoubleSerde.scala
new file mode 100644
index 0000000..7981d2c
--- /dev/null
+++ b/samza-api/src/main/scala/org.apache.samza.serializers/DoubleSerde.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.serializers
+
+import java.nio.ByteBuffer
+import org.apache.samza.config.Config
+
+/**
+ * A serializer for doubles
+ */
+class DoubleSerdeFactory extends SerdeFactory[java.lang.Double] {
+  def getSerde(name: String, config: Config): Serde[java.lang.Double] = new DoubleSerde
+}
+
+class DoubleSerde extends Serde[java.lang.Double] {
+  def toBytes(obj: java.lang.Double): Array[Byte] = if (obj != null) {
+    ByteBuffer.allocate(8).putDouble(obj.doubleValue()).array
+  } else {
+    null
+  }
+
+  // big-endian by default
+  def fromBytes(bytes: Array[Byte]): java.lang.Double = if (bytes != null) {
+    ByteBuffer.wrap(bytes).getDouble
+  } else {
+    null
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-api/src/main/scala/org.apache.samza.serializers/IntegerSerde.scala
----------------------------------------------------------------------
diff --git a/samza-api/src/main/scala/org.apache.samza.serializers/IntegerSerde.scala b/samza-api/src/main/scala/org.apache.samza.serializers/IntegerSerde.scala
new file mode 100644
index 0000000..46509f7
--- /dev/null
+++ b/samza-api/src/main/scala/org.apache.samza.serializers/IntegerSerde.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.serializers
+
+import java.nio.ByteBuffer
+import org.apache.samza.config.Config
+
+/**
+ * A serializer for integers
+ */
+class IntegerSerdeFactory extends SerdeFactory[java.lang.Integer] {
+  def getSerde(name: String, config: Config): Serde[java.lang.Integer] = new IntegerSerde
+}
+
+class IntegerSerde extends Serde[java.lang.Integer] {
+  def toBytes(obj: java.lang.Integer): Array[Byte] = if (obj != null) {
+    ByteBuffer.allocate(4).putInt(obj.intValue).array
+  } else {
+    null
+  }
+
+  // big-endian by default
+  def fromBytes(bytes: Array[Byte]): java.lang.Integer = if (bytes != null) {
+    ByteBuffer.wrap(bytes).getInt
+  } else {
+    null
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-api/src/main/scala/org.apache.samza.serializers/JsonSerdeV2.scala
----------------------------------------------------------------------
diff --git a/samza-api/src/main/scala/org.apache.samza.serializers/JsonSerdeV2.scala b/samza-api/src/main/scala/org.apache.samza.serializers/JsonSerdeV2.scala
new file mode 100644
index 0000000..446035c
--- /dev/null
+++ b/samza-api/src/main/scala/org.apache.samza.serializers/JsonSerdeV2.scala
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.serializers
+
+import org.apache.samza.SamzaException
+import org.apache.samza.config.Config
+import org.codehaus.jackson.`type`.TypeReference
+import org.codehaus.jackson.map.ObjectMapper
+import org.slf4j.LoggerFactory
+
+
+/**
+  * A serializer for JSON strings. JsonSerdeV2 differs from JsonSerde in that:
+  * <ol>
+  *   <li>
+  *     It allows specifying the specific POJO type to deserialize to (using JsonSerdeV2(Class[T])
+  *     or JsonSerdeV2#of(Class[T]). JsonSerde always returns a LinkedHashMap<String, Object> upon deserialization.
+  *   <li>
+  *     It uses Jackson's default 'camelCase' property naming convention, which simplifies defining
+  *     the POJO to bind to. JsonSerde enforces the 'dash-separated' property naming convention.
+  * </ol>
+  * This JsonSerdeV2 should be preferred over JsonSerde for High Level API applications, unless
+  * backwards compatibility with the older data format (with dasherized names) is required.
+  *
+  * @param clazzOption the class of the POJO being (de)serialized. If this is None,
+  *                    a LinkedHashMap<String, Object> is returned upon deserialization.
+  * @tparam T the type of the POJO being (de)serialized.
+  */
+class JsonSerdeV2[T] private(clazzOption: Option[Class[T]]) extends Serde[T] {
+  private val LOG = LoggerFactory.getLogger(classOf[JsonSerdeV2[T]])
+  @transient lazy private val mapper = new ObjectMapper()
+
+  def this() {
+    this(None)
+  }
+
+  def this(clazz: Class[T]) {
+    this(Option(clazz))
+  }
+
+  def toBytes(obj: T): Array[Byte] = {
+    try {
+      val str = mapper.writeValueAsString(obj)
+      str.getBytes("UTF-8")
+    } catch {
+      case e: Exception => throw new SamzaException(e);
+    }
+  }
+
+  def fromBytes(bytes: Array[Byte]): T = {
+    val str = new String(bytes, "UTF-8")
+     try {
+       clazzOption match {
+         case Some(clazz) => mapper.readValue(str, clazz)
+         case None => mapper.readValue(str, new TypeReference[T]() {})
+       }
+     } catch {
+       case e: Exception =>
+         LOG.debug(s"Error deserializing message: $str", e)
+         throw new SamzaException(e)
+     }
+  }
+
+}
+
+object JsonSerdeV2 {
+  def of[T](clazz: Class[T]): JsonSerdeV2[T] = {
+    new JsonSerdeV2[T](clazz)
+  }
+}
+
+class JsonSerdeV2Factory extends SerdeFactory[Object] {
+  def getSerde(name: String, config: Config) = new JsonSerdeV2
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-api/src/main/scala/org.apache.samza.serializers/KVSerde.scala
----------------------------------------------------------------------
diff --git a/samza-api/src/main/scala/org.apache.samza.serializers/KVSerde.scala b/samza-api/src/main/scala/org.apache.samza.serializers/KVSerde.scala
new file mode 100644
index 0000000..5b0a6e3
--- /dev/null
+++ b/samza-api/src/main/scala/org.apache.samza.serializers/KVSerde.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.serializers
+
+import org.apache.samza.SamzaException
+import org.apache.samza.operators.KV
+
+object KVSerde {
+  def of[K, V](keySerde: Serde[K], valueSerde: Serde[V]) = new KVSerde[K, V](keySerde, valueSerde)
+}
+
+/**
+  * A marker serde class to indicate that messages are keyed and should be deserialized as K-V pairs.
+  * This class is intended for use cases where a single Serde parameter or configuration is required.
+  *
+  * @tparam K type of the key in the message
+  * @tparam V type of the value in the message
+  */
+class KVSerde[K, V](keySerde: Serde[K], valueSerde: Serde[V]) extends Serde[KV[K, V]] {
+  /**
+    * Implementation Note: This serde must not be used by the framework for serialization/deserialization directly.
+    * Wire up and use the constituent keySerde and valueSerde instead.
+    */
+
+  override def fromBytes(bytes: Array[Byte]): Nothing = {
+    throw new NotImplementedError("This is a marker serde and must not be used directly. " +
+      "Samza must wire up and use the keySerde and valueSerde instead.")
+  }
+
+  override def toBytes(`object`: KV[K, V]): Nothing = {
+    throw new SamzaException("This is a marker serde and must not be used directly. " +
+      "Samza must wire up and use the keySerde and valueSerde instead.")
+  }
+
+  def getKeySerde: Serde[K] = keySerde
+
+  def getValueSerde: Serde[V] = valueSerde
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-api/src/main/scala/org.apache.samza.serializers/LongSerde.scala
----------------------------------------------------------------------
diff --git a/samza-api/src/main/scala/org.apache.samza.serializers/LongSerde.scala b/samza-api/src/main/scala/org.apache.samza.serializers/LongSerde.scala
new file mode 100644
index 0000000..41ff598
--- /dev/null
+++ b/samza-api/src/main/scala/org.apache.samza.serializers/LongSerde.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.serializers
+
+import java.nio.ByteBuffer
+import org.apache.samza.config.Config
+
+/**
+ * A serializer for longs
+ */
+class LongSerdeFactory extends SerdeFactory[java.lang.Long] {
+  def getSerde(name: String, config: Config): Serde[java.lang.Long] = new LongSerde
+}
+
+class LongSerde extends Serde[java.lang.Long] {
+  def toBytes(obj: java.lang.Long): Array[Byte] = if (obj != null) {
+    ByteBuffer.allocate(8).putLong(obj.longValue()).array
+  } else {
+    null
+  }
+
+  // big-endian by default
+  def fromBytes(bytes: Array[Byte]): java.lang.Long = if (bytes != null) {
+    ByteBuffer.wrap(bytes).getLong
+  } else {
+    null
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-api/src/main/scala/org.apache.samza.serializers/NoOpSerde.scala
----------------------------------------------------------------------
diff --git a/samza-api/src/main/scala/org.apache.samza.serializers/NoOpSerde.scala b/samza-api/src/main/scala/org.apache.samza.serializers/NoOpSerde.scala
new file mode 100644
index 0000000..c656526
--- /dev/null
+++ b/samza-api/src/main/scala/org.apache.samza.serializers/NoOpSerde.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.serializers
+
+/**
+  * A marker serde class to indicate that messages should not be serialized or deserialized.
+  * This is the same behavior as when no serde is provided, and is intended for use cases where
+  * a Serde parameter or configuration is required.
+  * This is different than [[ByteSerde]] which is a pass-through serde for byte arrays.
+  *
+  * @tparam T type of messages which should not be serialized or deserialized
+  */
+class NoOpSerde[T] extends Serde[T] {
+
+  override def fromBytes(bytes: Array[Byte]): T =
+    throw new NotImplementedError("NoOpSerde fromBytes should not be invoked by the framework.")
+
+  override def toBytes(obj: T): Array[Byte] =
+    throw new NotImplementedError("NoOpSerde toBytes should not be invoked by the framework.")
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-api/src/main/scala/org.apache.samza.serializers/SerializableSerde.scala
----------------------------------------------------------------------
diff --git a/samza-api/src/main/scala/org.apache.samza.serializers/SerializableSerde.scala b/samza-api/src/main/scala/org.apache.samza.serializers/SerializableSerde.scala
new file mode 100644
index 0000000..c43f863
--- /dev/null
+++ b/samza-api/src/main/scala/org.apache.samza.serializers/SerializableSerde.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.serializers
+
+import java.io.ByteArrayInputStream
+import java.io.ByteArrayOutputStream
+import java.io.ObjectInputStream
+import java.io.ObjectOutputStream
+
+import org.apache.samza.config.Config
+
+/**
+ * A serializer for Serializable
+ */
+class SerializableSerdeFactory[T <: java.io.Serializable] extends SerdeFactory[T] {
+  def getSerde(name: String, config: Config): Serde[T] =
+    new SerializableSerde[T]
+}
+
+class SerializableSerde[T <: java.io.Serializable] extends Serde[T] {
+  def toBytes(obj: T): Array[Byte] = if (obj != null) {
+    val bos = new ByteArrayOutputStream
+    val oos = new ObjectOutputStream(bos)
+
+    try {
+      oos.writeObject(obj)
+    }
+    finally {
+      oos.close()
+    }
+
+    bos.toByteArray
+  } else {
+    null
+  }
+
+  def fromBytes(bytes: Array[Byte]): T = if (bytes != null) {
+    val bis = new ByteArrayInputStream(bytes)
+    val ois = new ObjectInputStream(bis)
+
+    try {
+      ois.readObject.asInstanceOf[T]
+    }
+    finally{
+      ois.close()
+    }
+  } else {
+    null.asInstanceOf[T]
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-api/src/main/scala/org.apache.samza.serializers/StringSerde.scala
----------------------------------------------------------------------
diff --git a/samza-api/src/main/scala/org.apache.samza.serializers/StringSerde.scala b/samza-api/src/main/scala/org.apache.samza.serializers/StringSerde.scala
new file mode 100644
index 0000000..c69c402
--- /dev/null
+++ b/samza-api/src/main/scala/org.apache.samza.serializers/StringSerde.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.serializers
+
+import org.apache.samza.config.Config
+
+/**
+ * A serializer for strings
+ */
+class StringSerdeFactory extends SerdeFactory[String] {
+  def getSerde(name: String, config: Config): Serde[String] =
+    new StringSerde(config.get("encoding", "UTF-8"))
+}
+
+class StringSerde(val encoding: String) extends Serde[String] {
+  // constructor (for Java) that defaults to UTF-8 encoding
+  def this() {
+    this("UTF-8")
+  }
+
+  def toBytes(obj: String): Array[Byte] = if (obj != null) {
+    obj.toString.getBytes(encoding)
+  } else {
+    null
+  }
+
+  def fromBytes(bytes: Array[Byte]): String = if (bytes != null) {
+    new String(bytes, 0, bytes.size, encoding)
+  } else {
+    null
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-api/src/main/scala/org.apache.samza.serializers/UUIDSerde.scala
----------------------------------------------------------------------
diff --git a/samza-api/src/main/scala/org.apache.samza.serializers/UUIDSerde.scala b/samza-api/src/main/scala/org.apache.samza.serializers/UUIDSerde.scala
new file mode 100644
index 0000000..88d4327
--- /dev/null
+++ b/samza-api/src/main/scala/org.apache.samza.serializers/UUIDSerde.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.serializers
+
+import java.nio.ByteBuffer
+import java.util.UUID
+
+import org.apache.samza.config.Config
+
+/**
+ * A serializer for UUID
+ */
+class UUIDSerdeFactory extends SerdeFactory[UUID] {
+  def getSerde(name: String, config: Config): Serde[UUID] = new UUIDSerde
+}
+
+class UUIDSerde() extends Serde[UUID] {
+  def toBytes(obj: UUID): Array[Byte] = if (obj != null) {
+    ByteBuffer.allocate(16).putLong(obj.getMostSignificantBits).putLong(obj.getLeastSignificantBits).array
+  } else {
+    null
+  }
+
+  def fromBytes(bytes: Array[Byte]): UUID = if (bytes != null) {
+    val buffer = ByteBuffer.wrap(bytes)
+    new UUID(buffer.getLong, buffer.getLong)
+  } else {
+    null
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-api/src/test/scala/org.apache.samza.serializers/TestByteBufferSerde.scala
----------------------------------------------------------------------
diff --git a/samza-api/src/test/scala/org.apache.samza.serializers/TestByteBufferSerde.scala b/samza-api/src/test/scala/org.apache.samza.serializers/TestByteBufferSerde.scala
new file mode 100644
index 0000000..eddfb0a
--- /dev/null
+++ b/samza-api/src/test/scala/org.apache.samza.serializers/TestByteBufferSerde.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.serializers
+
+import java.util.Arrays
+import org.junit.Assert._
+import org.junit.Test
+import java.nio.ByteBuffer
+
+class TestByteBufferSerde {
+  @Test
+  def testSerde {
+    val serde = new ByteBufferSerde
+    assertNull(serde.toBytes(null))
+    assertNull(serde.fromBytes(null))
+
+    val bytes = "A lazy way of creating a byte array".getBytes()
+    val byteBuffer = ByteBuffer.wrap(bytes)
+    byteBuffer.mark()
+    assertArrayEquals(serde.toBytes(byteBuffer), bytes)
+    byteBuffer.reset()
+    assertEquals(serde.fromBytes(bytes), byteBuffer)
+  }
+
+  @Test
+  def testSerializationPreservesInput {
+    val serde = new ByteBufferSerde
+    val bytes = "A lazy way of creating a byte array".getBytes()
+    val byteBuffer = ByteBuffer.wrap(bytes)
+    byteBuffer.get() // advance position by 1
+    serde.toBytes(byteBuffer)
+
+    assertEquals(byteBuffer.capacity(), byteBuffer.limit())
+    assertEquals(1, byteBuffer.position())
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-api/src/test/scala/org.apache.samza.serializers/TestByteSerde.scala
----------------------------------------------------------------------
diff --git a/samza-api/src/test/scala/org.apache.samza.serializers/TestByteSerde.scala b/samza-api/src/test/scala/org.apache.samza.serializers/TestByteSerde.scala
new file mode 100644
index 0000000..f605762
--- /dev/null
+++ b/samza-api/src/test/scala/org.apache.samza.serializers/TestByteSerde.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.serializers
+
+import java.util.Arrays
+
+import org.junit.Assert._
+import org.junit.Test
+
+class TestByteSerde {
+  @Test
+  def testByteSerde {
+    val serde = new ByteSerde
+    assertNull(serde.toBytes(null))
+    assertNull(serde.fromBytes(null))
+
+    val testBytes = "A lazy way of creating a byte array".getBytes()
+    assertArrayEquals(serde.toBytes(testBytes), testBytes)
+    assertArrayEquals(serde.fromBytes(testBytes), testBytes)
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-api/src/test/scala/org.apache.samza.serializers/TestDoubleSerde.scala
----------------------------------------------------------------------
diff --git a/samza-api/src/test/scala/org.apache.samza.serializers/TestDoubleSerde.scala b/samza-api/src/test/scala/org.apache.samza.serializers/TestDoubleSerde.scala
new file mode 100644
index 0000000..60241b5
--- /dev/null
+++ b/samza-api/src/test/scala/org.apache.samza.serializers/TestDoubleSerde.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.serializers
+
+import java.util.Arrays
+
+import org.junit.Assert._
+import org.junit.Test
+
+class TestDoubleSerde {
+  @Test
+  def testDoubleSerde {
+    val serde = new DoubleSerde
+    assertEquals(null, serde.toBytes(null))
+    assertEquals(null, serde.fromBytes(null))
+
+    val fooBar = 9.156013e-002
+    val fooBarBytes = serde.toBytes(fooBar)
+    fooBarBytes.foreach(System.err.println)
+    assertArrayEquals(Array[Byte](63, -73, 112, 124, 19, -9, -82, -93), fooBarBytes)
+    assertEquals(fooBar, serde.fromBytes(fooBarBytes))
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-api/src/test/scala/org.apache.samza.serializers/TestIntegerSerde.scala
----------------------------------------------------------------------
diff --git a/samza-api/src/test/scala/org.apache.samza.serializers/TestIntegerSerde.scala b/samza-api/src/test/scala/org.apache.samza.serializers/TestIntegerSerde.scala
new file mode 100644
index 0000000..a3e7e1f
--- /dev/null
+++ b/samza-api/src/test/scala/org.apache.samza.serializers/TestIntegerSerde.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.serializers
+
+import org.junit.Assert._
+import org.junit.Test
+
+class TestIntegerSerde {
+  @Test
+  def testIntegerSerde {
+    val serde = new IntegerSerde
+    assertEquals(null, serde.toBytes(null))
+    assertEquals(null, serde.fromBytes(null))
+
+    val fooBar = 37
+    val fooBarBytes = serde.toBytes(fooBar)
+    assertArrayEquals(Array[Byte](0, 0, 0, 37), fooBarBytes)
+    assertEquals(fooBar, serde.fromBytes(fooBarBytes))
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-api/src/test/scala/org.apache.samza.serializers/TestJsonSerdeV2.scala
----------------------------------------------------------------------
diff --git a/samza-api/src/test/scala/org.apache.samza.serializers/TestJsonSerdeV2.scala b/samza-api/src/test/scala/org.apache.samza.serializers/TestJsonSerdeV2.scala
new file mode 100644
index 0000000..f5202c8
--- /dev/null
+++ b/samza-api/src/test/scala/org.apache.samza.serializers/TestJsonSerdeV2.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.serializers
+
+
+import org.junit.Assert._
+import org.junit.Test
+
+import scala.collection.JavaConverters._
+
+
+class TestJsonSerdeV2 {
+  @Test
+  def testJsonSerdeV2ShouldWork {
+    val serde = new JsonSerdeV2[java.util.HashMap[String, Object]]
+    val obj = new java.util.HashMap[String, Object](Map[String, Object]("hi" -> "bye", "why" -> new java.lang.Integer(2)).asJava)
+    val bytes = serde.toBytes(obj)
+    assertEquals(obj, serde.fromBytes(bytes))
+    val serdeHashMapEntry = new JsonSerdeV2[java.util.Map.Entry[String, Object]]
+    obj.entrySet().asScala.foreach(entry => {
+      try {
+        val entryBytes = serdeHashMapEntry.toBytes(entry)
+      } catch {
+        case e: Exception => fail("HashMap Entry serialization failed!")
+      }
+    })
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-api/src/test/scala/org.apache.samza.serializers/TestLongSerde.scala
----------------------------------------------------------------------
diff --git a/samza-api/src/test/scala/org.apache.samza.serializers/TestLongSerde.scala b/samza-api/src/test/scala/org.apache.samza.serializers/TestLongSerde.scala
new file mode 100644
index 0000000..77a7498
--- /dev/null
+++ b/samza-api/src/test/scala/org.apache.samza.serializers/TestLongSerde.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.serializers
+
+import java.util.Arrays
+
+import org.junit.Assert._
+import org.junit.Test
+
+class TestLongSerde {
+  @Test
+  def testLongSerde {
+    val serde = new LongSerde
+    assertEquals(null, serde.toBytes(null))
+    assertEquals(null, serde.fromBytes(null))
+
+    val fooBar = 1234123412341234L
+    val fooBarBytes = serde.toBytes(fooBar)
+    fooBarBytes.foreach(System.err.println)
+    assertArrayEquals(Array[Byte](0, 4, 98, 109, -65, -102, 1, -14), fooBarBytes)
+    assertEquals(fooBar, serde.fromBytes(fooBarBytes))
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-api/src/test/scala/org.apache.samza.serializers/TestSerializableSerde.scala
----------------------------------------------------------------------
diff --git a/samza-api/src/test/scala/org.apache.samza.serializers/TestSerializableSerde.scala b/samza-api/src/test/scala/org.apache.samza.serializers/TestSerializableSerde.scala
new file mode 100644
index 0000000..8e899d0
--- /dev/null
+++ b/samza-api/src/test/scala/org.apache.samza.serializers/TestSerializableSerde.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.serializers
+
+import org.junit.Assert._
+import org.junit.Test
+
+class TestSerializableSerde {
+  @Test
+  def testSerializableSerde {
+    val serde = new SerializableSerde[String]
+    assertNull(serde.toBytes(null))
+    assertNull(serde.fromBytes(null))
+    
+    val obj = "String is serializable"
+
+    // Serialized string is prefix + string itself
+    val prefix = Array(0xAC, 0xED, 0x00, 0x05, 0x74, 0x00, 0x16).map(_.toByte)
+    val expected = (prefix ++ obj.getBytes("UTF-8"))
+    
+    val bytes = serde.toBytes(obj)
+
+    assertArrayEquals(expected, bytes)
+
+    val objRoundTrip:String = serde.fromBytes(bytes)
+    assertEquals(obj, objRoundTrip)
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-api/src/test/scala/org.apache.samza.serializers/TestStringSerde.scala
----------------------------------------------------------------------
diff --git a/samza-api/src/test/scala/org.apache.samza.serializers/TestStringSerde.scala b/samza-api/src/test/scala/org.apache.samza.serializers/TestStringSerde.scala
new file mode 100644
index 0000000..a1e8e88
--- /dev/null
+++ b/samza-api/src/test/scala/org.apache.samza.serializers/TestStringSerde.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.serializers
+
+import org.junit.Assert._
+import org.junit.Test
+
+class TestStringSerde {
+  @Test
+  def testStringSerde {
+    val serde = new StringSerde("UTF-8")
+    assertEquals(null, serde.toBytes(null))
+    assertEquals(null, serde.fromBytes(null))
+
+    val fooBar = "foo bar"
+    val fooBarBytes = serde.toBytes(fooBar)
+    assertArrayEquals(fooBar.getBytes("UTF-8"), fooBarBytes)
+    assertEquals(fooBar, serde.fromBytes(fooBarBytes))
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-api/src/test/scala/org.apache.samza.serializers/TestUUIDSerde.scala
----------------------------------------------------------------------
diff --git a/samza-api/src/test/scala/org.apache.samza.serializers/TestUUIDSerde.scala b/samza-api/src/test/scala/org.apache.samza.serializers/TestUUIDSerde.scala
new file mode 100644
index 0000000..04ddcdb
--- /dev/null
+++ b/samza-api/src/test/scala/org.apache.samza.serializers/TestUUIDSerde.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.serializers
+
+import java.nio.BufferUnderflowException
+import java.util.UUID
+
+import org.junit.Assert._
+import org.junit.Test
+
+class TestUUIDSerde {
+  private val serde = new UUIDSerde
+
+  @Test
+  def testUUIDSerde {
+    val uuid = new UUID(13, 42)
+    val bytes = serde.toBytes(uuid)
+    assertArrayEquals(Array[Byte](0, 0, 0, 0, 0, 0, 0, 13, 0, 0, 0, 0, 0, 0, 0, 42), bytes)
+    assertEquals(uuid, serde.fromBytes(bytes))
+  }
+
+  @Test
+  def testToBytesWhenNull {
+    assertEquals(null, serde.toBytes(null))
+  }
+
+  @Test
+  def testFromBytesWhenNull {
+    assertEquals(null, serde.fromBytes(null))
+  }
+
+  @Test(expected = classOf[BufferUnderflowException])
+  def testFromBytesWhenInvalid {
+    assertEquals(null, serde.fromBytes(Array[Byte](0)))
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobNode.java b/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
index 88b24ba..fea42f2 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
@@ -21,27 +21,34 @@ package org.apache.samza.execution;
 
 import com.google.common.base.Joiner;
 import java.util.ArrayList;
+import java.util.Base64;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 import java.util.stream.Collectors;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.SerializerConfig;
 import org.apache.samza.config.StreamConfig;
 import org.apache.samza.config.TaskConfig;
 import org.apache.samza.operators.StreamGraphImpl;
+import org.apache.samza.operators.spec.InputOperatorSpec;
 import org.apache.samza.operators.spec.JoinOperatorSpec;
 import org.apache.samza.operators.spec.OperatorSpec;
+import org.apache.samza.operators.spec.OutputStreamImpl;
 import org.apache.samza.operators.spec.WindowOperatorSpec;
 import org.apache.samza.operators.util.MathUtils;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.serializers.SerializableSerde;
 import org.apache.samza.system.StreamSpec;
 import org.apache.samza.util.Util;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 /**
  * A JobNode is a physical execution unit. In RemoteExecutionEnvironment, it's a job that will be submitted
  * to remote cluster. In LocalExecutionEnvironment, it's a set of StreamProcessors for local execution.
@@ -127,11 +134,88 @@ public class JobNode {
     // write input/output streams to configs
     inEdges.stream().filter(StreamEdge::isIntermediate).forEach(edge -> addStreamConfig(edge, configs));
 
+    // write serialized serde instances and stream serde configs to configs
+    addSerdeConfigs(configs);
+
     log.info("Job {} has generated configs {}", jobName, configs);
 
     String configPrefix = String.format(CONFIG_JOB_PREFIX, jobName);
-    // TODO: Disallow user specifying job inputs/outputs. This info comes strictly from the pipeline.
-    return new JobConfig(Util.rewriteConfig(extractScopedConfig(config, new MapConfig(configs), configPrefix)));
+
+    // Disallow user specified job inputs/outputs. This info comes strictly from the pipeline.
+    Map<String, String> allowedConfigs = new HashMap<>(config);
+    if (allowedConfigs.containsKey(TaskConfig.INPUT_STREAMS())) {
+      log.warn("Specifying task inputs in configuration is not allowed with Fluent API. "
+          + "Ignoring configured value for " + TaskConfig.INPUT_STREAMS());
+      allowedConfigs.remove(TaskConfig.INPUT_STREAMS());
+    }
+
+    log.debug("Job {} has allowed configs {}", jobName, allowedConfigs);
+    return new JobConfig(
+        Util.rewriteConfig(
+            extractScopedConfig(new MapConfig(allowedConfigs), new MapConfig(configs), configPrefix)));
+  }
+
+  /**
+   * Serializes the {@link Serde} instances for operators, adds them to the provided config, and
+   * sets the serde configuration for the input/output/intermediate streams appropriately.
+   *
+   * We try to preserve the number of Serde instances before and after serialization. However we don't
+   * guarantee that references shared between these serdes instances (e.g. an Jackson ObjectMapper shared
+   * between two json serdes) are shared after deserialization too.
+   *
+   * Ideally all the user defined objects in the application should be serialized and de-serialized in one pass
+   * from the same output/input stream so that we can maintain reference sharing relationships.
+   *
+   * @param configs the configs to add serialized serde instances and stream serde configs to
+   */
+  protected void addSerdeConfigs(Map<String, String> configs) {
+    // collect all key and msg serde instances for streams
+    Map<String, Serde> keySerdes = new HashMap<>();
+    Map<String, Serde> msgSerdes = new HashMap<>();
+    Map<StreamSpec, InputOperatorSpec> inputOperators = streamGraph.getInputOperators();
+    inEdges.forEach(edge -> {
+        String streamId = edge.getStreamSpec().getId();
+        InputOperatorSpec inputOperatorSpec = inputOperators.get(edge.getStreamSpec());
+        Serde keySerde = inputOperatorSpec.getKeySerde();
+        Serde valueSerde = inputOperatorSpec.getValueSerde();
+        keySerdes.put(streamId, keySerde);
+        msgSerdes.put(streamId, valueSerde);
+      });
+    Map<StreamSpec, OutputStreamImpl> outputStreams = streamGraph.getOutputStreams();
+    outEdges.forEach(edge -> {
+        String streamId = edge.getStreamSpec().getId();
+        OutputStreamImpl outputStream = outputStreams.get(edge.getStreamSpec());
+        Serde keySerde = outputStream.getKeySerde();
+        Serde valueSerde = outputStream.getValueSerde();
+        keySerdes.put(streamId, keySerde);
+        msgSerdes.put(streamId, valueSerde);
+      });
+
+    // for each unique serde instance, generate a unique name and serialize to config
+    HashSet<Serde> serdes = new HashSet<>(keySerdes.values());
+    serdes.addAll(msgSerdes.values());
+    SerializableSerde<Serde> serializableSerde = new SerializableSerde<>();
+    Base64.Encoder base64Encoder = Base64.getEncoder();
+    Map<Serde, String> serdeUUIDs = new HashMap<>();
+    serdes.forEach(serde -> {
+        String serdeName = serdeUUIDs.computeIfAbsent(serde,
+            s -> serde.getClass().getSimpleName() + "-" + UUID.randomUUID().toString());
+        configs.putIfAbsent(String.format(SerializerConfig.SERDE_SERIALIZED_INSTANCE(), serdeName),
+            base64Encoder.encodeToString(serializableSerde.toBytes(serde)));
+      });
+
+    // set key and msg serdes for streams to the serde names generated above
+    keySerdes.forEach((streamId, serde) -> {
+        String streamIdPrefix = String.format(StreamConfig.STREAM_ID_PREFIX(), streamId);
+        String keySerdeConfigKey = streamIdPrefix + StreamConfig.KEY_SERDE();
+        configs.put(keySerdeConfigKey, serdeUUIDs.get(serde));
+      });
+
+    msgSerdes.forEach((streamId, serde) -> {
+        String streamIdPrefix = String.format(StreamConfig.STREAM_ID_PREFIX(), streamId);
+        String valueSerdeConfigKey = streamIdPrefix + StreamConfig.MSG_SERDE();
+        configs.put(valueSerdeConfigKey, serdeUUIDs.get(serde));
+      });
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java b/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
index db6fd5a..7b93a9e 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
@@ -29,12 +29,14 @@ import org.apache.samza.operators.spec.OperatorSpec;
 import org.apache.samza.operators.spec.OperatorSpecs;
 import org.apache.samza.operators.spec.OutputOperatorSpec;
 import org.apache.samza.operators.spec.OutputStreamImpl;
-import org.apache.samza.operators.spec.StreamOperatorSpec;
+import org.apache.samza.operators.spec.PartitionByOperatorSpec;
 import org.apache.samza.operators.spec.SinkOperatorSpec;
+import org.apache.samza.operators.spec.StreamOperatorSpec;
 import org.apache.samza.operators.stream.IntermediateMessageStreamImpl;
 import org.apache.samza.operators.windows.Window;
 import org.apache.samza.operators.windows.WindowPane;
 import org.apache.samza.operators.windows.internal.WindowInternal;
+import org.apache.samza.serializers.KVSerde;
 
 import java.time.Duration;
 import java.util.Collection;
@@ -95,9 +97,9 @@ public class MessageStreamImpl<M> implements MessageStream<M> {
   }
 
   @Override
-  public <K, V> void sendTo(OutputStream<K, V, M> outputStream) {
+  public void sendTo(OutputStream<M> outputStream) {
     OutputOperatorSpec<M> op = OperatorSpecs.createSendToOperatorSpec(
-        (OutputStreamImpl<K, V, M>) outputStream, this.graph.getNextOpId());
+        (OutputStreamImpl<M>) outputStream, this.graph.getNextOpId());
     this.operatorSpec.registerNextOperatorSpec(op);
   }
 
@@ -133,17 +135,24 @@ public class MessageStreamImpl<M> implements MessageStream<M> {
   }
 
   @Override
-  public <K> MessageStream<M> partitionBy(Function<? super M, ? extends K> keyExtractor) {
+  public <K, V> MessageStream<KV<K, V>> partitionBy(Function<? super M, ? extends K> keyExtractor,
+      Function<? super M, ? extends V> valueExtractor, KVSerde<K, V> serde) {
     int opId = this.graph.getNextOpId();
     String opName = String.format("%s-%s", OperatorSpec.OpCode.PARTITION_BY.name().toLowerCase(), opId);
-    IntermediateMessageStreamImpl<K, M, M> intermediateStream =
-        this.graph.getIntermediateStream(opName, keyExtractor, m -> m, (k, m) -> m);
-    OutputOperatorSpec<M> partitionByOperatorSpec = OperatorSpecs.createPartitionByOperatorSpec(
-        intermediateStream.getOutputStream(), opId);
+    IntermediateMessageStreamImpl<KV<K, V>> intermediateStream = this.graph.getIntermediateStream(opName, serde);
+    PartitionByOperatorSpec<M, K, V> partitionByOperatorSpec =
+        OperatorSpecs.createPartitionByOperatorSpec(
+            intermediateStream.getOutputStream(), keyExtractor, valueExtractor, opId);
     this.operatorSpec.registerNextOperatorSpec(partitionByOperatorSpec);
     return intermediateStream;
   }
 
+  @Override
+  public <K, V> MessageStream<KV<K, V>> partitionBy(Function<? super M, ? extends K> keyExtractor,
+      Function<? super M, ? extends V> valueExtractor) {
+    return partitionBy(keyExtractor, valueExtractor, null);
+  }
+
   /**
    * Get the {@link OperatorSpec} associated with this {@link MessageStreamImpl}.
    * @return the {@link OperatorSpec} associated with this {@link MessageStreamImpl}.

http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
index 2c2eb56..45378c7 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
@@ -18,14 +18,20 @@
  */
 package org.apache.samza.operators;
 
+import com.google.common.base.Preconditions;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.operators.spec.InputOperatorSpec;
+import org.apache.samza.operators.spec.OperatorSpec;
 import org.apache.samza.operators.spec.OutputStreamImpl;
 import org.apache.samza.operators.stream.IntermediateMessageStreamImpl;
-import org.apache.samza.operators.spec.OperatorSpec;
 import org.apache.samza.runtime.ApplicationRunner;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.NoOpSerde;
+import org.apache.samza.serializers.Serde;
 import org.apache.samza.system.StreamSpec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.Collection;
 import java.util.Collections;
@@ -33,8 +39,6 @@ import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Set;
-import java.util.function.BiFunction;
-import java.util.function.Function;
 import java.util.stream.Collectors;
 
 /**
@@ -42,6 +46,7 @@ import java.util.stream.Collectors;
  * create the DAG of transforms.
  */
 public class StreamGraphImpl implements StreamGraph {
+  private static final Logger LOGGER = LoggerFactory.getLogger(StreamGraphImpl.class);
 
   /**
    * Unique identifier for each {@link org.apache.samza.operators.spec.OperatorSpec} in the graph.
@@ -55,6 +60,7 @@ public class StreamGraphImpl implements StreamGraph {
   private final ApplicationRunner runner;
   private final Config config;
 
+  private Serde<?> defaultSerde = new KVSerde(new NoOpSerde(), new NoOpSerde());
   private ContextManager contextManager = null;
 
   public StreamGraphImpl(ApplicationRunner runner, Config config) {
@@ -65,46 +71,51 @@ public class StreamGraphImpl implements StreamGraph {
   }
 
   @Override
-  public <K, V, M> MessageStream<M> getInputStream(String streamId,
-      BiFunction<? super K, ? super V, ? extends M> msgBuilder) {
-    if (msgBuilder == null) {
-      throw new IllegalArgumentException("msgBuilder can't be null for an input stream");
-    }
+  public void setDefaultSerde(Serde<?> serde) {
+    Preconditions.checkNotNull(serde, "Default serde must not be null");
+    Preconditions.checkState(inputOperators.isEmpty() && outputStreams.isEmpty(),
+        "Default serde must be set before creating any input or output streams.");
+    this.defaultSerde = serde;
+  }
 
-    if (inputOperators.containsKey(runner.getStreamSpec(streamId))) {
-      throw new IllegalStateException("getInputStream() invoked multiple times "
-          + "with the same streamId: " + streamId);
-    }
+  @Override
+  public <M> MessageStream<M> getInputStream(String streamId, Serde<M> serde) {
+    Preconditions.checkNotNull(serde, "serde must not be null for an input stream.");
+    Preconditions.checkState(!inputOperators.containsKey(runner.getStreamSpec(streamId)),
+        "getInputStream must not be called multiple times with the same streamId: " + streamId);
 
     StreamSpec streamSpec = runner.getStreamSpec(streamId);
+    KV<Serde, Serde> kvSerdes = getKVSerdes(streamId, serde);
+    boolean isKeyedInput = serde instanceof KVSerde;
     inputOperators.put(streamSpec,
-        new InputOperatorSpec<>(streamSpec, (BiFunction<K, V, M>) msgBuilder, this.getNextOpId()));
+        new InputOperatorSpec<>(streamSpec, kvSerdes.getKey(), kvSerdes.getValue(), isKeyedInput, this.getNextOpId()));
     return new MessageStreamImpl<>(this, inputOperators.get(streamSpec));
   }
 
   @Override
-  public <K, V, M> OutputStream<K, V, M> getOutputStream(String streamId,
-      Function<? super M, ? extends K> keyExtractor, Function<? super M, ? extends V> msgExtractor) {
-    if (keyExtractor == null) {
-      throw new IllegalArgumentException("keyExtractor can't be null for an output stream.");
-    }
-
-    if (msgExtractor == null) {
-      throw new IllegalArgumentException("msgExtractor can't be null for an output stream.");
-    }
+  public <M> MessageStream<M> getInputStream(String streamId) {
+    return (MessageStream<M>) getInputStream(streamId, defaultSerde);
+  }
 
-    if (outputStreams.containsKey(runner.getStreamSpec(streamId))) {
-      throw new IllegalStateException("getOutputStream() invoked multiple times "
-          + "with the same streamId: " + streamId);
-    }
+  @Override
+  public <M> OutputStream<M> getOutputStream(String streamId, Serde<M> serde) {
+    Preconditions.checkNotNull(serde, "serde must not be null for an output stream.");
+    Preconditions.checkState(!outputStreams.containsKey(runner.getStreamSpec(streamId)),
+        "getOutputStream must not be called multiple times with the same streamId: " + streamId);
 
     StreamSpec streamSpec = runner.getStreamSpec(streamId);
+    KV<Serde, Serde> kvSerdes = getKVSerdes(streamId, serde);
     outputStreams.put(streamSpec,
-        new OutputStreamImpl<>(streamSpec, (Function<M, K>) keyExtractor, (Function<M, V>) msgExtractor));
+        new OutputStreamImpl<>(streamSpec, kvSerdes.getKey(), kvSerdes.getValue(), serde instanceof KVSerde));
     return outputStreams.get(streamSpec);
   }
 
   @Override
+  public <M> OutputStream<M> getOutputStream(String streamId) {
+    return (OutputStream<M>) getOutputStream(streamId, defaultSerde);
+  }
+
+  @Override
   public StreamGraph withContextManager(ContextManager contextManager) {
     this.contextManager = contextManager;
     return this;
@@ -116,38 +127,31 @@ public class StreamGraphImpl implements StreamGraph {
    *
    * @param streamName the name of the stream to be created. Will be prefixed with job name and id to generate the
    *                   logical streamId.
-   * @param keyExtractor the {@link Function} to extract the outgoing key from the intermediate message
-   * @param msgExtractor the {@link Function} to extract the outgoing message from the intermediate message
-   * @param msgBuilder the {@link BiFunction} to convert the incoming key and message to a message
-   *                   in the intermediate {@link MessageStream}
-   * @param <K> the type of key in the intermediate message
-   * @param <V> the type of message in the intermediate message
+   * @param serde the {@link Serde} to use for the message in the intermediate stream. If null, the default serde
+   *              is used.
    * @param <M> the type of messages in the intermediate {@link MessageStream}
    * @return  the intermediate {@link MessageStreamImpl}
    */
-  <K, V, M> IntermediateMessageStreamImpl<K, V, M> getIntermediateStream(String streamName,
-      Function<? super M, ? extends K> keyExtractor, Function<? super M, ? extends V> msgExtractor,
-      BiFunction<? super K, ? super V, ? extends M> msgBuilder) {
+  <M> IntermediateMessageStreamImpl<M> getIntermediateStream(String streamName, Serde<M> serde) {
     String streamId = String.format("%s-%s-%s",
         config.get(JobConfig.JOB_NAME()),
         config.get(JobConfig.JOB_ID(), "1"),
         streamName);
-    if (msgBuilder == null) {
-      throw new IllegalArgumentException("msgBuilder cannot be null for an intermediate stream");
-    }
-    if (keyExtractor == null) {
-      throw new IllegalArgumentException("keyExtractor can't be null for an output stream.");
-    }
-    if (msgExtractor == null) {
-      throw new IllegalArgumentException("msgExtractor can't be null for an output stream.");
-    }
     StreamSpec streamSpec = runner.getStreamSpec(streamId);
-    if (inputOperators.containsKey(streamSpec) || outputStreams.containsKey(streamSpec)) {
-      throw new IllegalStateException("getIntermediateStream() invoked multiple times "
-          + "with the same streamId: " + streamId);
+
+    Preconditions.checkState(!inputOperators.containsKey(streamSpec) && !outputStreams.containsKey(streamSpec),
+        "getIntermediateStream must not be called multiple times with the same streamId: " + streamId);
+
+    if (serde == null) {
+      LOGGER.info("Using default serde for intermediate stream: " + streamId);
+      serde = (Serde<M>) defaultSerde;
     }
-    inputOperators.put(streamSpec, new InputOperatorSpec(streamSpec, msgBuilder, this.getNextOpId()));
-    outputStreams.put(streamSpec, new OutputStreamImpl(streamSpec, keyExtractor, msgExtractor));
+
+    boolean isKeyed = serde instanceof KVSerde;
+    KV<Serde, Serde> kvSerdes = getKVSerdes(streamId, serde);
+    inputOperators.put(streamSpec,
+        new InputOperatorSpec<>(streamSpec, kvSerdes.getKey(), kvSerdes.getValue(), isKeyed, this.getNextOpId()));
+    outputStreams.put(streamSpec, new OutputStreamImpl(streamSpec, kvSerdes.getKey(), kvSerdes.getValue(), isKeyed));
     return new IntermediateMessageStreamImpl<>(this, inputOperators.get(streamSpec), outputStreams.get(streamSpec));
   }
 
@@ -205,4 +209,27 @@ public class StreamGraphImpl implements StreamGraph {
 
     return windowOrJoinSpecs.size() != 0;
   }
+
+  private KV<Serde, Serde> getKVSerdes(String streamId, Serde serde) {
+    Serde keySerde, valueSerde;
+
+    if (serde instanceof KVSerde) {
+      keySerde = ((KVSerde) serde).getKeySerde();
+      valueSerde = ((KVSerde) serde).getValueSerde();
+    } else {
+      keySerde = new NoOpSerde();
+      valueSerde = serde;
+    }
+
+    if (keySerde instanceof NoOpSerde) {
+      LOGGER.info("Using NoOpSerde as the key serde for stream " + streamId +
+          ". Keys will not be (de)serialized");
+    }
+    if (valueSerde instanceof NoOpSerde) {
+      LOGGER.info("Using NoOpSerde as the value serde for stream " + streamId +
+          ". Values will not be (de)serialized");
+    }
+
+    return KV.of(keySerde, valueSerde);
+  }
 }


[2/4] samza git commit: SAMZA-1109; Allow setting Serdes for fluent API operators in code

Posted by ja...@apache.org.
http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
index 2c8f682..8c75bca 100644
--- a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
@@ -24,6 +24,7 @@ import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.TaskConfig;
+import org.apache.samza.operators.KV;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.OutputStream;
 import org.apache.samza.operators.StreamGraphImpl;
@@ -45,8 +46,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.function.BiFunction;
-import java.util.function.Function;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -122,11 +121,11 @@ public class TestExecutionPlanner {
      *
      */
     StreamGraphImpl streamGraph = new StreamGraphImpl(runner, config);
-    Function mockFn = mock(Function.class);
-    OutputStream<Object, Object, Object> output1 = streamGraph.getOutputStream("output1", mockFn, mockFn);
-    BiFunction mockBuilder = mock(BiFunction.class);
-    streamGraph.getInputStream("input1", mockBuilder)
-        .partitionBy(m -> "yes!!!").map(m -> m)
+    MessageStream<KV<Object, Object>> input1 = streamGraph.getInputStream("input1");
+    OutputStream<KV<Object, Object>> output1 = streamGraph.getOutputStream("output1");
+    input1
+        .partitionBy(m -> m.key, m -> m.value)
+        .map(kv -> kv)
         .sendTo(output1);
     return streamGraph;
   }
@@ -145,13 +144,20 @@ public class TestExecutionPlanner {
      */
 
     StreamGraphImpl streamGraph = new StreamGraphImpl(runner, config);
-    BiFunction msgBuilder = mock(BiFunction.class);
-    MessageStream m1 = streamGraph.getInputStream("input1", msgBuilder).map(m -> m);
-    MessageStream m2 = streamGraph.getInputStream("input2", msgBuilder).partitionBy(m -> "haha").filter(m -> true);
-    MessageStream m3 = streamGraph.getInputStream("input3", msgBuilder).filter(m -> true).partitionBy(m -> "hehe").map(m -> m);
-    Function mockFn = mock(Function.class);
-    OutputStream<Object, Object, Object> output1 = streamGraph.getOutputStream("output1", mockFn, mockFn);
-    OutputStream<Object, Object, Object> output2 = streamGraph.getOutputStream("output2", mockFn, mockFn);
+    MessageStream<KV<Object, Object>> m1 =
+        streamGraph.<KV<Object, Object>>getInputStream("input1")
+            .map(m -> m);
+    MessageStream<KV<Object, Object>> m2 =
+        streamGraph.<KV<Object, Object>>getInputStream("input2")
+            .partitionBy(m -> m.key, m -> m.value)
+            .filter(m -> true);
+    MessageStream<KV<Object, Object>> m3 =
+        streamGraph.<KV<Object, Object>>getInputStream("input3")
+            .filter(m -> true)
+            .partitionBy(m -> m.key, m -> m.value)
+            .map(m -> m);
+    OutputStream<KV<Object, Object>> output1 = streamGraph.getOutputStream("output1");
+    OutputStream<KV<Object, Object>> output2 = streamGraph.getOutputStream("output2");
 
     m1.join(m2, mock(JoinFunction.class), Duration.ofHours(2)).sendTo(output1);
     m3.join(m2, mock(JoinFunction.class), Duration.ofHours(1)).sendTo(output2);
@@ -162,21 +168,28 @@ public class TestExecutionPlanner {
   private StreamGraphImpl createStreamGraphWithJoinAndWindow() {
 
     StreamGraphImpl streamGraph = new StreamGraphImpl(runner, config);
-    BiFunction msgBuilder = mock(BiFunction.class);
-    MessageStream m1 = streamGraph.getInputStream("input1", msgBuilder).map(m -> m);
-    MessageStream m2 = streamGraph.getInputStream("input2", msgBuilder).partitionBy(m -> "haha").filter(m -> true);
-    MessageStream m3 = streamGraph.getInputStream("input3", msgBuilder).filter(m -> true).partitionBy(m -> "hehe").map(m -> m);
-    Function mockFn = mock(Function.class);
-    OutputStream<Object, Object, Object> output1 = streamGraph.getOutputStream("output1", mockFn, mockFn);
-    OutputStream<Object, Object, Object> output2 = streamGraph.getOutputStream("output2", mockFn, mockFn);
+    MessageStream<KV<Object, Object>> m1 =
+        streamGraph.<KV<Object, Object>>getInputStream("input1")
+            .map(m -> m);
+    MessageStream<KV<Object, Object>> m2 =
+        streamGraph.<KV<Object, Object>>getInputStream("input2")
+            .partitionBy(m -> m.key, m -> m.value)
+            .filter(m -> true);
+    MessageStream<KV<Object, Object>> m3 =
+        streamGraph.<KV<Object, Object>>getInputStream("input3")
+            .filter(m -> true)
+            .partitionBy(m -> m.key, m -> m.value)
+            .map(m -> m);
+    OutputStream<KV<Object, Object>> output1 = streamGraph.getOutputStream("output1");
+    OutputStream<KV<Object, Object>> output2 = streamGraph.getOutputStream("output2");
 
     m1.map(m -> m)
         .filter(m->true)
-        .window(Windows.<Object, Object>keyedTumblingWindow(m -> m, Duration.ofMillis(8)));
+        .window(Windows.keyedTumblingWindow(m -> m, Duration.ofMillis(8)));
 
     m2.map(m -> m)
         .filter(m->true)
-        .window(Windows.<Object, Object>keyedTumblingWindow(m -> m, Duration.ofMillis(16)));
+        .window(Windows.keyedTumblingWindow(m -> m, Duration.ofMillis(16)));
 
     m1.join(m2, mock(JoinFunction.class), Duration.ofMillis(1600)).sendTo(output1);
     m3.join(m2, mock(JoinFunction.class), Duration.ofMillis(100)).sendTo(output2);

http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java b/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
index 4bda86b..095e407 100644
--- a/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
@@ -19,25 +19,26 @@
 
 package org.apache.samza.execution;
 
-import java.time.Duration;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.function.BiFunction;
-import java.util.function.Function;
-
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MapConfig;
+import org.apache.samza.operators.KV;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.OutputStream;
 import org.apache.samza.operators.StreamGraphImpl;
 import org.apache.samza.operators.functions.JoinFunction;
 import org.apache.samza.runtime.ApplicationRunner;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.NoOpSerde;
 import org.apache.samza.system.StreamSpec;
 import org.apache.samza.system.SystemAdmin;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.junit.Test;
 
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+
 import static org.apache.samza.execution.TestExecutionPlanner.createSystemAdmin;
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.mock;
@@ -105,13 +106,21 @@ public class TestJobGraphJsonGenerator {
     StreamManager streamManager = new StreamManager(systemAdmins);
 
     StreamGraphImpl streamGraph = new StreamGraphImpl(runner, config);
-    BiFunction mockBuilder = mock(BiFunction.class);
-    MessageStream m1 = streamGraph.getInputStream("input1", mockBuilder).map(m -> m);
-    MessageStream m2 = streamGraph.getInputStream("input2", mockBuilder).partitionBy(m -> "haha").filter(m -> true);
-    MessageStream m3 = streamGraph.getInputStream("input3", mockBuilder).filter(m -> true).partitionBy(m -> "hehe").map(m -> m);
-    Function mockFn = mock(Function.class);
-    OutputStream<Object, Object, Object> outputStream1 = streamGraph.getOutputStream("output1", mockFn, mockFn);
-    OutputStream<Object, Object, Object> outputStream2 = streamGraph.getOutputStream("output2", mockFn, mockFn);
+    streamGraph.setDefaultSerde(KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>()));
+    MessageStream<KV<Object, Object>> m1 =
+        streamGraph.<KV<Object, Object>>getInputStream("input1")
+            .map(m -> m);
+    MessageStream<KV<Object, Object>> m2 =
+        streamGraph.<KV<Object, Object>>getInputStream("input2")
+            .partitionBy(m -> m.key, m -> m.value)
+            .filter(m -> true);
+    MessageStream<KV<Object, Object>> m3 =
+        streamGraph.<KV<Object, Object>>getInputStream("input3")
+            .filter(m -> true)
+            .partitionBy(m -> m.key, m -> m.value)
+            .map(m -> m);
+    OutputStream<KV<Object, Object>> outputStream1 = streamGraph.getOutputStream("output1");
+    OutputStream<KV<Object, Object>> outputStream2 = streamGraph.getOutputStream("output2");
 
     m1.join(m2, mock(JoinFunction.class), Duration.ofHours(2)).sendTo(outputStream1);
     m2.sink((message, collector, coordinator) -> { });

http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/test/java/org/apache/samza/execution/TestJobNode.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestJobNode.java b/samza-core/src/test/java/org/apache/samza/execution/TestJobNode.java
new file mode 100644
index 0000000..c59c0cc
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestJobNode.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.execution;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.SerializerConfig;
+import org.apache.samza.operators.KV;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.StreamGraphImpl;
+import org.apache.samza.runtime.ApplicationRunner;
+import org.apache.samza.serializers.JsonSerdeV2;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.serializers.SerializableSerde;
+import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.system.StreamSpec;
+import org.junit.Test;
+
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+
+public class TestJobNode {
+
+  @Test
+  public void testAddSerdeConfigs() {
+    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+    StreamSpec inputSpec = new StreamSpec("input", "input", "input-system");
+    StreamSpec outputSpec = new StreamSpec("output", "output", "output-system");
+    StreamSpec partitionBySpec = new StreamSpec("null-null-partition_by-1", "partition_by-1", "intermediate-system");
+    doReturn(inputSpec).when(mockRunner).getStreamSpec("input");
+    doReturn(outputSpec).when(mockRunner).getStreamSpec("output");
+    doReturn(partitionBySpec).when(mockRunner).getStreamSpec("null-null-partition_by-1");
+
+    StreamGraphImpl streamGraph = new StreamGraphImpl(mockRunner, mock(Config.class));
+    streamGraph.setDefaultSerde(KVSerde.of(new StringSerde(), new JsonSerdeV2<>()));
+    MessageStream<KV<String, Object>> input = streamGraph.getInputStream("input");
+    OutputStream<KV<String, Object>> output = streamGraph.getOutputStream("output");
+    input.partitionBy(KV::getKey, KV::getValue).sendTo(output);
+
+    JobNode jobNode = new JobNode("jobName", "jobId", streamGraph, mock(Config.class));
+    StreamEdge inputEdge = new StreamEdge(inputSpec);
+    StreamEdge outputEdge = new StreamEdge(outputSpec);
+    StreamEdge repartitionEdge = new StreamEdge(partitionBySpec, true);
+    jobNode.addInEdge(inputEdge);
+    jobNode.addOutEdge(outputEdge);
+    jobNode.addInEdge(repartitionEdge);
+    jobNode.addOutEdge(repartitionEdge);
+
+    Map<String, String> configs = new HashMap<>();
+    jobNode.addSerdeConfigs(configs);
+
+    MapConfig mapConfig = new MapConfig(configs);
+    Config serializers = mapConfig.subset("serializers.registry.", true);
+
+    // make sure that the serializers deserialize correctly
+    SerializableSerde<Serde> serializableSerde = new SerializableSerde<>();
+    Map<String, Serde> deserializedSerdes = serializers.entrySet().stream().collect(Collectors.toMap(
+        e -> e.getKey().replace(SerializerConfig.SERIALIZED_INSTANCE_SUFFIX(), ""),
+        e -> serializableSerde.fromBytes(Base64.getDecoder().decode(e.getValue().getBytes()))
+    ));
+    assertEquals(2, serializers.size());
+
+    String inputKeySerde = mapConfig.get("streams.input.samza.key.serde");
+    String inputMsgSerde = mapConfig.get("streams.input.samza.msg.serde");
+    assertTrue(deserializedSerdes.containsKey(inputKeySerde));
+    assertTrue(inputKeySerde.startsWith(StringSerde.class.getSimpleName()));
+    assertTrue(deserializedSerdes.containsKey(inputMsgSerde));
+    assertTrue(inputMsgSerde.startsWith(JsonSerdeV2.class.getSimpleName()));
+
+    String outputKeySerde = mapConfig.get("streams.output.samza.key.serde");
+    String outputMsgSerde = mapConfig.get("streams.output.samza.msg.serde");
+    assertTrue(deserializedSerdes.containsKey(outputKeySerde));
+    assertTrue(outputKeySerde.startsWith(StringSerde.class.getSimpleName()));
+    assertTrue(deserializedSerdes.containsKey(outputMsgSerde));
+    assertTrue(outputMsgSerde.startsWith(JsonSerdeV2.class.getSimpleName()));
+
+    String partitionByKeySerde = mapConfig.get("streams.null-null-partition_by-1.samza.key.serde");
+    String partitionByMsgSerde = mapConfig.get("streams.null-null-partition_by-1.samza.msg.serde");
+    assertTrue(deserializedSerdes.containsKey(partitionByKeySerde));
+    assertTrue(partitionByKeySerde.startsWith(StringSerde.class.getSimpleName()));
+    assertTrue(deserializedSerdes.containsKey(partitionByMsgSerde));
+    assertTrue(partitionByMsgSerde.startsWith(JsonSerdeV2.class.getSimpleName()));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java b/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
index c51b1ea..004c5cf 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
@@ -25,6 +25,8 @@ import org.apache.samza.config.Config;
 import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.operators.functions.JoinFunction;
 import org.apache.samza.runtime.ApplicationRunner;
+import org.apache.samza.serializers.IntegerSerde;
+import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.system.OutgoingMessageEnvelope;
 import org.apache.samza.system.StreamSpec;
@@ -276,10 +278,11 @@ public class TestJoinOperator {
 
     @Override
     public void init(StreamGraph graph, Config config) {
+      KVSerde<Integer, Integer> kvSerde = KVSerde.of(new IntegerSerde(), new IntegerSerde());
       MessageStream<FirstStreamIME> inStream =
-          graph.getInputStream("instream", FirstStreamIME::new);
+          graph.getInputStream("instream", kvSerde).map(FirstStreamIME::new);
       MessageStream<SecondStreamIME> inStream2 =
-          graph.getInputStream("instream2", SecondStreamIME::new);
+          graph.getInputStream("instream2", kvSerde).map(SecondStreamIME::new);
 
       SystemStream outputSystemStream = new SystemStream("outputSystem", "outputStream");
       inStream
@@ -330,14 +333,24 @@ public class TestJoinOperator {
   }
 
   private static class FirstStreamIME extends IncomingMessageEnvelope {
+    FirstStreamIME(KV<Integer, Integer> message) {
+      super(new SystemStreamPartition(
+          "insystem", "instream", new Partition(0)), "1", message.getKey(), message.getValue());
+    }
+
     FirstStreamIME(Integer key, Integer message) {
-      super(new SystemStreamPartition("insystem", "instream", new Partition(0)), "1", key, message);
+      this(KV.of(key, message));
     }
   }
 
   private static class SecondStreamIME extends IncomingMessageEnvelope {
+    SecondStreamIME(KV<Integer, Integer> message) {
+      super(new SystemStreamPartition(
+          "insystem2", "instream2", new Partition(0)), "1", message.getKey(), message.getValue());
+    }
+
     SecondStreamIME(Integer key, Integer message) {
-      super(new SystemStreamPartition("insystem2", "instream2", new Partition(0)), "1", key, message);
+      this(KV.of(key, message));
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java b/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
index 61224f2..c6554bc 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
@@ -32,6 +32,7 @@ import org.apache.samza.operators.spec.OperatorSpec;
 import org.apache.samza.operators.spec.OperatorSpec.OpCode;
 import org.apache.samza.operators.spec.OutputOperatorSpec;
 import org.apache.samza.operators.spec.OutputStreamImpl;
+import org.apache.samza.operators.spec.PartitionByOperatorSpec;
 import org.apache.samza.operators.spec.SinkOperatorSpec;
 import org.apache.samza.operators.spec.StreamOperatorSpec;
 import org.apache.samza.operators.spec.WindowOperatorSpec;
@@ -39,20 +40,19 @@ import org.apache.samza.operators.stream.IntermediateMessageStreamImpl;
 import org.apache.samza.operators.windows.Window;
 import org.apache.samza.operators.windows.WindowPane;
 import org.apache.samza.operators.windows.Windows;
+import org.apache.samza.serializers.KVSerde;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 
 import java.time.Duration;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.function.BiFunction;
 import java.util.function.Function;
 import java.util.function.Supplier;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyObject;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
@@ -172,9 +172,8 @@ public class TestMessageStreamImpl {
     StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
     OperatorSpec mockOpSpec = mock(OperatorSpec.class);
     MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph, mockOpSpec);
-
-    OutputStreamImpl mockOutputOpSpec = mock(OutputStreamImpl.class);
-    inputStream.sendTo(mockOutputOpSpec);
+    OutputStreamImpl<TestMessageEnvelope> mockOutputStreamImpl = mock(OutputStreamImpl.class);
+    inputStream.sendTo(mockOutputStreamImpl);
 
     ArgumentCaptor<OperatorSpec> registeredOpCaptor = ArgumentCaptor.forClass(OperatorSpec.class);
     verify(mockOpSpec).registerNextOperatorSpec(registeredOpCaptor.capture());
@@ -182,33 +181,75 @@ public class TestMessageStreamImpl {
 
     assertTrue(registeredOpSpec instanceof OutputOperatorSpec);
     assertEquals(OpCode.SEND_TO, registeredOpSpec.getOpCode());
-    assertEquals(mockOutputOpSpec, ((OutputOperatorSpec) registeredOpSpec).getOutputStream());
+    assertEquals(mockOutputStreamImpl, ((OutputOperatorSpec) registeredOpSpec).getOutputStream());
+
+    // same behavior as above so nothing new to assert. but ensures that this variant compiles.
+    MessageStreamImpl<KV<String, TestMessageEnvelope>> keyedInputStream = new MessageStreamImpl<>(mockGraph, mockOpSpec);
+    OutputStreamImpl<KV<String, TestMessageEnvelope>> mockKeyedOutputStreamImpl = mock(OutputStreamImpl.class);
+    keyedInputStream.sendTo(mockKeyedOutputStreamImpl);
+
+    // can't unit test it, but the following variants should not compile
+//    inputStream.sendTo(mockKeyedOutputStreamImpl);
+//    keyedInputStream.sendTo(mockOutputStreamImpl);
   }
 
   @Test
-  public void testPartitionBy() {
+  public void testRepartition() {
     StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
     OperatorSpec mockOpSpec = mock(OperatorSpec.class);
 
     String streamName = String.format("%s-%s", OperatorSpec.OpCode.PARTITION_BY.name().toLowerCase(), 0);
-    Function<TestMessageEnvelope, String> mockKeyFn = mock(Function.class);
-    OutputStreamImpl mockOutputOpSpec = mock(OutputStreamImpl.class);
+    OutputStreamImpl mockOutputStreamImpl = mock(OutputStreamImpl.class);
+    KVSerde mockKVSerde = mock(KVSerde.class);
     IntermediateMessageStreamImpl mockIntermediateStream = mock(IntermediateMessageStreamImpl.class);
-    when(mockGraph.getIntermediateStream(eq(streamName), eq(mockKeyFn), any(Function.class), any(BiFunction.class)))
+    when(mockGraph.getIntermediateStream(eq(streamName), eq(mockKVSerde)))
         .thenReturn(mockIntermediateStream);
     when(mockIntermediateStream.getOutputStream())
-        .thenReturn(mockOutputOpSpec);
+        .thenReturn(mockOutputStreamImpl);
 
     MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph, mockOpSpec);
-    inputStream.partitionBy(mockKeyFn);
+    Function mockKeyFunction = mock(Function.class);
+    Function mockValueFunction = mock(Function.class);
+    inputStream.partitionBy(mockKeyFunction, mockValueFunction, mockKVSerde);
 
     ArgumentCaptor<OperatorSpec> registeredOpCaptor = ArgumentCaptor.forClass(OperatorSpec.class);
     verify(mockOpSpec).registerNextOperatorSpec(registeredOpCaptor.capture());
     OperatorSpec<?, TestMessageEnvelope> registeredOpSpec = registeredOpCaptor.getValue();
 
-    assertTrue(registeredOpSpec instanceof OutputOperatorSpec);
+    assertTrue(registeredOpSpec instanceof PartitionByOperatorSpec);
+    assertEquals(OpCode.PARTITION_BY, registeredOpSpec.getOpCode());
+    assertEquals(mockOutputStreamImpl, ((PartitionByOperatorSpec) registeredOpSpec).getOutputStream());
+    assertEquals(mockKeyFunction, ((PartitionByOperatorSpec) registeredOpSpec).getKeyFunction());
+    assertEquals(mockValueFunction, ((PartitionByOperatorSpec) registeredOpSpec).getValueFunction());
+  }
+
+  @Test
+  public void testRepartitionWithoutSerde() {
+    StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+    OperatorSpec mockOpSpec = mock(OperatorSpec.class);
+
+    String streamName = String.format("%s-%s", OperatorSpec.OpCode.PARTITION_BY.name().toLowerCase(), 0);
+    OutputStreamImpl mockOutputStreamImpl = mock(OutputStreamImpl.class);
+    IntermediateMessageStreamImpl mockIntermediateStream = mock(IntermediateMessageStreamImpl.class);
+    when(mockGraph.getIntermediateStream(eq(streamName), eq(null)))
+        .thenReturn(mockIntermediateStream);
+    when(mockIntermediateStream.getOutputStream())
+        .thenReturn(mockOutputStreamImpl);
+
+    MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph, mockOpSpec);
+    Function mockKeyFunction = mock(Function.class);
+    Function mockValueFunction = mock(Function.class);
+    inputStream.partitionBy(mockKeyFunction, mockValueFunction);
+
+    ArgumentCaptor<OperatorSpec> registeredOpCaptor = ArgumentCaptor.forClass(OperatorSpec.class);
+    verify(mockOpSpec).registerNextOperatorSpec(registeredOpCaptor.capture());
+    OperatorSpec<?, TestMessageEnvelope> registeredOpSpec = registeredOpCaptor.getValue();
+
+    assertTrue(registeredOpSpec instanceof PartitionByOperatorSpec);
     assertEquals(OpCode.PARTITION_BY, registeredOpSpec.getOpCode());
-    assertEquals(mockOutputOpSpec, ((OutputOperatorSpec) registeredOpSpec).getOutputStream());
+    assertEquals(mockOutputStreamImpl, ((PartitionByOperatorSpec) registeredOpSpec).getOutputStream());
+    assertEquals(mockKeyFunction, ((PartitionByOperatorSpec) registeredOpSpec).getKeyFunction());
+    assertEquals(mockValueFunction, ((PartitionByOperatorSpec) registeredOpSpec).getValueFunction());
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java b/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java
index 1fc60bd..45583c2 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java
@@ -27,36 +27,136 @@ import org.apache.samza.operators.spec.OperatorSpec.OpCode;
 import org.apache.samza.operators.spec.OutputStreamImpl;
 import org.apache.samza.operators.stream.IntermediateMessageStreamImpl;
 import org.apache.samza.runtime.ApplicationRunner;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.NoOpSerde;
+import org.apache.samza.serializers.Serde;
 import org.apache.samza.system.StreamSpec;
 import org.junit.Test;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.function.BiFunction;
-import java.util.function.Function;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 public class TestStreamGraphImpl {
 
   @Test
-  public void testGetInputStream() {
+  public void testGetInputStreamWithValueSerde() {
     ApplicationRunner mockRunner = mock(ApplicationRunner.class);
     StreamSpec mockStreamSpec = mock(StreamSpec.class);
     when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
     StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
-    BiFunction<String, String, TestMessageEnvelope> mockMsgBuilder = mock(BiFunction.class);
 
-    MessageStream<TestMessageEnvelope> inputStream = graph.getInputStream("test-stream-1", mockMsgBuilder);
+    Serde mockValueSerde = mock(Serde.class);
+    MessageStream<TestMessageEnvelope> inputStream = graph.getInputStream("test-stream-1", mockValueSerde);
 
-    InputOperatorSpec<String, String, TestMessageEnvelope> inputOpSpec =
+    InputOperatorSpec<String, TestMessageEnvelope> inputOpSpec =
         (InputOperatorSpec) ((MessageStreamImpl<TestMessageEnvelope>) inputStream).getOperatorSpec();
     assertEquals(OpCode.INPUT, inputOpSpec.getOpCode());
     assertEquals(graph.getInputOperators().get(mockStreamSpec), inputOpSpec);
-    assertEquals(mockMsgBuilder, inputOpSpec.getMsgBuilder());
     assertEquals(mockStreamSpec, inputOpSpec.getStreamSpec());
+    assertTrue(inputOpSpec.getKeySerde() instanceof NoOpSerde);
+    assertEquals(mockValueSerde, inputOpSpec.getValueSerde());
+  }
+
+  @Test
+  public void testGetInputStreamWithKeyValueSerde() {
+    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+    StreamSpec mockStreamSpec = mock(StreamSpec.class);
+    when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
+    StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
+
+    KVSerde mockKVSerde = mock(KVSerde.class);
+    Serde mockKeySerde = mock(Serde.class);
+    Serde mockValueSerde = mock(Serde.class);
+    doReturn(mockKeySerde).when(mockKVSerde).getKeySerde();
+    doReturn(mockValueSerde).when(mockKVSerde).getValueSerde();
+    MessageStream<TestMessageEnvelope> inputStream = graph.getInputStream("test-stream-1", mockKVSerde);
+
+    InputOperatorSpec<String, TestMessageEnvelope> inputOpSpec =
+        (InputOperatorSpec) ((MessageStreamImpl<TestMessageEnvelope>) inputStream).getOperatorSpec();
+    assertEquals(OpCode.INPUT, inputOpSpec.getOpCode());
+    assertEquals(graph.getInputOperators().get(mockStreamSpec), inputOpSpec);
+    assertEquals(mockStreamSpec, inputOpSpec.getStreamSpec());
+    assertEquals(mockKeySerde, inputOpSpec.getKeySerde());
+    assertEquals(mockValueSerde, inputOpSpec.getValueSerde());
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void testGetInputStreamWithNullSerde() {
+    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+    StreamSpec mockStreamSpec = mock(StreamSpec.class);
+    when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
+    StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
+
+    graph.getInputStream("test-stream-1", null);
+  }
+
+  @Test
+  public void testGetInputStreamWithDefaultValueSerde() {
+    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+    StreamSpec mockStreamSpec = mock(StreamSpec.class);
+    when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
+    StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
+
+    Serde mockValueSerde = mock(Serde.class);
+    graph.setDefaultSerde(mockValueSerde);
+    MessageStream<TestMessageEnvelope> inputStream = graph.getInputStream("test-stream-1");
+
+    InputOperatorSpec<String, TestMessageEnvelope> inputOpSpec =
+        (InputOperatorSpec) ((MessageStreamImpl<TestMessageEnvelope>) inputStream).getOperatorSpec();
+    assertEquals(OpCode.INPUT, inputOpSpec.getOpCode());
+    assertEquals(graph.getInputOperators().get(mockStreamSpec), inputOpSpec);
+    assertEquals(mockStreamSpec, inputOpSpec.getStreamSpec());
+    assertTrue(inputOpSpec.getKeySerde() instanceof NoOpSerde);
+    assertEquals(mockValueSerde, inputOpSpec.getValueSerde());
+  }
+
+  @Test
+  public void testGetInputStreamWithDefaultKeyValueSerde() {
+    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+    StreamSpec mockStreamSpec = mock(StreamSpec.class);
+    when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
+    StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
+
+    KVSerde mockKVSerde = mock(KVSerde.class);
+    Serde mockKeySerde = mock(Serde.class);
+    Serde mockValueSerde = mock(Serde.class);
+    doReturn(mockKeySerde).when(mockKVSerde).getKeySerde();
+    doReturn(mockValueSerde).when(mockKVSerde).getValueSerde();
+    graph.setDefaultSerde(mockKVSerde);
+    MessageStream<TestMessageEnvelope> inputStream = graph.getInputStream("test-stream-1");
+
+    InputOperatorSpec<String, TestMessageEnvelope> inputOpSpec =
+        (InputOperatorSpec) ((MessageStreamImpl<TestMessageEnvelope>) inputStream).getOperatorSpec();
+    assertEquals(OpCode.INPUT, inputOpSpec.getOpCode());
+    assertEquals(graph.getInputOperators().get(mockStreamSpec), inputOpSpec);
+    assertEquals(mockStreamSpec, inputOpSpec.getStreamSpec());
+    assertEquals(mockKeySerde, inputOpSpec.getKeySerde());
+    assertEquals(mockValueSerde, inputOpSpec.getValueSerde());
+  }
+
+  @Test
+  public void testGetInputStreamWithDefaultDefaultSerde() {
+    // default default serde == user hasn't provided a default serde
+    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+    StreamSpec mockStreamSpec = mock(StreamSpec.class);
+    when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
+    StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
+
+    MessageStream<TestMessageEnvelope> inputStream = graph.getInputStream("test-stream-1");
+
+    InputOperatorSpec<String, TestMessageEnvelope> inputOpSpec =
+        (InputOperatorSpec) ((MessageStreamImpl<TestMessageEnvelope>) inputStream).getOperatorSpec();
+    assertEquals(OpCode.INPUT, inputOpSpec.getOpCode());
+    assertEquals(graph.getInputOperators().get(mockStreamSpec), inputOpSpec);
+    assertEquals(mockStreamSpec, inputOpSpec.getStreamSpec());
+    assertTrue(inputOpSpec.getKeySerde() instanceof NoOpSerde);
+    assertTrue(inputOpSpec.getValueSerde() instanceof NoOpSerde);
   }
 
   @Test
@@ -65,15 +165,13 @@ public class TestStreamGraphImpl {
     StreamSpec mockStreamSpec = mock(StreamSpec.class);
     when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
     StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
-    BiFunction<String, String, TestMessageEnvelope> mockMsgBuilder = mock(BiFunction.class);
 
-    MessageStream<TestMessageEnvelope> inputStream = graph.getInputStream("test-stream-1", mockMsgBuilder);
+    MessageStream<TestMessageEnvelope> inputStream = graph.getInputStream("test-stream-1");
 
-    InputOperatorSpec<String, String, TestMessageEnvelope> inputOpSpec =
+    InputOperatorSpec<String, TestMessageEnvelope> inputOpSpec =
         (InputOperatorSpec) ((MessageStreamImpl<TestMessageEnvelope>) inputStream).getOperatorSpec();
     assertEquals(OpCode.INPUT, inputOpSpec.getOpCode());
     assertEquals(graph.getInputOperators().get(mockStreamSpec), inputOpSpec);
-    assertEquals(mockMsgBuilder, inputOpSpec.getMsgBuilder());
     assertEquals(mockStreamSpec, inputOpSpec.getStreamSpec());
   }
 
@@ -86,12 +184,12 @@ public class TestStreamGraphImpl {
     when(mockRunner.getStreamSpec("test-stream-2")).thenReturn(mockStreamSpec2);
 
     StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
-    MessageStream<Object> inputStream1 = graph.getInputStream("test-stream-1", mock(BiFunction.class));
-    MessageStream<Object> inputStream2 = graph.getInputStream("test-stream-2", mock(BiFunction.class));
+    MessageStream<Object> inputStream1 = graph.getInputStream("test-stream-1");
+    MessageStream<Object> inputStream2 = graph.getInputStream("test-stream-2");
 
-    InputOperatorSpec<String, String, TestMessageEnvelope> inputOpSpec1 =
+    InputOperatorSpec<String, TestMessageEnvelope> inputOpSpec1 =
         (InputOperatorSpec) ((MessageStreamImpl<Object>) inputStream1).getOperatorSpec();
-    InputOperatorSpec<String, String, TestMessageEnvelope> inputOpSpec2 =
+    InputOperatorSpec<String, TestMessageEnvelope> inputOpSpec2 =
         (InputOperatorSpec) ((MessageStreamImpl<Object>) inputStream2).getOperatorSpec();
 
     assertEquals(graph.getInputOperators().size(), 2);
@@ -105,29 +203,149 @@ public class TestStreamGraphImpl {
     when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mock(StreamSpec.class));
 
     StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
-    graph.getInputStream("test-stream-1", mock(BiFunction.class));
-    graph.getInputStream("test-stream-1", mock(BiFunction.class)); // should throw exception
+    graph.getInputStream("test-stream-1");
+    // should throw exception
+    graph.getInputStream("test-stream-1");
+  }
+
+  @Test
+  public void testGetOutputStreamWithValueSerde() {
+    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+    StreamSpec mockStreamSpec = mock(StreamSpec.class);
+    when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
+
+    StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
+
+    Serde mockValueSerde = mock(Serde.class);
+    OutputStream<TestMessageEnvelope> outputStream =
+        graph.getOutputStream("test-stream-1", mockValueSerde);
+
+    OutputStreamImpl<TestMessageEnvelope> outputStreamImpl = (OutputStreamImpl) outputStream;
+    assertEquals(graph.getOutputStreams().get(mockStreamSpec), outputStreamImpl);
+    assertEquals(mockStreamSpec, outputStreamImpl.getStreamSpec());
+    assertTrue(outputStreamImpl.getKeySerde() instanceof NoOpSerde);
+    assertEquals(mockValueSerde, outputStreamImpl.getValueSerde());
+  }
+
+  @Test
+  public void testGetOutputStreamWithKeyValueSerde() {
+    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+    StreamSpec mockStreamSpec = mock(StreamSpec.class);
+    when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
+
+    StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
+    KVSerde mockKVSerde = mock(KVSerde.class);
+    Serde mockKeySerde = mock(Serde.class);
+    Serde mockValueSerde = mock(Serde.class);
+    doReturn(mockKeySerde).when(mockKVSerde).getKeySerde();
+    doReturn(mockValueSerde).when(mockKVSerde).getValueSerde();
+    graph.setDefaultSerde(mockKVSerde);
+    OutputStream<TestMessageEnvelope> outputStream = graph.getOutputStream("test-stream-1", mockKVSerde);
+
+    OutputStreamImpl<TestMessageEnvelope> outputStreamImpl = (OutputStreamImpl) outputStream;
+    assertEquals(graph.getOutputStreams().get(mockStreamSpec), outputStreamImpl);
+    assertEquals(mockStreamSpec, outputStreamImpl.getStreamSpec());
+    assertEquals(mockKeySerde, outputStreamImpl.getKeySerde());
+    assertEquals(mockValueSerde, outputStreamImpl.getValueSerde());
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void testGetOutputStreamWithNullSerde() {
+    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+    StreamSpec mockStreamSpec = mock(StreamSpec.class);
+    when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
+
+    StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
+
+    graph.getOutputStream("test-stream-1", null);
+  }
+
+  @Test
+  public void testGetOutputStreamWithDefaultValueSerde() {
+    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+    StreamSpec mockStreamSpec = mock(StreamSpec.class);
+    when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
+
+    Serde mockValueSerde = mock(Serde.class);
+    StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
+    graph.setDefaultSerde(mockValueSerde);
+    OutputStream<TestMessageEnvelope> outputStream = graph.getOutputStream("test-stream-1");
+
+    OutputStreamImpl<TestMessageEnvelope> outputStreamImpl = (OutputStreamImpl) outputStream;
+    assertEquals(graph.getOutputStreams().get(mockStreamSpec), outputStreamImpl);
+    assertEquals(mockStreamSpec, outputStreamImpl.getStreamSpec());
+    assertTrue(outputStreamImpl.getKeySerde() instanceof NoOpSerde);
+    assertEquals(mockValueSerde, outputStreamImpl.getValueSerde());
+  }
+
+  @Test
+  public void testGetOutputStreamWithDefaultKeyValueSerde() {
+    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+    StreamSpec mockStreamSpec = mock(StreamSpec.class);
+    when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
+
+    StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
+    KVSerde mockKVSerde = mock(KVSerde.class);
+    Serde mockKeySerde = mock(Serde.class);
+    Serde mockValueSerde = mock(Serde.class);
+    doReturn(mockKeySerde).when(mockKVSerde).getKeySerde();
+    doReturn(mockValueSerde).when(mockKVSerde).getValueSerde();
+    graph.setDefaultSerde(mockKVSerde);
+
+    OutputStream<TestMessageEnvelope> outputStream = graph.getOutputStream("test-stream-1");
+
+    OutputStreamImpl<TestMessageEnvelope> outputStreamImpl = (OutputStreamImpl) outputStream;
+    assertEquals(graph.getOutputStreams().get(mockStreamSpec), outputStreamImpl);
+    assertEquals(mockStreamSpec, outputStreamImpl.getStreamSpec());
+    assertEquals(mockKeySerde, outputStreamImpl.getKeySerde());
+    assertEquals(mockValueSerde, outputStreamImpl.getValueSerde());
   }
 
   @Test
-  public void testGetOutputStream() {
+  public void testGetOutputStreamWithDefaultDefaultSerde() {
     ApplicationRunner mockRunner = mock(ApplicationRunner.class);
     StreamSpec mockStreamSpec = mock(StreamSpec.class);
     when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
 
+    StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
+
+    OutputStream<TestMessageEnvelope> outputStream = graph.getOutputStream("test-stream-1");
+
+    OutputStreamImpl<TestMessageEnvelope> outputStreamImpl = (OutputStreamImpl) outputStream;
+    assertEquals(graph.getOutputStreams().get(mockStreamSpec), outputStreamImpl);
+    assertEquals(mockStreamSpec, outputStreamImpl.getStreamSpec());
+    assertTrue(outputStreamImpl.getKeySerde() instanceof NoOpSerde);
+    assertTrue(outputStreamImpl.getValueSerde() instanceof NoOpSerde);
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testSetDefaultSerdeAfterGettingStreams() {
+    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+    when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mock(StreamSpec.class));
+
+    StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
+    graph.getInputStream("test-stream-1");
+    graph.setDefaultSerde(mock(Serde.class)); // should throw exception
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testSetDefaultSerdeAfterGettingOutputStream() {
+    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+    when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mock(StreamSpec.class));
 
     StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
-    Function<TestMessageEnvelope, String> mockKeyExtractor = mock(Function.class);
-    Function<TestMessageEnvelope, String> mockMsgExtractor = mock(Function.class);
+    graph.getOutputStream("test-stream-1");
+    graph.setDefaultSerde(mock(Serde.class)); // should throw exception
+  }
 
-    OutputStream<String, String, TestMessageEnvelope> outputStream =
-        graph.getOutputStream("test-stream-1", mockKeyExtractor, mockMsgExtractor);
+  @Test(expected = IllegalStateException.class)
+  public void testSetDefaultSerdeAfterGettingIntermediateStream() {
+    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+    when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mock(StreamSpec.class));
 
-    OutputStreamImpl<String, String, TestMessageEnvelope> outputOpSpec = (OutputStreamImpl) outputStream;
-    assertEquals(graph.getOutputStreams().get(mockStreamSpec), outputOpSpec);
-    assertEquals(mockKeyExtractor, outputOpSpec.getKeyExtractor());
-    assertEquals(mockMsgExtractor, outputOpSpec.getMsgExtractor());
-    assertEquals(mockStreamSpec, outputOpSpec.getStreamSpec());
+    StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
+    graph.getIntermediateStream("test-stream-1", null);
+    graph.setDefaultSerde(mock(Serde.class)); // should throw exception
   }
 
   @Test(expected = IllegalStateException.class)
@@ -136,12 +354,89 @@ public class TestStreamGraphImpl {
     when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mock(StreamSpec.class));
 
     StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
-    graph.getOutputStream("test-stream-1", mock(Function.class), mock(Function.class));
-    graph.getOutputStream("test-stream-1", mock(Function.class), mock(Function.class)); // should throw exception
+    graph.getOutputStream("test-stream-1");
+    graph.getOutputStream("test-stream-1"); // should throw exception
+  }
+
+  @Test
+  public void testGetIntermediateStreamWithValueSerde() {
+    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+    Config mockConfig = mock(Config.class);
+    StreamSpec mockStreamSpec = mock(StreamSpec.class);
+    when(mockConfig.get(JobConfig.JOB_NAME())).thenReturn("myJob");
+    when(mockConfig.get(JobConfig.JOB_ID(), "1")).thenReturn("i001");
+    when(mockRunner.getStreamSpec("myJob-i001-test-stream-1")).thenReturn(mockStreamSpec);
+
+    StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig);
+
+    Serde mockValueSerde = mock(Serde.class);
+    IntermediateMessageStreamImpl<TestMessageEnvelope> intermediateStreamImpl =
+        graph.getIntermediateStream("test-stream-1", mockValueSerde);
+
+    assertEquals(graph.getInputOperators().get(mockStreamSpec), intermediateStreamImpl.getOperatorSpec());
+    assertEquals(graph.getOutputStreams().get(mockStreamSpec), intermediateStreamImpl.getOutputStream());
+    assertEquals(mockStreamSpec, intermediateStreamImpl.getStreamSpec());
+    assertTrue(intermediateStreamImpl.getOutputStream().getKeySerde() instanceof NoOpSerde);
+    assertEquals(mockValueSerde, intermediateStreamImpl.getOutputStream().getValueSerde());
+    assertTrue(((InputOperatorSpec) intermediateStreamImpl.getOperatorSpec()).getKeySerde() instanceof NoOpSerde);
+    assertEquals(mockValueSerde, ((InputOperatorSpec) intermediateStreamImpl.getOperatorSpec()).getValueSerde());
+  }
+
+  @Test
+  public void testGetIntermediateStreamWithKeyValueSerde() {
+    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+    Config mockConfig = mock(Config.class);
+    StreamSpec mockStreamSpec = mock(StreamSpec.class);
+    when(mockConfig.get(JobConfig.JOB_NAME())).thenReturn("myJob");
+    when(mockConfig.get(JobConfig.JOB_ID(), "1")).thenReturn("i001");
+    when(mockRunner.getStreamSpec("myJob-i001-test-stream-1")).thenReturn(mockStreamSpec);
+
+    StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig);
+
+    KVSerde mockKVSerde = mock(KVSerde.class);
+    Serde mockKeySerde = mock(Serde.class);
+    Serde mockValueSerde = mock(Serde.class);
+    doReturn(mockKeySerde).when(mockKVSerde).getKeySerde();
+    doReturn(mockValueSerde).when(mockKVSerde).getValueSerde();
+    IntermediateMessageStreamImpl<TestMessageEnvelope> intermediateStreamImpl =
+        graph.getIntermediateStream("test-stream-1", mockKVSerde);
+
+    assertEquals(graph.getInputOperators().get(mockStreamSpec), intermediateStreamImpl.getOperatorSpec());
+    assertEquals(graph.getOutputStreams().get(mockStreamSpec), intermediateStreamImpl.getOutputStream());
+    assertEquals(mockStreamSpec, intermediateStreamImpl.getStreamSpec());
+    assertEquals(mockKeySerde, intermediateStreamImpl.getOutputStream().getKeySerde());
+    assertEquals(mockValueSerde, intermediateStreamImpl.getOutputStream().getValueSerde());
+    assertEquals(mockKeySerde, ((InputOperatorSpec) intermediateStreamImpl.getOperatorSpec()).getKeySerde());
+    assertEquals(mockValueSerde, ((InputOperatorSpec) intermediateStreamImpl.getOperatorSpec()).getValueSerde());
+  }
+
+  @Test
+  public void testGetIntermediateStreamWithDefaultValueSerde() {
+    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+    Config mockConfig = mock(Config.class);
+    StreamSpec mockStreamSpec = mock(StreamSpec.class);
+    when(mockConfig.get(JobConfig.JOB_NAME())).thenReturn("myJob");
+    when(mockConfig.get(JobConfig.JOB_ID(), "1")).thenReturn("i001");
+    when(mockRunner.getStreamSpec("myJob-i001-test-stream-1")).thenReturn(mockStreamSpec);
+
+    StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig);
+
+    Serde mockValueSerde = mock(Serde.class);
+    graph.setDefaultSerde(mockValueSerde);
+    IntermediateMessageStreamImpl<TestMessageEnvelope> intermediateStreamImpl =
+        graph.getIntermediateStream("test-stream-1", null);
+
+    assertEquals(graph.getInputOperators().get(mockStreamSpec), intermediateStreamImpl.getOperatorSpec());
+    assertEquals(graph.getOutputStreams().get(mockStreamSpec), intermediateStreamImpl.getOutputStream());
+    assertEquals(mockStreamSpec, intermediateStreamImpl.getStreamSpec());
+    assertTrue(intermediateStreamImpl.getOutputStream().getKeySerde() instanceof NoOpSerde);
+    assertEquals(mockValueSerde, intermediateStreamImpl.getOutputStream().getValueSerde());
+    assertTrue(((InputOperatorSpec) intermediateStreamImpl.getOperatorSpec()).getKeySerde() instanceof NoOpSerde);
+    assertEquals(mockValueSerde, ((InputOperatorSpec) intermediateStreamImpl.getOperatorSpec()).getValueSerde());
   }
 
   @Test
-  public void testGetIntermediateStream() {
+  public void testGetIntermediateStreamWithDefaultKeyValueSerde() {
     ApplicationRunner mockRunner = mock(ApplicationRunner.class);
     Config mockConfig = mock(Config.class);
     StreamSpec mockStreamSpec = mock(StreamSpec.class);
@@ -150,19 +445,45 @@ public class TestStreamGraphImpl {
     when(mockRunner.getStreamSpec("myJob-i001-test-stream-1")).thenReturn(mockStreamSpec);
 
     StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig);
-    Function<TestMessageEnvelope, String> mockKeyExtractor = mock(Function.class);
-    Function<TestMessageEnvelope, String> mockMsgExtractor = mock(Function.class);
-    BiFunction<String, String, TestMessageEnvelope> mockMsgBuilder = mock(BiFunction.class);
 
-    IntermediateMessageStreamImpl<?, ?, TestMessageEnvelope> intermediateStreamImpl =
-        graph.getIntermediateStream("test-stream-1", mockKeyExtractor, mockMsgExtractor, mockMsgBuilder);
+    KVSerde mockKVSerde = mock(KVSerde.class);
+    Serde mockKeySerde = mock(Serde.class);
+    Serde mockValueSerde = mock(Serde.class);
+    doReturn(mockKeySerde).when(mockKVSerde).getKeySerde();
+    doReturn(mockValueSerde).when(mockKVSerde).getValueSerde();
+    graph.setDefaultSerde(mockKVSerde);
+    IntermediateMessageStreamImpl<TestMessageEnvelope> intermediateStreamImpl =
+        graph.getIntermediateStream("test-stream-1", null);
+
+    assertEquals(graph.getInputOperators().get(mockStreamSpec), intermediateStreamImpl.getOperatorSpec());
+    assertEquals(graph.getOutputStreams().get(mockStreamSpec), intermediateStreamImpl.getOutputStream());
+    assertEquals(mockStreamSpec, intermediateStreamImpl.getStreamSpec());
+    assertEquals(mockKeySerde, intermediateStreamImpl.getOutputStream().getKeySerde());
+    assertEquals(mockValueSerde, intermediateStreamImpl.getOutputStream().getValueSerde());
+    assertEquals(mockKeySerde, ((InputOperatorSpec) intermediateStreamImpl.getOperatorSpec()).getKeySerde());
+    assertEquals(mockValueSerde, ((InputOperatorSpec) intermediateStreamImpl.getOperatorSpec()).getValueSerde());
+  }
+
+  @Test
+  public void testGetIntermediateStreamWithDefaultDefaultSerde() {
+    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+    Config mockConfig = mock(Config.class);
+    StreamSpec mockStreamSpec = mock(StreamSpec.class);
+    when(mockConfig.get(JobConfig.JOB_NAME())).thenReturn("myJob");
+    when(mockConfig.get(JobConfig.JOB_ID(), "1")).thenReturn("i001");
+    when(mockRunner.getStreamSpec("myJob-i001-test-stream-1")).thenReturn(mockStreamSpec);
+
+    StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig);
+    IntermediateMessageStreamImpl<TestMessageEnvelope> intermediateStreamImpl =
+        graph.getIntermediateStream("test-stream-1", null);
 
     assertEquals(graph.getInputOperators().get(mockStreamSpec), intermediateStreamImpl.getOperatorSpec());
     assertEquals(graph.getOutputStreams().get(mockStreamSpec), intermediateStreamImpl.getOutputStream());
     assertEquals(mockStreamSpec, intermediateStreamImpl.getStreamSpec());
-    assertEquals(mockKeyExtractor, intermediateStreamImpl.getOutputStream().getKeyExtractor());
-    assertEquals(mockMsgExtractor, intermediateStreamImpl.getOutputStream().getMsgExtractor());
-    assertEquals(mockMsgBuilder, ((InputOperatorSpec) intermediateStreamImpl.getOperatorSpec()).getMsgBuilder());
+    assertTrue(intermediateStreamImpl.getOutputStream().getKeySerde() instanceof NoOpSerde);
+    assertTrue(intermediateStreamImpl.getOutputStream().getValueSerde() instanceof NoOpSerde);
+    assertTrue(((InputOperatorSpec) intermediateStreamImpl.getOperatorSpec()).getKeySerde() instanceof NoOpSerde);
+    assertTrue(((InputOperatorSpec) intermediateStreamImpl.getOperatorSpec()).getValueSerde() instanceof NoOpSerde);
   }
 
   @Test(expected = IllegalStateException.class)
@@ -171,8 +492,8 @@ public class TestStreamGraphImpl {
     when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mock(StreamSpec.class));
 
     StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
-    graph.getIntermediateStream("test-stream-1", mock(Function.class), mock(Function.class), mock(BiFunction.class));
-    graph.getIntermediateStream("test-stream-1", mock(Function.class), mock(Function.class), mock(BiFunction.class));
+    graph.getIntermediateStream("test-stream-1", mock(Serde.class));
+    graph.getIntermediateStream("test-stream-1", mock(Serde.class));
   }
 
   @Test
@@ -199,9 +520,9 @@ public class TestStreamGraphImpl {
     StreamSpec testStreamSpec3 = new StreamSpec("test-stream-3", "physical-stream-3", "test-system");
     when(mockRunner.getStreamSpec("test-stream-3")).thenReturn(testStreamSpec3);
 
-    graph.getInputStream("test-stream-1", (k, v) -> v);
-    graph.getInputStream("test-stream-2", (k, v) -> v);
-    graph.getInputStream("test-stream-3", (k, v) -> v);
+    graph.getInputStream("test-stream-1");
+    graph.getInputStream("test-stream-2");
+    graph.getInputStream("test-stream-3");
 
     List<InputOperatorSpec> inputSpecs = new ArrayList<>(graph.getInputOperators().values());
     Assert.assertEquals(inputSpecs.size(), 3);

http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java b/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java
index ca8a151..ee44cf9 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java
@@ -34,6 +34,8 @@ import org.apache.samza.operators.windows.AccumulationMode;
 import org.apache.samza.operators.windows.WindowPane;
 import org.apache.samza.operators.windows.Windows;
 import org.apache.samza.runtime.ApplicationRunner;
+import org.apache.samza.serializers.IntegerSerde;
+import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.system.OutgoingMessageEnvelope;
 import org.apache.samza.system.StreamSpec;
@@ -389,8 +391,9 @@ public class TestWindowOperator {
 
     @Override
     public void init(StreamGraph graph, Config config) {
-      MessageStream<IntegerEnvelope> inStream = graph.getInputStream("integers",
-          (k, m) -> new IntegerEnvelope((Integer) k));
+      MessageStream<IntegerEnvelope> inStream =
+          graph.getInputStream("integers", KVSerde.of(new IntegerSerde(), new IntegerSerde()))
+              .map(kv -> new IntegerEnvelope(kv.getKey()));
       Function<IntegerEnvelope, Integer> keyFn = m -> (Integer) m.getKey();
       inStream
         .map(m -> m)
@@ -418,8 +421,9 @@ public class TestWindowOperator {
 
     @Override
     public void init(StreamGraph graph, Config config) {
-      MessageStream<IntegerEnvelope> inStream = graph.getInputStream("integers",
-          (k, m) -> new IntegerEnvelope((Integer) k));
+      MessageStream<IntegerEnvelope> inStream =
+          graph.getInputStream("integers", KVSerde.of(new IntegerSerde(), new IntegerSerde()))
+              .map(kv -> new IntegerEnvelope(kv.getKey()));
       Function<IntegerEnvelope, Integer> keyFn = m -> (Integer) m.getKey();
       inStream
           .map(m -> m)
@@ -444,8 +448,9 @@ public class TestWindowOperator {
 
     @Override
     public void init(StreamGraph graph, Config config) {
-      MessageStream<IntegerEnvelope> inStream = graph.getInputStream("integers",
-          (k, m) -> new IntegerEnvelope((Integer) k));
+      MessageStream<IntegerEnvelope> inStream =
+          graph.getInputStream("integers", KVSerde.of(new IntegerSerde(), new IntegerSerde()))
+              .map(kv -> new IntegerEnvelope(kv.getKey()));
       Function<IntegerEnvelope, Integer> keyFn = m -> (Integer) m.getKey();
 
       inStream

http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
index 4505eef..68b4ce0 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
@@ -19,9 +19,9 @@
 
 package org.apache.samza.operators.impl;
 
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.samza.config.Config;
 import org.apache.samza.metrics.MetricsRegistryMap;
+import org.apache.samza.operators.KV;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.OutputStream;
 import org.apache.samza.operators.StreamGraphImpl;
@@ -30,6 +30,10 @@ import org.apache.samza.operators.functions.JoinFunction;
 import org.apache.samza.operators.functions.MapFunction;
 import org.apache.samza.operators.spec.OperatorSpec.OpCode;
 import org.apache.samza.runtime.ApplicationRunner;
+import org.apache.samza.serializers.IntegerSerde;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.NoOpSerde;
+import org.apache.samza.serializers.StringSerde;
 import org.apache.samza.system.StreamSpec;
 import org.apache.samza.system.SystemStream;
 import org.apache.samza.task.MessageCollector;
@@ -43,8 +47,6 @@ import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.function.BiFunction;
-import java.util.function.Function;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotSame;
@@ -73,9 +75,8 @@ public class TestOperatorImplGraph {
     when(mockRunner.getStreamSpec(eq("output"))).thenReturn(mock(StreamSpec.class));
     StreamGraphImpl streamGraph = new StreamGraphImpl(mockRunner, mock(Config.class));
 
-    MessageStream<Object> inputStream = streamGraph.getInputStream("input", mock(BiFunction.class));
-    OutputStream<Object, Object, Object> outputStream =
-        streamGraph.getOutputStream("output", mock(Function.class), mock(Function.class));
+    MessageStream<Object> inputStream = streamGraph.getInputStream("input");
+    OutputStream<Object> outputStream = streamGraph.getOutputStream("output");
 
     inputStream
         .filter(mock(FilterFunction.class))
@@ -104,12 +105,49 @@ public class TestOperatorImplGraph {
   }
 
   @Test
+  public void testPartitionByChain() {
+    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+    when(mockRunner.getStreamSpec(eq("input"))).thenReturn(new StreamSpec("input", "input-stream", "input-system"));
+    when(mockRunner.getStreamSpec(eq("output"))).thenReturn(new StreamSpec("output", "output-stream", "output-system"));
+    when(mockRunner.getStreamSpec(eq("null-null-partition_by-1")))
+        .thenReturn(new StreamSpec("intermediate", "intermediate-stream", "intermediate-system"));
+    StreamGraphImpl streamGraph = new StreamGraphImpl(mockRunner, mock(Config.class));
+    MessageStream<Object> inputStream = streamGraph.getInputStream("input");
+    OutputStream<KV<Integer, String>> outputStream = streamGraph
+        .getOutputStream("output", KVSerde.of(mock(IntegerSerde.class), mock(StringSerde.class)));
+
+    inputStream
+        .partitionBy(Object::hashCode, Object::toString, KVSerde.of(mock(IntegerSerde.class), mock(StringSerde.class)))
+        .sendTo(outputStream);
+
+    TaskContext mockTaskContext = mock(TaskContext.class);
+    when(mockTaskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
+    OperatorImplGraph opImplGraph =
+        new OperatorImplGraph(streamGraph, mock(Config.class), mockTaskContext, mock(Clock.class));
+
+    InputOperatorImpl inputOpImpl = opImplGraph.getInputOperator(new SystemStream("input-system", "input-stream"));
+    assertEquals(1, inputOpImpl.registeredOperators.size());
+
+    OperatorImpl partitionByOpImpl = (PartitionByOperatorImpl) inputOpImpl.registeredOperators.iterator().next();
+    assertEquals(0, partitionByOpImpl.registeredOperators.size()); // is terminal but paired with an input operator
+    assertEquals(OpCode.PARTITION_BY, partitionByOpImpl.getOperatorSpec().getOpCode());
+
+    InputOperatorImpl repartitionedInputOpImpl =
+        opImplGraph.getInputOperator(new SystemStream("intermediate-system", "intermediate-stream"));
+    assertEquals(1, repartitionedInputOpImpl.registeredOperators.size());
+
+    OperatorImpl sendToOpImpl = (OutputOperatorImpl) repartitionedInputOpImpl.registeredOperators.iterator().next();
+    assertEquals(0, sendToOpImpl.registeredOperators.size());
+    assertEquals(OpCode.SEND_TO, sendToOpImpl.getOperatorSpec().getOpCode());
+  }
+
+  @Test
   public void testBroadcastChain() {
     ApplicationRunner mockRunner = mock(ApplicationRunner.class);
     when(mockRunner.getStreamSpec(eq("input"))).thenReturn(new StreamSpec("input", "input-stream", "input-system"));
     StreamGraphImpl streamGraph = new StreamGraphImpl(mockRunner, mock(Config.class));
 
-    MessageStream<Object> inputStream = streamGraph.getInputStream("input", mock(BiFunction.class));
+    MessageStream<Object> inputStream = streamGraph.getInputStream("input");
     inputStream.filter(mock(FilterFunction.class));
     inputStream.map(mock(MapFunction.class));
 
@@ -132,7 +170,7 @@ public class TestOperatorImplGraph {
     when(mockRunner.getStreamSpec(eq("input"))).thenReturn(new StreamSpec("input", "input-stream", "input-system"));
     StreamGraphImpl streamGraph = new StreamGraphImpl(mockRunner, mock(Config.class));
 
-    MessageStream<Object> inputStream = streamGraph.getInputStream("input", mock(BiFunction.class));
+    MessageStream<Object> inputStream = streamGraph.getInputStream("input");
     MessageStream<Object> stream1 = inputStream.filter(mock(FilterFunction.class));
     MessageStream<Object> stream2 = inputStream.map(mock(MapFunction.class));
     MessageStream<Object> mergedStream = stream1.merge(Collections.singleton(stream2));
@@ -156,8 +194,8 @@ public class TestOperatorImplGraph {
     StreamGraphImpl streamGraph = new StreamGraphImpl(mockRunner, mock(Config.class));
 
     JoinFunction mockJoinFunction = mock(JoinFunction.class);
-    MessageStream<Object> inputStream1 = streamGraph.getInputStream("input1", (k, v) -> v);
-    MessageStream<Object> inputStream2 = streamGraph.getInputStream("input2", (k, v) -> v);
+    MessageStream<Object> inputStream1 = streamGraph.getInputStream("input1", new NoOpSerde<>());
+    MessageStream<Object> inputStream2 = streamGraph.getInputStream("input2", new NoOpSerde<>());
     inputStream1.join(inputStream2, mockJoinFunction, Duration.ofHours(1));
 
     TaskContext mockTaskContext = mock(TaskContext.class);
@@ -182,13 +220,13 @@ public class TestOperatorImplGraph {
     // verify that left partial join operator calls getFirstKey
     Object mockLeftMessage = mock(Object.class);
     when(mockJoinFunction.getFirstKey(eq(mockLeftMessage))).thenReturn(joinKey);
-    inputOpImpl1.onMessage(Pair.of("", mockLeftMessage), mock(MessageCollector.class), mock(TaskCoordinator.class));
+    inputOpImpl1.onMessage(KV.of("", mockLeftMessage), mock(MessageCollector.class), mock(TaskCoordinator.class));
     verify(mockJoinFunction, times(1)).getFirstKey(mockLeftMessage);
 
     // verify that right partial join operator calls getSecondKey
     Object mockRightMessage = mock(Object.class);
     when(mockJoinFunction.getSecondKey(eq(mockRightMessage))).thenReturn(joinKey);
-    inputOpImpl2.onMessage(Pair.of("", mockRightMessage), mock(MessageCollector.class), mock(TaskCoordinator.class));
+    inputOpImpl2.onMessage(KV.of("", mockRightMessage), mock(MessageCollector.class), mock(TaskCoordinator.class));
     verify(mockJoinFunction, times(1)).getSecondKey(mockRightMessage);
 
     // verify that the join function apply is called with the correct messages on match
@@ -205,8 +243,8 @@ public class TestOperatorImplGraph {
     when(mockContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
     StreamGraphImpl streamGraph = new StreamGraphImpl(mockRunner, mockConfig);
 
-    MessageStream<Object> inputStream1 = streamGraph.getInputStream("input1", (k, v) -> v);
-    MessageStream<Object> inputStream2 = streamGraph.getInputStream("input2", (k, v) -> v);
+    MessageStream<Object> inputStream1 = streamGraph.getInputStream("input1");
+    MessageStream<Object> inputStream2 = streamGraph.getInputStream("input2");
 
     List<String> initializedOperators = new ArrayList<>();
     List<String> closedOperators = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java
index e183d87..a91c1af 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java
@@ -41,7 +41,7 @@ public class TestStreamOperatorImpl {
 
   @Test
   @SuppressWarnings("unchecked")
-  public void testSimpleOperator() {
+  public void testStreamOperator() {
     StreamOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope> mockOp = mock(StreamOperatorSpec.class);
     FlatMapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> txfmFn = mock(FlatMapFunction.class);
     when(mockOp.getTransformFn()).thenReturn(txfmFn);
@@ -61,7 +61,7 @@ public class TestStreamOperatorImpl {
   }
 
   @Test
-  public void testSimpleOperatorClose() {
+  public void testStreamOperatorClose() {
     StreamOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope> mockOp = mock(StreamOperatorSpec.class);
     FlatMapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> txfmFn = mock(FlatMapFunction.class);
     when(mockOp.getTransformFn()).thenReturn(txfmFn);

http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/test/scala/org/apache/samza/serializers/TestByteBufferSerde.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/serializers/TestByteBufferSerde.scala b/samza-core/src/test/scala/org/apache/samza/serializers/TestByteBufferSerde.scala
deleted file mode 100644
index eddfb0a..0000000
--- a/samza-core/src/test/scala/org/apache/samza/serializers/TestByteBufferSerde.scala
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.serializers
-
-import java.util.Arrays
-import org.junit.Assert._
-import org.junit.Test
-import java.nio.ByteBuffer
-
-class TestByteBufferSerde {
-  @Test
-  def testSerde {
-    val serde = new ByteBufferSerde
-    assertNull(serde.toBytes(null))
-    assertNull(serde.fromBytes(null))
-
-    val bytes = "A lazy way of creating a byte array".getBytes()
-    val byteBuffer = ByteBuffer.wrap(bytes)
-    byteBuffer.mark()
-    assertArrayEquals(serde.toBytes(byteBuffer), bytes)
-    byteBuffer.reset()
-    assertEquals(serde.fromBytes(bytes), byteBuffer)
-  }
-
-  @Test
-  def testSerializationPreservesInput {
-    val serde = new ByteBufferSerde
-    val bytes = "A lazy way of creating a byte array".getBytes()
-    val byteBuffer = ByteBuffer.wrap(bytes)
-    byteBuffer.get() // advance position by 1
-    serde.toBytes(byteBuffer)
-
-    assertEquals(byteBuffer.capacity(), byteBuffer.limit())
-    assertEquals(1, byteBuffer.position())
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/test/scala/org/apache/samza/serializers/TestByteSerde.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/serializers/TestByteSerde.scala b/samza-core/src/test/scala/org/apache/samza/serializers/TestByteSerde.scala
deleted file mode 100644
index f605762..0000000
--- a/samza-core/src/test/scala/org/apache/samza/serializers/TestByteSerde.scala
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.serializers
-
-import java.util.Arrays
-
-import org.junit.Assert._
-import org.junit.Test
-
-class TestByteSerde {
-  @Test
-  def testByteSerde {
-    val serde = new ByteSerde
-    assertNull(serde.toBytes(null))
-    assertNull(serde.fromBytes(null))
-
-    val testBytes = "A lazy way of creating a byte array".getBytes()
-    assertArrayEquals(serde.toBytes(testBytes), testBytes)
-    assertArrayEquals(serde.fromBytes(testBytes), testBytes)
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/test/scala/org/apache/samza/serializers/TestDoubleSerde.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/serializers/TestDoubleSerde.scala b/samza-core/src/test/scala/org/apache/samza/serializers/TestDoubleSerde.scala
deleted file mode 100644
index 60241b5..0000000
--- a/samza-core/src/test/scala/org/apache/samza/serializers/TestDoubleSerde.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.serializers
-
-import java.util.Arrays
-
-import org.junit.Assert._
-import org.junit.Test
-
-class TestDoubleSerde {
-  @Test
-  def testDoubleSerde {
-    val serde = new DoubleSerde
-    assertEquals(null, serde.toBytes(null))
-    assertEquals(null, serde.fromBytes(null))
-
-    val fooBar = 9.156013e-002
-    val fooBarBytes = serde.toBytes(fooBar)
-    fooBarBytes.foreach(System.err.println)
-    assertArrayEquals(Array[Byte](63, -73, 112, 124, 19, -9, -82, -93), fooBarBytes)
-    assertEquals(fooBar, serde.fromBytes(fooBarBytes))
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/test/scala/org/apache/samza/serializers/TestIntegerSerde.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/serializers/TestIntegerSerde.scala b/samza-core/src/test/scala/org/apache/samza/serializers/TestIntegerSerde.scala
deleted file mode 100644
index a3e7e1f..0000000
--- a/samza-core/src/test/scala/org/apache/samza/serializers/TestIntegerSerde.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.serializers
-
-import org.junit.Assert._
-import org.junit.Test
-
-class TestIntegerSerde {
-  @Test
-  def testIntegerSerde {
-    val serde = new IntegerSerde
-    assertEquals(null, serde.toBytes(null))
-    assertEquals(null, serde.fromBytes(null))
-
-    val fooBar = 37
-    val fooBarBytes = serde.toBytes(fooBar)
-    assertArrayEquals(Array[Byte](0, 0, 0, 37), fooBarBytes)
-    assertEquals(fooBar, serde.fromBytes(fooBarBytes))
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/test/scala/org/apache/samza/serializers/TestLongSerde.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/serializers/TestLongSerde.scala b/samza-core/src/test/scala/org/apache/samza/serializers/TestLongSerde.scala
deleted file mode 100644
index 77a7498..0000000
--- a/samza-core/src/test/scala/org/apache/samza/serializers/TestLongSerde.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.serializers
-
-import java.util.Arrays
-
-import org.junit.Assert._
-import org.junit.Test
-
-class TestLongSerde {
-  @Test
-  def testLongSerde {
-    val serde = new LongSerde
-    assertEquals(null, serde.toBytes(null))
-    assertEquals(null, serde.fromBytes(null))
-
-    val fooBar = 1234123412341234L
-    val fooBarBytes = serde.toBytes(fooBar)
-    fooBarBytes.foreach(System.err.println)
-    assertArrayEquals(Array[Byte](0, 4, 98, 109, -65, -102, 1, -14), fooBarBytes)
-    assertEquals(fooBar, serde.fromBytes(fooBarBytes))
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/test/scala/org/apache/samza/serializers/TestSerializableSerde.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/serializers/TestSerializableSerde.scala b/samza-core/src/test/scala/org/apache/samza/serializers/TestSerializableSerde.scala
deleted file mode 100644
index 8e899d0..0000000
--- a/samza-core/src/test/scala/org/apache/samza/serializers/TestSerializableSerde.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.serializers
-
-import org.junit.Assert._
-import org.junit.Test
-
-class TestSerializableSerde {
-  @Test
-  def testSerializableSerde {
-    val serde = new SerializableSerde[String]
-    assertNull(serde.toBytes(null))
-    assertNull(serde.fromBytes(null))
-    
-    val obj = "String is serializable"
-
-    // Serialized string is prefix + string itself
-    val prefix = Array(0xAC, 0xED, 0x00, 0x05, 0x74, 0x00, 0x16).map(_.toByte)
-    val expected = (prefix ++ obj.getBytes("UTF-8"))
-    
-    val bytes = serde.toBytes(obj)
-
-    assertArrayEquals(expected, bytes)
-
-    val objRoundTrip:String = serde.fromBytes(bytes)
-    assertEquals(obj, objRoundTrip)
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/test/scala/org/apache/samza/serializers/TestStringSerde.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/serializers/TestStringSerde.scala b/samza-core/src/test/scala/org/apache/samza/serializers/TestStringSerde.scala
deleted file mode 100644
index a1e8e88..0000000
--- a/samza-core/src/test/scala/org/apache/samza/serializers/TestStringSerde.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.serializers
-
-import org.junit.Assert._
-import org.junit.Test
-
-class TestStringSerde {
-  @Test
-  def testStringSerde {
-    val serde = new StringSerde("UTF-8")
-    assertEquals(null, serde.toBytes(null))
-    assertEquals(null, serde.fromBytes(null))
-
-    val fooBar = "foo bar"
-    val fooBarBytes = serde.toBytes(fooBar)
-    assertArrayEquals(fooBar.getBytes("UTF-8"), fooBarBytes)
-    assertEquals(fooBar, serde.fromBytes(fooBarBytes))
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/test/scala/org/apache/samza/serializers/TestUUIDSerde.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/serializers/TestUUIDSerde.scala b/samza-core/src/test/scala/org/apache/samza/serializers/TestUUIDSerde.scala
deleted file mode 100644
index 04ddcdb..0000000
--- a/samza-core/src/test/scala/org/apache/samza/serializers/TestUUIDSerde.scala
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.serializers
-
-import java.nio.BufferUnderflowException
-import java.util.UUID
-
-import org.junit.Assert._
-import org.junit.Test
-
-class TestUUIDSerde {
-  private val serde = new UUIDSerde
-
-  @Test
-  def testUUIDSerde {
-    val uuid = new UUID(13, 42)
-    val bytes = serde.toBytes(uuid)
-    assertArrayEquals(Array[Byte](0, 0, 0, 0, 0, 0, 0, 13, 0, 0, 0, 0, 0, 0, 0, 42), bytes)
-    assertEquals(uuid, serde.fromBytes(bytes))
-  }
-
-  @Test
-  def testToBytesWhenNull {
-    assertEquals(null, serde.toBytes(null))
-  }
-
-  @Test
-  def testFromBytesWhenNull {
-    assertEquals(null, serde.fromBytes(null))
-  }
-
-  @Test(expected = classOf[BufferUnderflowException])
-  def testFromBytesWhenInvalid {
-    assertEquals(null, serde.fromBytes(Array[Byte](0)))
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java
----------------------------------------------------------------------
diff --git a/samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java b/samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java
index 0f0d792..5824489 100644
--- a/samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java
+++ b/samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java
@@ -77,7 +77,7 @@ public class Log4jSystemConfig extends JavaSystemConfig {
    *         supplied serde name.
    */
   public String getSerdeClass(String name) {
-    return get(String.format(SerializerConfig.SERDE(), name), null);
+    return get(String.format(SerializerConfig.SERDE_FACTORY_CLASS(), name), null);
   }
 
   public String getStreamSerdeName(String systemName, String streamName) {

http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
----------------------------------------------------------------------
diff --git a/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
index e442599..8436835 100644
--- a/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
+++ b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
@@ -246,7 +246,7 @@ public class StreamAppender extends AppenderSkeleton {
       SerdeFactory<LoggingEvent> serdeFactory = Util.<SerdeFactory<LoggingEvent>>getObj(serdeClass);
       serde = serdeFactory.getSerde(systemName, config);
     } else {
-      String serdeKey = String.format(SerializerConfig.SERDE(), serdeName);
+      String serdeKey = String.format(SerializerConfig.SERDE_FACTORY_CLASS(), serdeName);
       throw new SamzaException("Can not find serializers class for key '" + serdeName + "'. Please specify " +
           serdeKey + " property");
     }

http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-test/src/main/resources/log4j.xml
----------------------------------------------------------------------
diff --git a/samza-test/src/main/resources/log4j.xml b/samza-test/src/main/resources/log4j.xml
index 7b4fb82..ab74ebd 100644
--- a/samza-test/src/main/resources/log4j.xml
+++ b/samza-test/src/main/resources/log4j.xml
@@ -12,43 +12,39 @@
 <!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
 
 <log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
-  <appender name="RollingAppender" class="org.apache.log4j.RollingFileAppender">
-    <param name="File" value="${samza.log.dir}/${samza.container.name}.log" />
-    <param name="MaxFileSize" value="256MB" />
-    <param name="MaxBackupIndex" value="20" />
-    <layout class="org.apache.log4j.PatternLayout">
-        <param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %c{1} [%p] %m%n" />
-    </layout>
-  </appender>
-
-  <appender name="StartupAppender" class="org.apache.log4j.RollingFileAppender">
-    <param name="File" value="${samza.log.dir}/${samza.container.name}-startup.log" />
-    <param name="MaxFileSize" value="256MB" />
-    <param name="MaxBackupIndex" value="1" />
-    <layout class="org.apache.log4j.PatternLayout">
-      <param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %c{1} [%p] %m%n" />
-    </layout>
-  </appender>
-
   <appender name="console" class="org.apache.log4j.ConsoleAppender">
     <param name="Target" value="System.out" />
     <layout class="org.apache.log4j.PatternLayout">
-      <param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %c{1} [%p] %m%n" />
+      <param name="ConversionPattern" value="[%t] %c{1} [%p] %m%n" />
     </layout>
   </appender>
 
   <root>
     <priority value="info" />
-    <appender-ref ref="console" /> 
-    <appender-ref ref="RollingAppender" />
+    <appender-ref ref="console" />
   </root>
 
   <logger name="STARTUP_LOGGER" additivity="false">
     <level value="info" />
-    <appender-ref ref="StartupAppender"/>
+    <appender-ref ref="console"/>
   </logger>
 
   <logger name="org.apache.hadoop">
-    <level value="off"/>
+    <level value="ERROR"/>
+  </logger>
+  <logger name="org.I0Itec.zkclient">
+    <level value="ERROR"/>
+  </logger>
+  <logger name="org.apache.zookeeper">
+    <level value="ERROR"/>
+  </logger>
+  <logger name="org.apache.samza.system.kafka">
+    <level value="ERROR"/>
+  </logger>
+  <logger name="org.apache.kafka">
+    <level value="ERROR"/>
+  </logger>
+  <logger name="kafka">
+    <level value="ERROR"/>
   </logger>
-</log4j:configuration>
+</log4j:configuration>
\ No newline at end of file


[3/4] samza git commit: SAMZA-1109; Allow setting Serdes for fluent API operators in code

Posted by ja...@apache.org.
http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/main/java/org/apache/samza/operators/impl/InputOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/InputOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/InputOperatorImpl.java
index 0545af1..9cc5370 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/InputOperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/InputOperatorImpl.java
@@ -18,8 +18,8 @@
  */
 package org.apache.samza.operators.impl;
 
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.samza.config.Config;
+import org.apache.samza.operators.KV;
 import org.apache.samza.operators.spec.InputOperatorSpec;
 import org.apache.samza.operators.spec.OperatorSpec;
 import org.apache.samza.task.MessageCollector;
@@ -35,13 +35,12 @@ import java.util.Collections;
  *
  * @param <K> the type of key in the incoming message
  * @param <V> the type of message in the incoming message
- * @param <M> the type of input message
  */
-public final class InputOperatorImpl<K, V, M> extends OperatorImpl<Pair<K, V>, M> {
+public final class InputOperatorImpl<K, V> extends OperatorImpl<KV<K, V>, Object> { // Object == KV<K,V> | V
 
-  private final InputOperatorSpec<K, V, M> inputOpSpec;
+  private final InputOperatorSpec<K, V> inputOpSpec;
 
-  InputOperatorImpl(InputOperatorSpec<K, V, M> inputOpSpec) {
+  InputOperatorImpl(InputOperatorSpec<K, V> inputOpSpec) {
     this.inputOpSpec = inputOpSpec;
   }
 
@@ -50,9 +49,8 @@ public final class InputOperatorImpl<K, V, M> extends OperatorImpl<Pair<K, V>, M
   }
 
   @Override
-  public Collection<M> handleMessage(Pair<K, V> pair, MessageCollector collector, TaskCoordinator coordinator) {
-    // TODO: SAMZA-1148 - Cast to appropriate input (key, msg) types based on the serde before applying the msgBuilder.
-    M message = this.inputOpSpec.getMsgBuilder().apply(pair.getKey(), pair.getValue());
+  public Collection<Object> handleMessage(KV<K, V> pair, MessageCollector collector, TaskCoordinator coordinator) {
+    Object message = this.inputOpSpec.isKeyedInput() ? pair : pair.getValue();
     return Collections.singletonList(message);
   }
 
@@ -60,7 +58,7 @@ public final class InputOperatorImpl<K, V, M> extends OperatorImpl<Pair<K, V>, M
   protected void handleClose() {
   }
 
-  protected OperatorSpec<Pair<K, V>, M> getOperatorSpec() {
+  protected OperatorSpec<KV<K, V>, Object> getOperatorSpec() {
     return this.inputOpSpec;
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
index 8dd5acd..e353ac4 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
@@ -18,6 +18,7 @@
  */
 package org.apache.samza.operators.impl;
 
+import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MetricsConfig;
 import org.apache.samza.metrics.Counter;
@@ -37,6 +38,9 @@ import java.util.Set;
 
 /**
  * Abstract base class for all stream operator implementations.
+ *
+ * @param <M> type of the input to this operator
+ * @param <RM> type of the results of applying this operator
  */
 public abstract class OperatorImpl<M, RM> {
   private static final String METRICS_GROUP = OperatorImpl.class.getName();
@@ -113,7 +117,19 @@ public abstract class OperatorImpl<M, RM> {
   public final void onMessage(M message, MessageCollector collector, TaskCoordinator coordinator) {
     this.numMessage.inc();
     long startNs = this.highResClock.nanoTime();
-    Collection<RM> results = handleMessage(message, collector, coordinator);
+    Collection<RM> results;
+    try {
+      results = handleMessage(message, collector, coordinator);
+    } catch (ClassCastException e) {
+      String actualType = e.getMessage().replaceFirst(" cannot be cast to .*", "");
+      String expectedType = e.getMessage().replaceFirst(".* cannot be cast to ", "");
+      throw new SamzaException(
+          String.format("Error applying operator %s (created at %s) to its input message. "
+                  + "Expected input message to be of type %s, but found it to be of type %s. "
+                  + "Are Serdes for the inputs to this operator configured correctly?",
+              getOperatorName(), getOperatorSpec().getSourceLocation(), expectedType, actualType), e);
+    }
+
     long endNs = this.highResClock.nanoTime();
     this.handleMessageNs.update(endNs - startNs);
 

http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
index 99496eb..faedfc9 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
@@ -19,8 +19,8 @@
 package org.apache.samza.operators.impl;
 
 import com.google.common.collect.Lists;
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.samza.config.Config;
+import org.apache.samza.operators.KV;
 import org.apache.samza.operators.StreamGraphImpl;
 import org.apache.samza.operators.functions.JoinFunction;
 import org.apache.samza.operators.functions.PartialJoinFunction;
@@ -28,8 +28,9 @@ import org.apache.samza.operators.spec.InputOperatorSpec;
 import org.apache.samza.operators.spec.JoinOperatorSpec;
 import org.apache.samza.operators.spec.OperatorSpec;
 import org.apache.samza.operators.spec.OutputOperatorSpec;
-import org.apache.samza.operators.spec.StreamOperatorSpec;
+import org.apache.samza.operators.spec.PartitionByOperatorSpec;
 import org.apache.samza.operators.spec.SinkOperatorSpec;
+import org.apache.samza.operators.spec.StreamOperatorSpec;
 import org.apache.samza.operators.spec.WindowOperatorSpec;
 import org.apache.samza.operators.util.InternalInMemoryStore;
 import org.apache.samza.storage.kv.KeyValueStore;
@@ -69,7 +70,7 @@ public class OperatorImplGraph {
    * the two {@link PartialJoinOperatorImpl}s for a {@link JoinOperatorSpec} with each other since they're
    * reached from different {@link OperatorSpec} during DAG traversals.
    */
-  private final Map<Integer, Pair<PartialJoinFunction, PartialJoinFunction>> joinFunctions = new HashMap<>();
+  private final Map<Integer, KV<PartialJoinFunction, PartialJoinFunction>> joinFunctions = new HashMap<>();
 
   private final Clock clock;
 
@@ -167,6 +168,8 @@ public class OperatorImplGraph {
       return new SinkOperatorImpl((SinkOperatorSpec) operatorSpec, config, context);
     } else if (operatorSpec instanceof OutputOperatorSpec) {
       return new OutputOperatorImpl((OutputOperatorSpec) operatorSpec, config, context);
+    } else if (operatorSpec instanceof PartitionByOperatorSpec) {
+      return new PartitionByOperatorImpl((PartitionByOperatorSpec) operatorSpec, config, context);
     } else if (operatorSpec instanceof WindowOperatorSpec) {
       return new WindowOperatorImpl((WindowOperatorSpec) operatorSpec, clock);
     } else if (operatorSpec instanceof JoinOperatorSpec) {
@@ -178,19 +181,19 @@ public class OperatorImplGraph {
 
   private PartialJoinOperatorImpl createPartialJoinOperatorImpl(OperatorSpec prevOperatorSpec,
       JoinOperatorSpec joinOpSpec, Config config, TaskContext context, Clock clock) {
-    Pair<PartialJoinFunction, PartialJoinFunction> partialJoinFunctions = getOrCreatePartialJoinFunctions(joinOpSpec);
+    KV<PartialJoinFunction, PartialJoinFunction> partialJoinFunctions = getOrCreatePartialJoinFunctions(joinOpSpec);
     if (joinOpSpec.getLeftInputOpSpec().equals(prevOperatorSpec)) { // we got here from the left side of the join
       return new PartialJoinOperatorImpl(joinOpSpec, /* isLeftSide */ true,
-          partialJoinFunctions.getLeft(), partialJoinFunctions.getRight(), config, context, clock);
+          partialJoinFunctions.getKey(), partialJoinFunctions.getValue(), config, context, clock);
     } else { // we got here from the right side of the join
       return new PartialJoinOperatorImpl(joinOpSpec, /* isLeftSide */ false,
-          partialJoinFunctions.getRight(), partialJoinFunctions.getLeft(), config, context, clock);
+          partialJoinFunctions.getValue(), partialJoinFunctions.getKey(), config, context, clock);
     }
   }
 
-  private Pair<PartialJoinFunction, PartialJoinFunction> getOrCreatePartialJoinFunctions(JoinOperatorSpec joinOpSpec) {
+  private KV<PartialJoinFunction, PartialJoinFunction> getOrCreatePartialJoinFunctions(JoinOperatorSpec joinOpSpec) {
     return joinFunctions.computeIfAbsent(joinOpSpec.getOpId(),
-        joinOpId -> Pair.of(createLeftJoinFn(joinOpSpec.getJoinFn()), createRightJoinFn(joinOpSpec.getJoinFn())));
+        joinOpId -> KV.of(createLeftJoinFn(joinOpSpec.getJoinFn()), createRightJoinFn(joinOpSpec.getJoinFn())));
   }
 
   private PartialJoinFunction<Object, Object, Object, Object> createLeftJoinFn(JoinFunction joinFn) {

http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/main/java/org/apache/samza/operators/impl/OutputOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OutputOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OutputOperatorImpl.java
index fe59b74..7b7e49c 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/OutputOperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OutputOperatorImpl.java
@@ -19,6 +19,7 @@
 package org.apache.samza.operators.impl;
 
 import org.apache.samza.config.Config;
+import org.apache.samza.operators.KV;
 import org.apache.samza.operators.spec.OperatorSpec;
 import org.apache.samza.operators.spec.OutputOperatorSpec;
 import org.apache.samza.operators.spec.OutputStreamImpl;
@@ -38,11 +39,14 @@ import java.util.Collections;
 class OutputOperatorImpl<M> extends OperatorImpl<M, Void> {
 
   private final OutputOperatorSpec<M> outputOpSpec;
-  private final OutputStreamImpl<?, ?, M> outputStream;
+  private final OutputStreamImpl<M> outputStream;
+  private final SystemStream systemStream;
 
   OutputOperatorImpl(OutputOperatorSpec<M> outputOpSpec, Config config, TaskContext context) {
     this.outputOpSpec = outputOpSpec;
     this.outputStream = outputOpSpec.getOutputStream();
+    this.systemStream = new SystemStream(outputStream.getStreamSpec().getSystemName(),
+        outputStream.getStreamSpec().getPhysicalName());
   }
 
   @Override
@@ -52,12 +56,16 @@ class OutputOperatorImpl<M> extends OperatorImpl<M, Void> {
   @Override
   public Collection<Void> handleMessage(M message, MessageCollector collector,
       TaskCoordinator coordinator) {
-    // TODO: SAMZA-1148 - need to find a way to directly pass in the serde class names
-    SystemStream systemStream = new SystemStream(outputStream.getStreamSpec().getSystemName(),
-        outputStream.getStreamSpec().getPhysicalName());
-    Object key = outputStream.getKeyExtractor().apply(message);
-    Object msg = outputStream.getMsgExtractor().apply(message);
-    collector.send(new OutgoingMessageEnvelope(systemStream, key, msg));
+    Object key, value;
+    if (outputStream.isKeyedOutput()) {
+      key = ((KV) message).getKey();
+      value = ((KV) message).getValue();
+    } else {
+      key = null;
+      value = message;
+    }
+
+    collector.send(new OutgoingMessageEnvelope(systemStream, null, key, value));
     return Collections.emptyList();
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/main/java/org/apache/samza/operators/impl/PartitionByOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/PartitionByOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/PartitionByOperatorImpl.java
new file mode 100644
index 0000000..072b31d
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/PartitionByOperatorImpl.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.KV;
+import org.apache.samza.operators.spec.OperatorSpec;
+import org.apache.samza.operators.spec.OutputStreamImpl;
+import org.apache.samza.operators.spec.PartitionByOperatorSpec;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskContext;
+import org.apache.samza.task.TaskCoordinator;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.function.Function;
+
+
+/**
+ * An operator that sends sends messages to an output {@link SystemStream} for repartitioning them.
+ */
+class PartitionByOperatorImpl<M, K, V> extends OperatorImpl<M, Void> {
+
+  private final PartitionByOperatorSpec<M, K, V> partitionByOpSpec;
+  private final SystemStream systemStream;
+  private final Function<? super M, ? extends K> keyFunction;
+  private final Function<? super M, ? extends V> valueFunction;
+
+  PartitionByOperatorImpl(PartitionByOperatorSpec<M, K, V> partitionByOpSpec, Config config, TaskContext context) {
+    this.partitionByOpSpec = partitionByOpSpec;
+    OutputStreamImpl<KV<K, V>> outputStream = partitionByOpSpec.getOutputStream();
+    if (!outputStream.isKeyedOutput()) {
+      throw new SamzaException("Output stream for repartitioning must be a keyed stream.");
+    }
+    this.systemStream = new SystemStream(
+        outputStream.getStreamSpec().getSystemName(),
+        outputStream.getStreamSpec().getPhysicalName());
+    this.keyFunction = partitionByOpSpec.getKeyFunction();
+    this.valueFunction = partitionByOpSpec.getValueFunction();
+  }
+
+  @Override
+  protected void handleInit(Config config, TaskContext context) {
+  }
+
+  @Override
+  public Collection<Void> handleMessage(M message, MessageCollector collector,
+      TaskCoordinator coordinator) {
+    K key = keyFunction.apply(message);
+    V value = valueFunction.apply(message);
+    collector.send(new OutgoingMessageEnvelope(systemStream, null, key, value));
+    return Collections.emptyList();
+  }
+
+  @Override
+  protected void handleClose() {
+  }
+
+  @Override
+  protected OperatorSpec<M, Void> getOperatorSpec() {
+    return partitionByOpSpec;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
index f9485f7..736d71e 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
@@ -137,6 +137,7 @@ public class WindowOperatorImpl<M, WK, WV> extends OperatorImpl<M, WindowPane<WK
 
   @Override
   public Collection<WindowPane<WK, WV>> handleTimer(MessageCollector collector, TaskCoordinator coordinator) {
+    LOG.trace("Processing timer.");
     List<WindowPane<WK, WV>> results = new ArrayList<>();
 
     List<TriggerKey<WK>> keys = triggerScheduler.runPendingCallbacks();
@@ -148,7 +149,7 @@ public class WindowOperatorImpl<M, WK, WV> extends OperatorImpl<M, WindowPane<WK
         maybeTriggeredPane.ifPresent(results::add);
       }
     }
-
+    LOG.trace("Triggered panes: " + results.size());
     return results;
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java
index 6fbc3c1..2749245 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java
@@ -18,35 +18,46 @@
  */
 package org.apache.samza.operators.spec;
 
-import org.apache.commons.lang3.tuple.Pair;
+import org.apache.samza.operators.KV;
+import org.apache.samza.serializers.Serde;
 import org.apache.samza.system.StreamSpec;
 
-import java.util.function.BiFunction;
-
 /**
  * The spec for an operator that receives incoming messages from an input stream
  * and converts them to the input message.
  *
- * @param <K> the type of key in the incoming message
- * @param <V> the type of message in the incoming message
- * @param <M> the type of input message
+ * @param <K> the type of input key
+ * @param <V> the type of input value
  */
-public class InputOperatorSpec<K, V, M> extends OperatorSpec<Pair<K, V>, M> {
+public class InputOperatorSpec<K, V> extends OperatorSpec<KV<K, V>, Object> { // Object == KV<K, V> | V
 
   private final StreamSpec streamSpec;
-  private final BiFunction<K, V, M> msgBuilder;
+  private final Serde<K> keySerde;
+  private final Serde<V> valueSerde;
+  private final boolean isKeyedInput;
 
-  public InputOperatorSpec(StreamSpec streamSpec, BiFunction<K, V, M> msgBuilder, int opId) {
+  public InputOperatorSpec(StreamSpec streamSpec,
+      Serde<K> keySerde, Serde<V> valueSerde, boolean isKeyedInput, int opId) {
     super(OpCode.INPUT, opId);
     this.streamSpec = streamSpec;
-    this.msgBuilder = msgBuilder;
+    this.keySerde = keySerde;
+    this.valueSerde = valueSerde;
+    this.isKeyedInput = isKeyedInput;
   }
 
   public StreamSpec getStreamSpec() {
     return this.streamSpec;
   }
 
-  public BiFunction<K, V, M> getMsgBuilder() {
-    return this.msgBuilder;
+  public Serde<K> getKeySerde() {
+    return keySerde;
+  }
+
+  public Serde<V> getValueSerde() {
+    return valueSerde;
+  }
+
+  public boolean isKeyedInput() {
+    return isKeyedInput;
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
index f64e123..bcb0485 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
@@ -19,6 +19,8 @@
 package org.apache.samza.operators.spec;
 
 import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.MessageStreamImpl;
 
 import java.util.Collection;
 import java.util.LinkedHashSet;
@@ -105,9 +107,23 @@ public abstract class OperatorSpec<M, OM> {
     // [2] SomeOperatorSpec.<init>()
     // [3] OperatorSpecs.createSomeOperatorSpec()
     // [4] MessageStreamImpl.someOperator()
-    // [5] User code that calls [4]
-    // we are interested in [5] here
+    // [5] User/MessageStreamImpl code that calls [4]
+    // We are interested in the first call below this that originates from user code
     StackTraceElement element = this.creationStackTrace[5];
+
+    /**
+     * Sometimes [5] above is a call from MessageStream/MessageStreamImpl itself (e.g. for
+     * {@link org.apache.samza.operators.MessageStream#mergeAll(Collection)} or
+     * {@link MessageStreamImpl#partitionBy(Function, Function)}).
+     * If that's the case, find the first call from a class other than these.
+     */
+    for (int i = 5; i < creationStackTrace.length; i++) {
+      if (!creationStackTrace[i].getClassName().equals(MessageStreamImpl.class.getName())
+          && !creationStackTrace[i].getClassName().equals(MessageStream.class.getName())) {
+        element = creationStackTrace[i];
+        break;
+      }
+    }
     return String.format("%s:%s", element.getFileName(), element.getLineNumber());
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
index ed5fc8f..e67179e 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
@@ -20,6 +20,7 @@
 package org.apache.samza.operators.spec;
 
 import org.apache.samza.config.Config;
+import org.apache.samza.operators.KV;
 import org.apache.samza.operators.functions.FilterFunction;
 import org.apache.samza.operators.functions.FlatMapFunction;
 import org.apache.samza.operators.functions.JoinFunction;
@@ -30,6 +31,7 @@ import org.apache.samza.task.TaskContext;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.function.Function;
 
 
 /**
@@ -140,29 +142,29 @@ public class OperatorSpecs {
    *
    * @param outputStream  the {@link OutputStreamImpl} to send messages to
    * @param opId  the unique ID of the operator
-   * @param <K> the type of key in the outgoing message
-   * @param <V> the type of message in the outgoing message
    * @param <M> the type of message in the {@link OutputStreamImpl}
    * @return  the {@link OutputOperatorSpec} for the sendTo operator
    */
-  public static <K, V, M> OutputOperatorSpec<M> createSendToOperatorSpec(
-      OutputStreamImpl<K, V, M> outputStream, int opId) {
-    return new OutputOperatorSpec<>(outputStream, OperatorSpec.OpCode.SEND_TO, opId);
+  public static <M> OutputOperatorSpec<M> createSendToOperatorSpec(OutputStreamImpl<M> outputStream, int opId) {
+    return new OutputOperatorSpec<>(outputStream, opId);
   }
 
   /**
-   * Creates a {@link OutputOperatorSpec} for the partitionBy operator.
+   * Creates a {@link PartitionByOperatorSpec} for the partitionBy operator.
    *
+   * @param <M> the type of messages being repartitioned
+   * @param <K> the type of key in the repartitioned {@link OutputStreamImpl}
+   * @param <V> the type of value in the repartitioned {@link OutputStreamImpl}
    * @param outputStream  the {@link OutputStreamImpl} to send messages to
+   * @param keyFunction  the {@link MapFunction} for extracting the key from the message
+   * @param valueFunction  the {@link MapFunction} for extracting the value from the message
    * @param opId  the unique ID of the operator
-   * @param <K> the type of key in the outgoing message
-   * @param <V> the type of message in the outgoing message
-   * @param <M> the type of message in the {@link OutputStreamImpl}
    * @return  the {@link OutputOperatorSpec} for the partitionBy operator
    */
-  public static <K, V, M> OutputOperatorSpec<M> createPartitionByOperatorSpec(
-      OutputStreamImpl<K, V, M> outputStream, int opId) {
-    return new OutputOperatorSpec<>(outputStream, OperatorSpec.OpCode.PARTITION_BY, opId);
+  public static <M, K, V> PartitionByOperatorSpec<M, K, V> createPartitionByOperatorSpec(
+      OutputStreamImpl<KV<K, V>> outputStream, Function<? super M, ? extends K> keyFunction,
+      Function<? super M, ? extends V> valueFunction, int opId) {
+    return new PartitionByOperatorSpec<>(outputStream, keyFunction, valueFunction, opId);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/main/java/org/apache/samza/operators/spec/OutputOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/OutputOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/OutputOperatorSpec.java
index e6767ec..fc88634 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/OutputOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/OutputOperatorSpec.java
@@ -29,19 +29,16 @@ package org.apache.samza.operators.spec;
  */
 public class OutputOperatorSpec<M> extends OperatorSpec<M, Void> {
 
-  private OutputStreamImpl<?, ?, M> outputStream;
-
+  private final OutputStreamImpl<M> outputStream;
 
   /**
    * Constructs an {@link OutputOperatorSpec} to send messages to the provided {@code outStream}
    *
    * @param outputStream  the {@link OutputStreamImpl} to send messages to
-   * @param opCode the specific {@link OpCode} for this {@link SinkOperatorSpec}.
-   *               It could be {@link OpCode#SEND_TO}, or {@link OpCode#PARTITION_BY}
    * @param opId  the unique ID of this {@link SinkOperatorSpec} in the graph
    */
-  OutputOperatorSpec(OutputStreamImpl<?, ?, M> outputStream, OperatorSpec.OpCode opCode, int opId) {
-    super(opCode, opId);
+  OutputOperatorSpec(OutputStreamImpl<M> outputStream, int opId) {
+    super(OpCode.SEND_TO, opId);
     this.outputStream = outputStream;
   }
 
@@ -49,7 +46,7 @@ public class OutputOperatorSpec<M> extends OperatorSpec<M, Void> {
    * The {@link OutputStreamImpl} that this operator is sending its output to.
    * @return the {@link OutputStreamImpl} for this operator if any, else null.
    */
-  public OutputStreamImpl<?, ?, M> getOutputStream() {
+  public OutputStreamImpl<M> getOutputStream() {
     return this.outputStream;
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/main/java/org/apache/samza/operators/spec/OutputStreamImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/OutputStreamImpl.java b/samza-core/src/main/java/org/apache/samza/operators/spec/OutputStreamImpl.java
index 5506378..a793e0c 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/OutputStreamImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/OutputStreamImpl.java
@@ -19,32 +19,38 @@
 package org.apache.samza.operators.spec;
 
 import org.apache.samza.operators.OutputStream;
+import org.apache.samza.serializers.Serde;
 import org.apache.samza.system.StreamSpec;
 
-import java.util.function.Function;
 
-public class OutputStreamImpl<K, V, M> implements OutputStream<K, V, M> {
+public class OutputStreamImpl<M> implements OutputStream<M> {
 
   private final StreamSpec streamSpec;
-  private final Function<M, K> keyExtractor;
-  private final Function<M, V> msgExtractor;
+  private final Serde keySerde;
+  private final Serde valueSerde;
+  private final boolean isKeyedOutput;
 
   public OutputStreamImpl(StreamSpec streamSpec,
-      Function<M, K> keyExtractor, Function<M, V> msgExtractor) {
+      Serde keySerde, Serde valueSerde, boolean isKeyedOutput) {
     this.streamSpec = streamSpec;
-    this.keyExtractor = keyExtractor;
-    this.msgExtractor = msgExtractor;
+    this.keySerde = keySerde;
+    this.valueSerde = valueSerde;
+    this.isKeyedOutput = isKeyedOutput;
   }
 
   public StreamSpec getStreamSpec() {
     return streamSpec;
   }
 
-  public Function<M, K> getKeyExtractor() {
-    return keyExtractor;
+  public Serde getKeySerde() {
+    return keySerde;
   }
 
-  public Function<M, V> getMsgExtractor() {
-    return msgExtractor;
+  public Serde getValueSerde() {
+    return valueSerde;
+  }
+
+  public boolean isKeyedOutput() {
+    return isKeyedOutput;
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/main/java/org/apache/samza/operators/spec/PartitionByOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/PartitionByOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/PartitionByOperatorSpec.java
new file mode 100644
index 0000000..a2bb5f2
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/PartitionByOperatorSpec.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.spec;
+
+import org.apache.samza.operators.KV;
+
+import java.util.function.Function;
+
+
+/**
+ * The spec for an operator that re-partitions a {@link org.apache.samza.operators.MessageStream} to a
+ * {@link org.apache.samza.system.SystemStream}. This is usually paired with a corresponding
+ * {@link InputOperatorSpec} that consumes the {@link org.apache.samza.system.SystemStream} again.
+ * <p>
+ * This is a terminal operator and does not allow further operator chaining.
+ *
+ * @param <M> the type of message
+ * @param <K> the type of key in the message
+ * @param <V> the type of value in the message
+ */
+public class PartitionByOperatorSpec<M, K, V> extends OperatorSpec<M, Void> {
+
+  private final OutputStreamImpl<KV<K, V>> outputStream;
+  private final Function<? super M, ? extends K> keyFunction;
+  private final Function<? super M, ? extends V> valueFunction;
+
+  /**
+   * Constructs an {@link PartitionByOperatorSpec} to send messages to the provided {@code outputStream}
+   *
+   * @param outputStream the {@link OutputStreamImpl} to send messages to
+   * @param keyFunction the {@link Function} for extracting the key from the message
+   * @param valueFunction the {@link Function} for extracting the value from the message
+   * @param opId the unique ID of this {@link SinkOperatorSpec} in the graph
+   */
+  PartitionByOperatorSpec(OutputStreamImpl<KV<K, V>> outputStream,
+      Function<? super M, ? extends K> keyFunction,
+      Function<? super M, ? extends V> valueFunction, int opId) {
+    super(OpCode.PARTITION_BY, opId);
+    this.outputStream = outputStream;
+    this.keyFunction = keyFunction;
+    this.valueFunction = valueFunction;
+  }
+
+  /**
+   * The {@link OutputStreamImpl} that this operator is sending its output to.
+   * @return the {@link OutputStreamImpl} for this operator if any, else null.
+   */
+  public OutputStreamImpl<KV<K, V>> getOutputStream() {
+    return this.outputStream;
+  }
+
+  public Function<? super M, ? extends K> getKeyFunction() {
+    return keyFunction;
+  }
+
+  public Function<? super M, ? extends V> getValueFunction() {
+    return valueFunction;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/main/java/org/apache/samza/operators/stream/IntermediateMessageStreamImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/stream/IntermediateMessageStreamImpl.java b/samza-core/src/main/java/org/apache/samza/operators/stream/IntermediateMessageStreamImpl.java
index f0bb1dc..279cdd4 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/stream/IntermediateMessageStreamImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/stream/IntermediateMessageStreamImpl.java
@@ -22,6 +22,7 @@ import org.apache.samza.operators.MessageStreamImpl;
 import org.apache.samza.operators.OutputStream;
 import org.apache.samza.operators.StreamGraphImpl;
 import org.apache.samza.operators.spec.InputOperatorSpec;
+import org.apache.samza.operators.spec.OperatorSpec;
 import org.apache.samza.operators.spec.OutputStreamImpl;
 import org.apache.samza.system.StreamSpec;
 
@@ -30,21 +31,19 @@ import org.apache.samza.system.StreamSpec;
  * <p>
  * This implementation accepts a pair of {@link InputOperatorSpec} and {@link OutputStreamImpl} associated
  * with the same logical {@code streamId}. It provides access to its {@link OutputStreamImpl} for
- * {@link MessageStreamImpl#partitionBy} to send messages out to. It's also a {@link MessageStreamImpl} with
+ * the partitionBy operator to send messages out to. It's also a {@link MessageStreamImpl} with
  * {@link InputOperatorSpec} as its operator spec, so that further operations can be chained on the
  * {@link InputOperatorSpec}.
  *
- * @param <K> the type of key in the outgoing/incoming message
- * @param <V> the type of message in the outgoing/incoming message
  * @param <M> the type of message in the output {@link MessageStreamImpl}
  */
-public class IntermediateMessageStreamImpl<K, V, M> extends MessageStreamImpl<M> implements OutputStream<K, V, M> {
+public class IntermediateMessageStreamImpl<M> extends MessageStreamImpl<M> implements OutputStream<M> {
 
-  private final OutputStreamImpl<K, V, M> outputStream;
+  private final OutputStreamImpl<M> outputStream;
 
-  public IntermediateMessageStreamImpl(StreamGraphImpl graph, InputOperatorSpec<K, V, M> inputOperatorSpec,
-      OutputStreamImpl<K, V, M> outputStream) {
-    super(graph, inputOperatorSpec);
+  public IntermediateMessageStreamImpl(StreamGraphImpl graph, InputOperatorSpec<?, M> inputOperatorSpec,
+      OutputStreamImpl<M> outputStream) {
+    super(graph, (OperatorSpec<?, M>) inputOperatorSpec);
     this.outputStream = outputStream;
   }
 
@@ -52,7 +51,7 @@ public class IntermediateMessageStreamImpl<K, V, M> extends MessageStreamImpl<M>
     return this.outputStream.getStreamSpec();
   }
 
-  public OutputStreamImpl<K, V, M> getOutputStream() {
+  public OutputStreamImpl<M> getOutputStream() {
     return this.outputStream;
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/main/java/org/apache/samza/serializers/IntermediateMessageSerde.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/serializers/IntermediateMessageSerde.java b/samza-core/src/main/java/org/apache/samza/serializers/IntermediateMessageSerde.java
index 26ef92c..45ce9aa 100644
--- a/samza-core/src/main/java/org/apache/samza/serializers/IntermediateMessageSerde.java
+++ b/samza-core/src/main/java/org/apache/samza/serializers/IntermediateMessageSerde.java
@@ -20,11 +20,13 @@
 package org.apache.samza.serializers;
 
 import java.util.Arrays;
+
 import org.apache.samza.SamzaException;
 import org.apache.samza.message.EndOfStreamMessage;
 import org.apache.samza.message.MessageType;
 import org.apache.samza.message.WatermarkMessage;
-import org.codehaus.jackson.type.TypeReference;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
@@ -49,28 +51,7 @@ import org.codehaus.jackson.type.TypeReference;
  * For control message, we use json serde.
  */
 public class IntermediateMessageSerde implements Serde<Object> {
-
-  private static final class WatermarkSerde extends JsonSerde<WatermarkMessage> {
-    @Override
-    public WatermarkMessage fromBytes(byte[] bytes) {
-      try {
-        return mapper().readValue(new String(bytes, "UTF-8"), new TypeReference<WatermarkMessage>() { });
-      } catch (Exception e) {
-        throw new SamzaException(e);
-      }
-    }
-  }
-
-  private static final class EndOfStreamSerde extends JsonSerde<EndOfStreamMessage> {
-    @Override
-    public EndOfStreamMessage fromBytes(byte[] bytes) {
-      try {
-        return mapper().readValue(new String(bytes, "UTF-8"), new TypeReference<EndOfStreamMessage>() { });
-      } catch (Exception e) {
-        throw new SamzaException(e);
-      }
-    }
-  }
+  private static final Logger LOGGER = LoggerFactory.getLogger(IntermediateMessageSerde.class);
 
   private final Serde userMessageSerde;
   private final Serde<WatermarkMessage> watermarkSerde;
@@ -78,8 +59,8 @@ public class IntermediateMessageSerde implements Serde<Object> {
 
   public IntermediateMessageSerde(Serde userMessageSerde) {
     this.userMessageSerde = userMessageSerde;
-    this.watermarkSerde = new WatermarkSerde();
-    this.eosSerde = new EndOfStreamSerde();
+    this.watermarkSerde = new JsonSerdeV2<>(WatermarkMessage.class);
+    this.eosSerde = new JsonSerdeV2<>(EndOfStreamMessage.class);
   }
 
   @Override
@@ -110,7 +91,13 @@ public class IntermediateMessageSerde implements Serde<Object> {
       // 1) the first byte is not a valid type so it will cause ArrayOutOfBound exception
       // 2) the first byte happens to be a valid type, but the deserialization fails with certain exception
       // For these cases, we fall back to user-provided serde
-      return userMessageSerde.fromBytes(bytes);
+      try {
+        return userMessageSerde.fromBytes(bytes);
+      } catch (Exception umse) {
+        LOGGER.error("Error deserializing from both intermediate message serde and user message serde. "
+            + "Original exception: ", e);
+        throw umse;
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java b/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
index a77ef3b..d7c2742 100644
--- a/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
+++ b/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
@@ -18,10 +18,10 @@
  */
 package org.apache.samza.task;
 
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.Config;
 import org.apache.samza.operators.ContextManager;
+import org.apache.samza.operators.KV;
 import org.apache.samza.operators.StreamGraphImpl;
 import org.apache.samza.operators.impl.InputOperatorImpl;
 import org.apache.samza.operators.impl.OperatorImplGraph;
@@ -105,7 +105,7 @@ public final class StreamOperatorTask implements StreamTask, InitableTask, Windo
     SystemStream systemStream = ime.getSystemStreamPartition().getSystemStream();
     InputOperatorImpl inputOpImpl = operatorImplGraph.getInputOperator(systemStream);
     if (inputOpImpl != null) {
-      inputOpImpl.onMessage(Pair.of(ime.getKey(), ime.getMessage()), collector, coordinator);
+      inputOpImpl.onMessage(KV.of(ime.getKey(), ime.getMessage()), collector, coordinator);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/main/scala/org/apache/samza/config/SerializerConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/SerializerConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/SerializerConfig.scala
index 020e3bc..764f77a 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/SerializerConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/SerializerConfig.scala
@@ -23,13 +23,15 @@ import scala.collection.JavaConverters._
 object SerializerConfig {
   // serializer config constants
   val SERIALIZER_PREFIX = "serializers.registry.%s"
-  val SERDE = "serializers.registry.%s.class"
+  val SERDE_FACTORY_CLASS = "serializers.registry.%s.class"
+  val SERIALIZED_INSTANCE_SUFFIX = ".samza.serialized.instance"
+  val SERDE_SERIALIZED_INSTANCE = SERIALIZER_PREFIX + SERIALIZED_INSTANCE_SUFFIX
 
   implicit def Config2Serializer(config: Config) = new SerializerConfig(config)
 }
 
 class SerializerConfig(config: Config) extends ScalaMapConfig(config) {
-  def getSerdeClass(name: String) = getOption(SerializerConfig.SERDE format name)
+  def getSerdeClass(name: String) = getOption(SerializerConfig.SERDE_FACTORY_CLASS format name)
 
   /**
    * Returns a list of all serializer names from the config file. Useful for

http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 628d7f6..45e8e10 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -24,9 +24,8 @@ import java.nio.file.Path
 import java.util
 import java.util.concurrent.{ExecutorService, Executors, TimeUnit}
 import java.net.{URL, UnknownHostException}
+import java.util.Base64
 
-import org.apache.samza.serializers.IntermediateMessageSerde
-import org.apache.samza.serializers.StringSerde
 import org.apache.samza.{SamzaContainerStatus, SamzaException}
 import org.apache.samza.checkpoint.{CheckpointListener, CheckpointManagerFactory, OffsetManager, OffsetManagerMetrics}
 import org.apache.samza.config.JobConfig.Config2Job
@@ -50,8 +49,13 @@ import org.apache.samza.metrics.JmxServer
 import org.apache.samza.metrics.JvmMetrics
 import org.apache.samza.metrics.MetricsRegistryMap
 import org.apache.samza.metrics.MetricsReporter
+import org.apache.samza.serializers.IntermediateMessageSerde
+import org.apache.samza.serializers.NoOpSerde
+import org.apache.samza.serializers.SerializableSerde
+import org.apache.samza.serializers.Serde
 import org.apache.samza.serializers.SerdeFactory
 import org.apache.samza.serializers.SerdeManager
+import org.apache.samza.serializers.StringSerde
 import org.apache.samza.serializers.model.SamzaObjectMapper
 import org.apache.samza.storage.StorageEngineFactory
 import org.apache.samza.storage.TaskStorageManager
@@ -168,10 +172,6 @@ object SamzaContainer extends Logging {
 
     info("Got serde streams: %s" format serdeStreams)
 
-    val serdeNames = config.getSerdeNames
-
-    info("Got serde names: %s" format serdeNames)
-
     val systemFactories = systemNames.map(systemName => {
       val systemFactoryClassName = config
         .getSystemFactory(systemName)
@@ -222,7 +222,7 @@ object SamzaContainer extends Logging {
 
     info("Got system producers: %s" format producers.keys)
 
-    val serdes = serdeNames.map(serdeName => {
+    val serdesFromFactories = config.getSerdeNames.map(serdeName => {
       val serdeClassName = config
         .getSerdeClass(serdeName)
         .getOrElse(Util.defaultSerdeFactoryFromSerdeName(serdeName))
@@ -232,8 +232,28 @@ object SamzaContainer extends Logging {
 
       (serdeName, serde)
     }).toMap
+    info("Got serdes from factories: %s" format serdesFromFactories.keys)
+
+    val serializableSerde = new SerializableSerde[Serde[Object]]()
+    val serdesFromSerializedInstances = config.subset(SerializerConfig.SERIALIZER_PREFIX format "").asScala
+        .filter { case (key, value) => key.endsWith(SerializerConfig.SERIALIZED_INSTANCE_SUFFIX) }
+        .flatMap { case (key, value) =>
+          val serdeName = key.replace(SerializerConfig.SERIALIZED_INSTANCE_SUFFIX, "")
+          debug(s"Trying to deserialize serde instance for $serdeName")
+          try {
+            val bytes = Base64.getDecoder.decode(value)
+            val serdeInstance = serializableSerde.fromBytes(bytes)
+            debug(s"Returning serialized instance for $serdeName")
+            Some((serdeName, serdeInstance))
+          } catch {
+            case e: Exception =>
+              warn(s"Ignoring invalid serialized instance for $serdeName: $value", e)
+              None
+          }
+        }
+    info("Got serdes from serialized instances: %s" format serdesFromSerializedInstances.keys)
 
-    info("Got serdes: %s" format serdes.keys)
+    val serdes = serdesFromFactories ++ serdesFromSerializedInstances
 
     /*
      * A Helper function to build a Map[String, Serde] (systemName -> Serde) for systems defined
@@ -242,11 +262,16 @@ object SamzaContainer extends Logging {
     val buildSystemSerdeMap = (getSerdeName: (String) => Option[String]) => {
       systemNames
         .filter(systemName => getSerdeName(systemName).isDefined)
-              .map(systemName => {
+        .flatMap(systemName => {
           val serdeName = getSerdeName(systemName).get
           val serde = serdes.getOrElse(serdeName,
             throw new SamzaException("buildSystemSerdeMap: No class defined for serde: %s." format serdeName))
-          (systemName, serde)
+
+          // this shouldn't happen since system level serdes can't be set programmatically using the high level
+          // API, but adding this for safety.
+          Option(serde)
+            .filter(!_.isInstanceOf[NoOpSerde[Any]])
+            .map(serde => (systemName, serde))
         }).toMap
     }
 
@@ -257,11 +282,15 @@ object SamzaContainer extends Logging {
     val buildSystemStreamSerdeMap = (getSerdeName: (SystemStream) => Option[String]) => {
       (serdeStreams ++ inputSystemStreamPartitions)
         .filter(systemStream => getSerdeName(systemStream).isDefined)
-        .map(systemStream => {
+        .flatMap(systemStream => {
           val serdeName = getSerdeName(systemStream).get
           val serde = serdes.getOrElse(serdeName,
-            throw new SamzaException("buildSystemStreamSerdeMap: No class defined for serde: %s." format serdeName))
-          (systemStream, serde)
+            throw new SamzaException("buildSystemStreamSerdeMap: No serde found for name: %s." format serdeName))
+
+          // respect explicitly set no-op serdes in high level API
+          Option(serde)
+            .filter(!_.isInstanceOf[NoOpSerde[Any]])
+            .map(serde => (systemStream, serde))
         }).toMap
     }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/main/scala/org/apache/samza/serializers/ByteBufferSerde.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/serializers/ByteBufferSerde.scala b/samza-core/src/main/scala/org/apache/samza/serializers/ByteBufferSerde.scala
deleted file mode 100644
index adb8781..0000000
--- a/samza-core/src/main/scala/org/apache/samza/serializers/ByteBufferSerde.scala
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.serializers
-
-import org.apache.samza.config.Config
-import java.nio.ByteBuffer
-
-/**
- * A serializer for ByteBuffers.
- */
-class ByteBufferSerdeFactory extends SerdeFactory[ByteBuffer] {
-  def getSerde(name: String, config: Config): Serde[ByteBuffer] = new ByteBufferSerde
-}
-
-class ByteBufferSerde extends Serde[ByteBuffer] {
-  def toBytes(byteBuffer: ByteBuffer) = {
-    if (byteBuffer != null) {
-      val bytes = new Array[Byte](byteBuffer.remaining())
-      byteBuffer.duplicate().get(bytes)
-      bytes
-    } else {
-      null
-    }
-  }
-
-  def fromBytes(bytes: Array[Byte]) = if (bytes != null) {
-    ByteBuffer.wrap(bytes)
-  } else {
-    null
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/main/scala/org/apache/samza/serializers/ByteSerde.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/serializers/ByteSerde.scala b/samza-core/src/main/scala/org/apache/samza/serializers/ByteSerde.scala
deleted file mode 100644
index 968da26..0000000
--- a/samza-core/src/main/scala/org/apache/samza/serializers/ByteSerde.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.serializers
-
-import org.apache.samza.config.Config
-
-/**
- * A serializer for bytes that is effectively a no-op but can be useful for 
- * binary messages.
- */
-class ByteSerdeFactory extends SerdeFactory[Array[Byte]] {
-  def getSerde(name: String, config: Config): Serde[Array[Byte]] = new ByteSerde
-}
-
-class ByteSerde extends Serde[Array[Byte]] {
-  def toBytes(bytes: Array[Byte]) = bytes
-
-  def fromBytes(bytes: Array[Byte]) = bytes
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/main/scala/org/apache/samza/serializers/DoubleSerde.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/serializers/DoubleSerde.scala b/samza-core/src/main/scala/org/apache/samza/serializers/DoubleSerde.scala
deleted file mode 100644
index 7981d2c..0000000
--- a/samza-core/src/main/scala/org/apache/samza/serializers/DoubleSerde.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.serializers
-
-import java.nio.ByteBuffer
-import org.apache.samza.config.Config
-
-/**
- * A serializer for doubles
- */
-class DoubleSerdeFactory extends SerdeFactory[java.lang.Double] {
-  def getSerde(name: String, config: Config): Serde[java.lang.Double] = new DoubleSerde
-}
-
-class DoubleSerde extends Serde[java.lang.Double] {
-  def toBytes(obj: java.lang.Double): Array[Byte] = if (obj != null) {
-    ByteBuffer.allocate(8).putDouble(obj.doubleValue()).array
-  } else {
-    null
-  }
-
-  // big-endian by default
-  def fromBytes(bytes: Array[Byte]): java.lang.Double = if (bytes != null) {
-    ByteBuffer.wrap(bytes).getDouble
-  } else {
-    null
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/main/scala/org/apache/samza/serializers/IntegerSerde.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/serializers/IntegerSerde.scala b/samza-core/src/main/scala/org/apache/samza/serializers/IntegerSerde.scala
deleted file mode 100644
index 46509f7..0000000
--- a/samza-core/src/main/scala/org/apache/samza/serializers/IntegerSerde.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.serializers
-
-import java.nio.ByteBuffer
-import org.apache.samza.config.Config
-
-/**
- * A serializer for integers
- */
-class IntegerSerdeFactory extends SerdeFactory[java.lang.Integer] {
-  def getSerde(name: String, config: Config): Serde[java.lang.Integer] = new IntegerSerde
-}
-
-class IntegerSerde extends Serde[java.lang.Integer] {
-  def toBytes(obj: java.lang.Integer): Array[Byte] = if (obj != null) {
-    ByteBuffer.allocate(4).putInt(obj.intValue).array
-  } else {
-    null
-  }
-
-  // big-endian by default
-  def fromBytes(bytes: Array[Byte]): java.lang.Integer = if (bytes != null) {
-    ByteBuffer.wrap(bytes).getInt
-  } else {
-    null
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/main/scala/org/apache/samza/serializers/JsonSerde.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/serializers/JsonSerde.scala b/samza-core/src/main/scala/org/apache/samza/serializers/JsonSerde.scala
index 4b658c1..5ea0f0d 100644
--- a/samza-core/src/main/scala/org/apache/samza/serializers/JsonSerde.scala
+++ b/samza-core/src/main/scala/org/apache/samza/serializers/JsonSerde.scala
@@ -20,31 +20,45 @@
 package org.apache.samza.serializers
 
 import org.apache.samza.SamzaException
+import org.apache.samza.config.Config
 import org.apache.samza.serializers.model.SamzaObjectMapper
 import org.codehaus.jackson.`type`.TypeReference
-import org.codehaus.jackson.map.ObjectMapper
-import org.apache.samza.config.Config
+import org.slf4j.LoggerFactory
+
 
+/**
+  * A serializer for JSON strings that
+  * <ol>
+  *   <li>
+  *     returns a LinkedHashMap<String, Object> upon deserialization.
+  *   <li>
+  *     enforces the 'dash-separated' property naming convention.
+  * </ol>
+  * JsonSerdeV2 should be preferred over JsonSerde unless JsonSerde was already being used and backwards
+  * compatibility for data with the dasherized name format is required.
+  */
 class JsonSerde[T] extends Serde[T] {
-  val mapper = SamzaObjectMapper.getObjectMapper()
+  private val LOG = LoggerFactory.getLogger(classOf[JsonSerde[T]])
+  @transient lazy private val mapper = SamzaObjectMapper.getObjectMapper
 
   def toBytes(obj: T): Array[Byte] = {
     try {
       mapper.writeValueAsString(obj).getBytes("UTF-8")
-    }
-    catch {
+    } catch {
       case e: Exception => throw new SamzaException(e);
     }
   }
 
   def fromBytes(bytes: Array[Byte]): T = {
+    val str = new String(bytes, "UTF-8")
      try {
-         mapper.readValue(new String(bytes, "UTF-8"), new TypeReference[T]() {})}
-     catch {
-       case e: Exception => throw new SamzaException(e);
+       mapper.readValue(str, new TypeReference[T]() {})
+     } catch {
+       case e: Exception =>
+         LOG.debug(s"Error deserializing message: $str", e)
+         throw new SamzaException(e)
      }
   }
-
 }
 
 class JsonSerdeFactory extends SerdeFactory[Object] {

http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/main/scala/org/apache/samza/serializers/LongSerde.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/serializers/LongSerde.scala b/samza-core/src/main/scala/org/apache/samza/serializers/LongSerde.scala
deleted file mode 100644
index 41ff598..0000000
--- a/samza-core/src/main/scala/org/apache/samza/serializers/LongSerde.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.serializers
-
-import java.nio.ByteBuffer
-import org.apache.samza.config.Config
-
-/**
- * A serializer for longs
- */
-class LongSerdeFactory extends SerdeFactory[java.lang.Long] {
-  def getSerde(name: String, config: Config): Serde[java.lang.Long] = new LongSerde
-}
-
-class LongSerde extends Serde[java.lang.Long] {
-  def toBytes(obj: java.lang.Long): Array[Byte] = if (obj != null) {
-    ByteBuffer.allocate(8).putLong(obj.longValue()).array
-  } else {
-    null
-  }
-
-  // big-endian by default
-  def fromBytes(bytes: Array[Byte]): java.lang.Long = if (bytes != null) {
-    ByteBuffer.wrap(bytes).getLong
-  } else {
-    null
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/main/scala/org/apache/samza/serializers/MetricsSnapshotSerde.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/serializers/MetricsSnapshotSerde.scala b/samza-core/src/main/scala/org/apache/samza/serializers/MetricsSnapshotSerde.scala
index 455dd34..8fe3c37 100644
--- a/samza-core/src/main/scala/org/apache/samza/serializers/MetricsSnapshotSerde.scala
+++ b/samza-core/src/main/scala/org/apache/samza/serializers/MetricsSnapshotSerde.scala
@@ -20,12 +20,10 @@
 package org.apache.samza.serializers
 import org.apache.samza.config.Config
 import org.codehaus.jackson.map.ObjectMapper
-import java.util.Map
-import java.nio.ByteBuffer
 import org.apache.samza.metrics.reporter.MetricsSnapshot
 
 class MetricsSnapshotSerde extends Serde[MetricsSnapshot] {
-  val jsonMapper = new ObjectMapper
+  @transient lazy val jsonMapper = new ObjectMapper
 
   def toBytes(obj: MetricsSnapshot) = jsonMapper
     .writeValueAsString(obj.getAsMap)

http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/main/scala/org/apache/samza/serializers/SerializableSerde.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/serializers/SerializableSerde.scala b/samza-core/src/main/scala/org/apache/samza/serializers/SerializableSerde.scala
deleted file mode 100644
index c43f863..0000000
--- a/samza-core/src/main/scala/org/apache/samza/serializers/SerializableSerde.scala
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.serializers
-
-import java.io.ByteArrayInputStream
-import java.io.ByteArrayOutputStream
-import java.io.ObjectInputStream
-import java.io.ObjectOutputStream
-
-import org.apache.samza.config.Config
-
-/**
- * A serializer for Serializable
- */
-class SerializableSerdeFactory[T <: java.io.Serializable] extends SerdeFactory[T] {
-  def getSerde(name: String, config: Config): Serde[T] =
-    new SerializableSerde[T]
-}
-
-class SerializableSerde[T <: java.io.Serializable] extends Serde[T] {
-  def toBytes(obj: T): Array[Byte] = if (obj != null) {
-    val bos = new ByteArrayOutputStream
-    val oos = new ObjectOutputStream(bos)
-
-    try {
-      oos.writeObject(obj)
-    }
-    finally {
-      oos.close()
-    }
-
-    bos.toByteArray
-  } else {
-    null
-  }
-
-  def fromBytes(bytes: Array[Byte]): T = if (bytes != null) {
-    val bis = new ByteArrayInputStream(bytes)
-    val ois = new ObjectInputStream(bis)
-
-    try {
-      ois.readObject.asInstanceOf[T]
-    }
-    finally{
-      ois.close()
-    }
-  } else {
-    null.asInstanceOf[T]
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/main/scala/org/apache/samza/serializers/StringSerde.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/serializers/StringSerde.scala b/samza-core/src/main/scala/org/apache/samza/serializers/StringSerde.scala
deleted file mode 100644
index 6e5f260..0000000
--- a/samza-core/src/main/scala/org/apache/samza/serializers/StringSerde.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.serializers
-
-import org.apache.samza.config.Config
-
-/**
- * A serializer for strings
- */
-class StringSerdeFactory extends SerdeFactory[String] {
-  def getSerde(name: String, config: Config): Serde[String] =
-    new StringSerde(config.get("encoding", "UTF-8"))
-}
-
-class StringSerde(val encoding: String) extends Serde[String] {
-  def toBytes(obj: String): Array[Byte] = if (obj != null) {
-    obj.toString.getBytes(encoding)
-  } else {
-    null
-  }
-
-  def fromBytes(bytes: Array[Byte]): String = if (bytes != null) {
-    new String(bytes, 0, bytes.size, encoding)
-  } else {
-    null
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/main/scala/org/apache/samza/serializers/UUIDSerde.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/serializers/UUIDSerde.scala b/samza-core/src/main/scala/org/apache/samza/serializers/UUIDSerde.scala
deleted file mode 100644
index 88d4327..0000000
--- a/samza-core/src/main/scala/org/apache/samza/serializers/UUIDSerde.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.serializers
-
-import java.nio.ByteBuffer
-import java.util.UUID
-
-import org.apache.samza.config.Config
-
-/**
- * A serializer for UUID
- */
-class UUIDSerdeFactory extends SerdeFactory[UUID] {
-  def getSerde(name: String, config: Config): Serde[UUID] = new UUIDSerde
-}
-
-class UUIDSerde() extends Serde[UUID] {
-  def toBytes(obj: UUID): Array[Byte] = if (obj != null) {
-    ByteBuffer.allocate(16).putLong(obj.getMostSignificantBits).putLong(obj.getLeastSignificantBits).array
-  } else {
-    null
-  }
-
-  def fromBytes(bytes: Array[Byte]): UUID = if (bytes != null) {
-    val buffer = ByteBuffer.wrap(bytes)
-    new UUID(buffer.getLong, buffer.getLong)
-  } else {
-    null
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/test/java/org/apache/samza/example/BroadcastExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/BroadcastExample.java b/samza-core/src/test/java/org/apache/samza/example/BroadcastExample.java
index 73a89af..7061732 100644
--- a/samza-core/src/test/java/org/apache/samza/example/BroadcastExample.java
+++ b/samza-core/src/test/java/org/apache/samza/example/BroadcastExample.java
@@ -21,10 +21,14 @@ package org.apache.samza.example;
 
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.Config;
+import org.apache.samza.operators.KV;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.OutputStream;
 import org.apache.samza.operators.StreamGraph;
 import org.apache.samza.runtime.LocalApplicationRunner;
+import org.apache.samza.serializers.JsonSerdeV2;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.StringSerde;
 import org.apache.samza.util.CommandLine;
 
 
@@ -35,14 +39,12 @@ public class BroadcastExample implements StreamApplication {
 
   @Override
   public void init(StreamGraph graph, Config config) {
-    MessageStream<PageViewEvent> inputStream =
-        graph.getInputStream("inputStream", (k, m) -> (PageViewEvent) m);
-    OutputStream<String, PageViewEvent, PageViewEvent> outputStream1 =
-        graph.getOutputStream("outputStream1", m -> m.key, m -> m);
-    OutputStream<String, PageViewEvent, PageViewEvent> outputStream2 =
-        graph.getOutputStream("outputStream2", m -> m.key, m -> m);
-    OutputStream<String, PageViewEvent, PageViewEvent> outputStream3 =
-        graph.getOutputStream("outputStream3", m -> m.key, m -> m);
+    graph.setDefaultSerde(KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageViewEvent.class)));
+
+    MessageStream<KV<String, PageViewEvent>> inputStream = graph.getInputStream("inputStream");
+    OutputStream<KV<String, PageViewEvent>> outputStream1 = graph.getOutputStream("outputStream1");
+    OutputStream<KV<String, PageViewEvent>> outputStream2 = graph.getOutputStream("outputStream2");
+    OutputStream<KV<String, PageViewEvent>> outputStream3 = graph.getOutputStream("outputStream3");
 
     inputStream.filter(m -> m.key.equals("key1")).sendTo(outputStream1);
     inputStream.filter(m -> m.key.equals("key2")).sendTo(outputStream2);

http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/test/java/org/apache/samza/example/KeyValueStoreExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/KeyValueStoreExample.java b/samza-core/src/test/java/org/apache/samza/example/KeyValueStoreExample.java
index 5be3046..c75608f 100644
--- a/samza-core/src/test/java/org/apache/samza/example/KeyValueStoreExample.java
+++ b/samza-core/src/test/java/org/apache/samza/example/KeyValueStoreExample.java
@@ -21,11 +21,15 @@ package org.apache.samza.example;
 
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.Config;
+import org.apache.samza.operators.KV;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.OutputStream;
 import org.apache.samza.operators.StreamGraph;
 import org.apache.samza.operators.functions.FlatMapFunction;
 import org.apache.samza.runtime.LocalApplicationRunner;
+import org.apache.samza.serializers.JsonSerdeV2;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.StringSerde;
 import org.apache.samza.storage.kv.KeyValueStore;
 import org.apache.samza.task.TaskContext;
 import org.apache.samza.util.CommandLine;
@@ -42,15 +46,19 @@ import java.util.concurrent.TimeUnit;
 public class KeyValueStoreExample implements StreamApplication {
 
   @Override public void init(StreamGraph graph, Config config) {
-    MessageStream<PageViewEvent> pageViewEvents = graph.getInputStream(
-        "pageViewEventStream", (k, v) -> (PageViewEvent) v);
-    OutputStream<String, StatsOutput, StatsOutput> pageViewEventPerMemberStream = graph.getOutputStream(
-        "pageViewEventPerMemberStream", statsOutput -> statsOutput.memberId, statsOutput -> statsOutput);
-
-    pageViewEvents.
-        partitionBy(m -> m.memberId).
-        flatMap(new MyStatsCounter()).
-        sendTo(pageViewEventPerMemberStream);
+    MessageStream<PageViewEvent> pageViewEvents =
+        graph.getInputStream("pageViewEventStream", new JsonSerdeV2<>(PageViewEvent.class));
+    OutputStream<KV<String, StatsOutput>> pageViewEventPerMember =
+        graph.getOutputStream("pageViewEventPerMember",
+            KVSerde.of(new StringSerde(), new JsonSerdeV2<>(StatsOutput.class)));
+
+    pageViewEvents
+        .partitionBy(pve -> pve.memberId, pve -> pve,
+            KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageViewEvent.class)))
+        .map(KV::getValue)
+        .flatMap(new MyStatsCounter())
+        .map(stats -> KV.of(stats.memberId, stats))
+        .sendTo(pageViewEventPerMember);
   }
 
   // local execution mode

http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/test/java/org/apache/samza/example/MergeExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/MergeExample.java b/samza-core/src/test/java/org/apache/samza/example/MergeExample.java
index 9fbf6d1..4702c9a 100644
--- a/samza-core/src/test/java/org/apache/samza/example/MergeExample.java
+++ b/samza-core/src/test/java/org/apache/samza/example/MergeExample.java
@@ -22,23 +22,31 @@ package org.apache.samza.example;
 import com.google.common.collect.ImmutableList;
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.Config;
+import org.apache.samza.operators.KV;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.OutputStream;
 import org.apache.samza.operators.StreamGraph;
 import org.apache.samza.runtime.LocalApplicationRunner;
+import org.apache.samza.serializers.IntegerSerde;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.StringSerde;
 import org.apache.samza.util.CommandLine;
 
 public class MergeExample implements StreamApplication {
 
   @Override
   public void init(StreamGraph graph, Config config) {
-    MessageStream<Object> inputStream1 = graph.getInputStream("inputStream1", (k, m) -> m);
-    MessageStream<Object> inputStream2 = graph.getInputStream("inputStream2", (k, m) -> m);
-    MessageStream<Object> inputStream3 = graph.getInputStream("inputStream3", (k, m) -> m);
-    OutputStream<Integer, Object, Object> outputStream = graph
-        .getOutputStream("outputStream", Object::hashCode, m -> m);
+    graph.setDefaultSerde(new StringSerde());
 
-    MessageStream.mergeAll(ImmutableList.of(inputStream1, inputStream2, inputStream3))
+    MessageStream<String> inputStream1 = graph.getInputStream("inputStream1");
+    MessageStream<String> inputStream2 = graph.getInputStream("inputStream2");
+    MessageStream<String> inputStream3 = graph.getInputStream("inputStream3");
+    OutputStream<KV<Integer, String>> outputStream =
+        graph.getOutputStream("outputStream", KVSerde.of(new IntegerSerde(), new StringSerde()));
+
+    MessageStream
+        .mergeAll(ImmutableList.of(inputStream1, inputStream2, inputStream3))
+        .map(m -> KV.of(m.hashCode(), m))
         .sendTo(outputStream);
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java b/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java
index f65c4ed..95939c4 100644
--- a/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java
+++ b/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java
@@ -20,11 +20,15 @@ package org.apache.samza.example;
 
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.Config;
+import org.apache.samza.operators.KV;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.OutputStream;
 import org.apache.samza.operators.StreamGraph;
 import org.apache.samza.operators.functions.JoinFunction;
 import org.apache.samza.runtime.LocalApplicationRunner;
+import org.apache.samza.serializers.JsonSerdeV2;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.StringSerde;
 import org.apache.samza.util.CommandLine;
 
 import java.time.Duration;
@@ -36,14 +40,18 @@ public class OrderShipmentJoinExample implements StreamApplication {
 
   @Override
   public void init(StreamGraph graph, Config config) {
-    MessageStream<OrderRecord> orders = graph.getInputStream("orderStream", (k, m) -> (OrderRecord) m);
-    MessageStream<ShipmentRecord> shipments = graph.getInputStream("shipmentStream", (k, m) -> (ShipmentRecord) m);
-    OutputStream<String, FulFilledOrderRecord, FulFilledOrderRecord> joinedOrderShipmentStream =
-        graph.getOutputStream("joinedOrderShipmentStream", m -> m.orderId, m -> m);
+    MessageStream<OrderRecord> orders =
+        graph.getInputStream("orders", new JsonSerdeV2<>(OrderRecord.class));
+    MessageStream<ShipmentRecord> shipments =
+        graph.getInputStream("shipments", new JsonSerdeV2<>(ShipmentRecord.class));
+    OutputStream<KV<String, FulfilledOrderRecord>> fulfilledOrders =
+        graph.getOutputStream("fulfilledOrders",
+            KVSerde.of(new StringSerde(), new JsonSerdeV2<>(FulfilledOrderRecord.class)));
 
     orders
         .join(shipments, new MyJoinFunction(), Duration.ofMinutes(1))
-        .sendTo(joinedOrderShipmentStream);
+        .map(fulFilledOrder -> KV.of(fulFilledOrder.orderId, fulFilledOrder))
+        .sendTo(fulfilledOrders);
   }
 
   // local execution mode
@@ -54,10 +62,10 @@ public class OrderShipmentJoinExample implements StreamApplication {
     localRunner.run(new OrderShipmentJoinExample());
   }
 
-  class MyJoinFunction implements JoinFunction<String, OrderRecord, ShipmentRecord, FulFilledOrderRecord> {
+  class MyJoinFunction implements JoinFunction<String, OrderRecord, ShipmentRecord, FulfilledOrderRecord> {
     @Override
-    public FulFilledOrderRecord apply(OrderRecord message, ShipmentRecord otherMessage) {
-      return new FulFilledOrderRecord(message.orderId, message.orderTimeMs, otherMessage.shipTimeMs);
+    public FulfilledOrderRecord apply(OrderRecord message, ShipmentRecord otherMessage) {
+      return new FulfilledOrderRecord(message.orderId, message.orderTimeMs, otherMessage.shipTimeMs);
     }
 
     @Override
@@ -91,12 +99,12 @@ public class OrderShipmentJoinExample implements StreamApplication {
     }
   }
 
-  class FulFilledOrderRecord {
+  class FulfilledOrderRecord {
     String orderId;
     long orderTimeMs;
     long shipTimeMs;
 
-    FulFilledOrderRecord(String orderId, long orderTimeMs, long shipTimeMs) {
+    FulfilledOrderRecord(String orderId, long orderTimeMs, long shipTimeMs) {
       this.orderId = orderId;
       this.orderTimeMs = orderTimeMs;
       this.shipTimeMs = shipTimeMs;

http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java b/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java
index a3471a2..91657ed 100644
--- a/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java
+++ b/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java
@@ -20,6 +20,7 @@ package org.apache.samza.example;
 
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.Config;
+import org.apache.samza.operators.KV;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.OutputStream;
 import org.apache.samza.operators.StreamGraph;
@@ -29,6 +30,9 @@ import org.apache.samza.operators.windows.AccumulationMode;
 import org.apache.samza.operators.windows.WindowPane;
 import org.apache.samza.operators.windows.Windows;
 import org.apache.samza.runtime.LocalApplicationRunner;
+import org.apache.samza.serializers.JsonSerdeV2;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.StringSerde;
 import org.apache.samza.util.CommandLine;
 
 import java.time.Duration;
@@ -42,9 +46,10 @@ public class PageViewCounterExample implements StreamApplication {
 
   @Override public void init(StreamGraph graph, Config config) {
     MessageStream<PageViewEvent> pageViewEvents =
-        graph.getInputStream("pageViewEventStream", (k, m) -> (PageViewEvent) m);
-    OutputStream<String, PageViewCount, PageViewCount> pageViewEventPerMemberStream = graph
-        .getOutputStream("pageViewEventPerMemberStream", m -> m.memberId, m -> m);
+        graph.getInputStream("pageViewEventStream", new JsonSerdeV2<>(PageViewEvent.class));
+    OutputStream<KV<String, PageViewCount>> pageViewEventPerMemberStream =
+        graph.getOutputStream("pageViewEventPerMemberStream",
+            KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageViewCount.class)));
 
     Supplier<Integer> initialValue = () -> 0;
     FoldLeftFunction<PageViewEvent, Integer> foldLeftFn = (m, c) -> c + 1;
@@ -52,7 +57,7 @@ public class PageViewCounterExample implements StreamApplication {
         .window(Windows.keyedTumblingWindow(m -> m.memberId, Duration.ofSeconds(10), initialValue, foldLeftFn)
             .setEarlyTrigger(Triggers.repeat(Triggers.count(5)))
             .setAccumulationMode(AccumulationMode.DISCARDING))
-        .map(PageViewCount::new)
+        .map(windowPane -> KV.of(windowPane.getKey().getKey(), new PageViewCount(windowPane)))
         .sendTo(pageViewEventPerMemberStream);
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java b/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java
index 7bf939b..e9bb284 100644
--- a/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java
+++ b/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java
@@ -20,16 +20,19 @@ package org.apache.samza.example;
 
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.Config;
+import org.apache.samza.operators.KV;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.OutputStream;
 import org.apache.samza.operators.StreamGraph;
 import org.apache.samza.operators.windows.WindowPane;
 import org.apache.samza.operators.windows.Windows;
 import org.apache.samza.runtime.LocalApplicationRunner;
+import org.apache.samza.serializers.JsonSerdeV2;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.StringSerde;
 import org.apache.samza.util.CommandLine;
 
 import java.time.Duration;
-import java.util.function.Supplier;
 
 
 /**
@@ -38,17 +41,18 @@ import java.util.function.Supplier;
 public class RepartitionExample implements StreamApplication {
 
   @Override public void init(StreamGraph graph, Config config) {
-    Supplier<Integer> initialValue = () -> 0;
     MessageStream<PageViewEvent> pageViewEvents =
-        graph.getInputStream("pageViewEventStream", (k, m) -> (PageViewEvent) m);
-    OutputStream<String, MyStreamOutput, MyStreamOutput> pageViewEventPerMemberStream = graph
-        .getOutputStream("pageViewEventPerMemberStream", m -> m.memberId, m -> m);
+        graph.getInputStream("pageViewEvent", new JsonSerdeV2<>(PageViewEvent.class));
+    OutputStream<KV<String, MyStreamOutput>> pageViewEventPerMember =
+        graph.getOutputStream("pageViewEventPerMember",
+            KVSerde.of(new StringSerde(), new JsonSerdeV2<>(MyStreamOutput.class)));
 
     pageViewEvents
-        .partitionBy(m -> m.memberId)
-        .window(Windows.keyedTumblingWindow(m -> m.memberId, Duration.ofMinutes(5), initialValue, (m, c) -> c + 1))
-        .map(MyStreamOutput::new)
-        .sendTo(pageViewEventPerMemberStream);
+        .partitionBy(pve -> pve.memberId, pve -> pve,
+            KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageViewEvent.class)))
+        .window(Windows.keyedTumblingWindow(KV::getKey, Duration.ofMinutes(5), () -> 0, (m, c) -> c + 1))
+        .map(windowPane -> KV.of(windowPane.getKey().getKey(), new MyStreamOutput(windowPane)))
+        .sendTo(pageViewEventPerMember);
   }
 
   // local execution mode

http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/test/java/org/apache/samza/example/WindowExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/WindowExample.java b/samza-core/src/test/java/org/apache/samza/example/WindowExample.java
index 1fd3be5..08c896c 100644
--- a/samza-core/src/test/java/org/apache/samza/example/WindowExample.java
+++ b/samza-core/src/test/java/org/apache/samza/example/WindowExample.java
@@ -29,6 +29,8 @@ import org.apache.samza.operators.triggers.Triggers;
 import org.apache.samza.operators.windows.WindowPane;
 import org.apache.samza.operators.windows.Windows;
 import org.apache.samza.runtime.LocalApplicationRunner;
+import org.apache.samza.serializers.IntegerSerde;
+import org.apache.samza.serializers.JsonSerdeV2;
 import org.apache.samza.util.CommandLine;
 
 import java.time.Duration;
@@ -45,9 +47,8 @@ public class WindowExample implements StreamApplication {
   public void init(StreamGraph graph, Config config) {
     Supplier<Integer> initialValue = () -> 0;
     FoldLeftFunction<PageViewEvent, Integer> counter = (m, c) -> c == null ? 1 : c + 1;
-    MessageStream<PageViewEvent> inputStream = graph.getInputStream("inputStream", (k, m) -> (PageViewEvent) m);
-    OutputStream<String, Integer, WindowPane<Void, Integer>> outputStream = graph
-        .getOutputStream("outputStream", m -> m.getKey().getPaneId(), m -> m.getMessage());
+    MessageStream<PageViewEvent> inputStream = graph.getInputStream("inputStream", new JsonSerdeV2<PageViewEvent>());
+    OutputStream<Integer> outputStream = graph.getOutputStream("outputStream", new IntegerSerde());
 
     // create a tumbling window that outputs the number of message collected every 10 minutes.
     // also emit early results if either the number of messages collected reaches 30000, or if no new messages arrive
@@ -55,6 +56,7 @@ public class WindowExample implements StreamApplication {
     inputStream
         .window(Windows.tumblingWindow(Duration.ofMinutes(10), initialValue, counter)
             .setLateTrigger(Triggers.any(Triggers.count(30000), Triggers.timeSinceLastMessage(Duration.ofMinutes(1)))))
+        .map(WindowPane::getMessage)
         .sendTo(outputStream);
   }