You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2017/09/28 00:08:09 UTC

[2/4] samza git commit: SAMZA-1109; Allow setting Serdes for fluent API operators in code

http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
index 2c8f682..8c75bca 100644
--- a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
@@ -24,6 +24,7 @@ import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.TaskConfig;
+import org.apache.samza.operators.KV;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.OutputStream;
 import org.apache.samza.operators.StreamGraphImpl;
@@ -45,8 +46,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.function.BiFunction;
-import java.util.function.Function;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -122,11 +121,11 @@ public class TestExecutionPlanner {
      *
      */
     StreamGraphImpl streamGraph = new StreamGraphImpl(runner, config);
-    Function mockFn = mock(Function.class);
-    OutputStream<Object, Object, Object> output1 = streamGraph.getOutputStream("output1", mockFn, mockFn);
-    BiFunction mockBuilder = mock(BiFunction.class);
-    streamGraph.getInputStream("input1", mockBuilder)
-        .partitionBy(m -> "yes!!!").map(m -> m)
+    MessageStream<KV<Object, Object>> input1 = streamGraph.getInputStream("input1");
+    OutputStream<KV<Object, Object>> output1 = streamGraph.getOutputStream("output1");
+    input1
+        .partitionBy(m -> m.key, m -> m.value)
+        .map(kv -> kv)
         .sendTo(output1);
     return streamGraph;
   }
@@ -145,13 +144,20 @@ public class TestExecutionPlanner {
      */
 
     StreamGraphImpl streamGraph = new StreamGraphImpl(runner, config);
-    BiFunction msgBuilder = mock(BiFunction.class);
-    MessageStream m1 = streamGraph.getInputStream("input1", msgBuilder).map(m -> m);
-    MessageStream m2 = streamGraph.getInputStream("input2", msgBuilder).partitionBy(m -> "haha").filter(m -> true);
-    MessageStream m3 = streamGraph.getInputStream("input3", msgBuilder).filter(m -> true).partitionBy(m -> "hehe").map(m -> m);
-    Function mockFn = mock(Function.class);
-    OutputStream<Object, Object, Object> output1 = streamGraph.getOutputStream("output1", mockFn, mockFn);
-    OutputStream<Object, Object, Object> output2 = streamGraph.getOutputStream("output2", mockFn, mockFn);
+    MessageStream<KV<Object, Object>> m1 =
+        streamGraph.<KV<Object, Object>>getInputStream("input1")
+            .map(m -> m);
+    MessageStream<KV<Object, Object>> m2 =
+        streamGraph.<KV<Object, Object>>getInputStream("input2")
+            .partitionBy(m -> m.key, m -> m.value)
+            .filter(m -> true);
+    MessageStream<KV<Object, Object>> m3 =
+        streamGraph.<KV<Object, Object>>getInputStream("input3")
+            .filter(m -> true)
+            .partitionBy(m -> m.key, m -> m.value)
+            .map(m -> m);
+    OutputStream<KV<Object, Object>> output1 = streamGraph.getOutputStream("output1");
+    OutputStream<KV<Object, Object>> output2 = streamGraph.getOutputStream("output2");
 
     m1.join(m2, mock(JoinFunction.class), Duration.ofHours(2)).sendTo(output1);
     m3.join(m2, mock(JoinFunction.class), Duration.ofHours(1)).sendTo(output2);
@@ -162,21 +168,28 @@ public class TestExecutionPlanner {
   private StreamGraphImpl createStreamGraphWithJoinAndWindow() {
 
     StreamGraphImpl streamGraph = new StreamGraphImpl(runner, config);
-    BiFunction msgBuilder = mock(BiFunction.class);
-    MessageStream m1 = streamGraph.getInputStream("input1", msgBuilder).map(m -> m);
-    MessageStream m2 = streamGraph.getInputStream("input2", msgBuilder).partitionBy(m -> "haha").filter(m -> true);
-    MessageStream m3 = streamGraph.getInputStream("input3", msgBuilder).filter(m -> true).partitionBy(m -> "hehe").map(m -> m);
-    Function mockFn = mock(Function.class);
-    OutputStream<Object, Object, Object> output1 = streamGraph.getOutputStream("output1", mockFn, mockFn);
-    OutputStream<Object, Object, Object> output2 = streamGraph.getOutputStream("output2", mockFn, mockFn);
+    MessageStream<KV<Object, Object>> m1 =
+        streamGraph.<KV<Object, Object>>getInputStream("input1")
+            .map(m -> m);
+    MessageStream<KV<Object, Object>> m2 =
+        streamGraph.<KV<Object, Object>>getInputStream("input2")
+            .partitionBy(m -> m.key, m -> m.value)
+            .filter(m -> true);
+    MessageStream<KV<Object, Object>> m3 =
+        streamGraph.<KV<Object, Object>>getInputStream("input3")
+            .filter(m -> true)
+            .partitionBy(m -> m.key, m -> m.value)
+            .map(m -> m);
+    OutputStream<KV<Object, Object>> output1 = streamGraph.getOutputStream("output1");
+    OutputStream<KV<Object, Object>> output2 = streamGraph.getOutputStream("output2");
 
     m1.map(m -> m)
         .filter(m->true)
-        .window(Windows.<Object, Object>keyedTumblingWindow(m -> m, Duration.ofMillis(8)));
+        .window(Windows.keyedTumblingWindow(m -> m, Duration.ofMillis(8)));
 
     m2.map(m -> m)
         .filter(m->true)
-        .window(Windows.<Object, Object>keyedTumblingWindow(m -> m, Duration.ofMillis(16)));
+        .window(Windows.keyedTumblingWindow(m -> m, Duration.ofMillis(16)));
 
     m1.join(m2, mock(JoinFunction.class), Duration.ofMillis(1600)).sendTo(output1);
     m3.join(m2, mock(JoinFunction.class), Duration.ofMillis(100)).sendTo(output2);

http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java b/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
index 4bda86b..095e407 100644
--- a/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
@@ -19,25 +19,26 @@
 
 package org.apache.samza.execution;
 
-import java.time.Duration;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.function.BiFunction;
-import java.util.function.Function;
-
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MapConfig;
+import org.apache.samza.operators.KV;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.OutputStream;
 import org.apache.samza.operators.StreamGraphImpl;
 import org.apache.samza.operators.functions.JoinFunction;
 import org.apache.samza.runtime.ApplicationRunner;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.NoOpSerde;
 import org.apache.samza.system.StreamSpec;
 import org.apache.samza.system.SystemAdmin;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.junit.Test;
 
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+
 import static org.apache.samza.execution.TestExecutionPlanner.createSystemAdmin;
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.mock;
@@ -105,13 +106,21 @@ public class TestJobGraphJsonGenerator {
     StreamManager streamManager = new StreamManager(systemAdmins);
 
     StreamGraphImpl streamGraph = new StreamGraphImpl(runner, config);
-    BiFunction mockBuilder = mock(BiFunction.class);
-    MessageStream m1 = streamGraph.getInputStream("input1", mockBuilder).map(m -> m);
-    MessageStream m2 = streamGraph.getInputStream("input2", mockBuilder).partitionBy(m -> "haha").filter(m -> true);
-    MessageStream m3 = streamGraph.getInputStream("input3", mockBuilder).filter(m -> true).partitionBy(m -> "hehe").map(m -> m);
-    Function mockFn = mock(Function.class);
-    OutputStream<Object, Object, Object> outputStream1 = streamGraph.getOutputStream("output1", mockFn, mockFn);
-    OutputStream<Object, Object, Object> outputStream2 = streamGraph.getOutputStream("output2", mockFn, mockFn);
+    streamGraph.setDefaultSerde(KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>()));
+    MessageStream<KV<Object, Object>> m1 =
+        streamGraph.<KV<Object, Object>>getInputStream("input1")
+            .map(m -> m);
+    MessageStream<KV<Object, Object>> m2 =
+        streamGraph.<KV<Object, Object>>getInputStream("input2")
+            .partitionBy(m -> m.key, m -> m.value)
+            .filter(m -> true);
+    MessageStream<KV<Object, Object>> m3 =
+        streamGraph.<KV<Object, Object>>getInputStream("input3")
+            .filter(m -> true)
+            .partitionBy(m -> m.key, m -> m.value)
+            .map(m -> m);
+    OutputStream<KV<Object, Object>> outputStream1 = streamGraph.getOutputStream("output1");
+    OutputStream<KV<Object, Object>> outputStream2 = streamGraph.getOutputStream("output2");
 
     m1.join(m2, mock(JoinFunction.class), Duration.ofHours(2)).sendTo(outputStream1);
     m2.sink((message, collector, coordinator) -> { });

http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/test/java/org/apache/samza/execution/TestJobNode.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestJobNode.java b/samza-core/src/test/java/org/apache/samza/execution/TestJobNode.java
new file mode 100644
index 0000000..c59c0cc
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestJobNode.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.execution;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.SerializerConfig;
+import org.apache.samza.operators.KV;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.StreamGraphImpl;
+import org.apache.samza.runtime.ApplicationRunner;
+import org.apache.samza.serializers.JsonSerdeV2;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.serializers.SerializableSerde;
+import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.system.StreamSpec;
+import org.junit.Test;
+
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+
+public class TestJobNode {
+
+  @Test
+  public void testAddSerdeConfigs() {
+    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+    StreamSpec inputSpec = new StreamSpec("input", "input", "input-system");
+    StreamSpec outputSpec = new StreamSpec("output", "output", "output-system");
+    StreamSpec partitionBySpec = new StreamSpec("null-null-partition_by-1", "partition_by-1", "intermediate-system");
+    doReturn(inputSpec).when(mockRunner).getStreamSpec("input");
+    doReturn(outputSpec).when(mockRunner).getStreamSpec("output");
+    doReturn(partitionBySpec).when(mockRunner).getStreamSpec("null-null-partition_by-1");
+
+    StreamGraphImpl streamGraph = new StreamGraphImpl(mockRunner, mock(Config.class));
+    streamGraph.setDefaultSerde(KVSerde.of(new StringSerde(), new JsonSerdeV2<>()));
+    MessageStream<KV<String, Object>> input = streamGraph.getInputStream("input");
+    OutputStream<KV<String, Object>> output = streamGraph.getOutputStream("output");
+    input.partitionBy(KV::getKey, KV::getValue).sendTo(output);
+
+    JobNode jobNode = new JobNode("jobName", "jobId", streamGraph, mock(Config.class));
+    StreamEdge inputEdge = new StreamEdge(inputSpec);
+    StreamEdge outputEdge = new StreamEdge(outputSpec);
+    StreamEdge repartitionEdge = new StreamEdge(partitionBySpec, true);
+    jobNode.addInEdge(inputEdge);
+    jobNode.addOutEdge(outputEdge);
+    jobNode.addInEdge(repartitionEdge);
+    jobNode.addOutEdge(repartitionEdge);
+
+    Map<String, String> configs = new HashMap<>();
+    jobNode.addSerdeConfigs(configs);
+
+    MapConfig mapConfig = new MapConfig(configs);
+    Config serializers = mapConfig.subset("serializers.registry.", true);
+
+    // make sure that the serializers deserialize correctly
+    SerializableSerde<Serde> serializableSerde = new SerializableSerde<>();
+    Map<String, Serde> deserializedSerdes = serializers.entrySet().stream().collect(Collectors.toMap(
+        e -> e.getKey().replace(SerializerConfig.SERIALIZED_INSTANCE_SUFFIX(), ""),
+        e -> serializableSerde.fromBytes(Base64.getDecoder().decode(e.getValue().getBytes()))
+    ));
+    assertEquals(2, serializers.size());
+
+    String inputKeySerde = mapConfig.get("streams.input.samza.key.serde");
+    String inputMsgSerde = mapConfig.get("streams.input.samza.msg.serde");
+    assertTrue(deserializedSerdes.containsKey(inputKeySerde));
+    assertTrue(inputKeySerde.startsWith(StringSerde.class.getSimpleName()));
+    assertTrue(deserializedSerdes.containsKey(inputMsgSerde));
+    assertTrue(inputMsgSerde.startsWith(JsonSerdeV2.class.getSimpleName()));
+
+    String outputKeySerde = mapConfig.get("streams.output.samza.key.serde");
+    String outputMsgSerde = mapConfig.get("streams.output.samza.msg.serde");
+    assertTrue(deserializedSerdes.containsKey(outputKeySerde));
+    assertTrue(outputKeySerde.startsWith(StringSerde.class.getSimpleName()));
+    assertTrue(deserializedSerdes.containsKey(outputMsgSerde));
+    assertTrue(outputMsgSerde.startsWith(JsonSerdeV2.class.getSimpleName()));
+
+    String partitionByKeySerde = mapConfig.get("streams.null-null-partition_by-1.samza.key.serde");
+    String partitionByMsgSerde = mapConfig.get("streams.null-null-partition_by-1.samza.msg.serde");
+    assertTrue(deserializedSerdes.containsKey(partitionByKeySerde));
+    assertTrue(partitionByKeySerde.startsWith(StringSerde.class.getSimpleName()));
+    assertTrue(deserializedSerdes.containsKey(partitionByMsgSerde));
+    assertTrue(partitionByMsgSerde.startsWith(JsonSerdeV2.class.getSimpleName()));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java b/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
index c51b1ea..004c5cf 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
@@ -25,6 +25,8 @@ import org.apache.samza.config.Config;
 import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.operators.functions.JoinFunction;
 import org.apache.samza.runtime.ApplicationRunner;
+import org.apache.samza.serializers.IntegerSerde;
+import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.system.OutgoingMessageEnvelope;
 import org.apache.samza.system.StreamSpec;
@@ -276,10 +278,11 @@ public class TestJoinOperator {
 
     @Override
     public void init(StreamGraph graph, Config config) {
+      KVSerde<Integer, Integer> kvSerde = KVSerde.of(new IntegerSerde(), new IntegerSerde());
       MessageStream<FirstStreamIME> inStream =
-          graph.getInputStream("instream", FirstStreamIME::new);
+          graph.getInputStream("instream", kvSerde).map(FirstStreamIME::new);
       MessageStream<SecondStreamIME> inStream2 =
-          graph.getInputStream("instream2", SecondStreamIME::new);
+          graph.getInputStream("instream2", kvSerde).map(SecondStreamIME::new);
 
       SystemStream outputSystemStream = new SystemStream("outputSystem", "outputStream");
       inStream
@@ -330,14 +333,24 @@ public class TestJoinOperator {
   }
 
   private static class FirstStreamIME extends IncomingMessageEnvelope {
+    FirstStreamIME(KV<Integer, Integer> message) {
+      super(new SystemStreamPartition(
+          "insystem", "instream", new Partition(0)), "1", message.getKey(), message.getValue());
+    }
+
     FirstStreamIME(Integer key, Integer message) {
-      super(new SystemStreamPartition("insystem", "instream", new Partition(0)), "1", key, message);
+      this(KV.of(key, message));
     }
   }
 
   private static class SecondStreamIME extends IncomingMessageEnvelope {
+    SecondStreamIME(KV<Integer, Integer> message) {
+      super(new SystemStreamPartition(
+          "insystem2", "instream2", new Partition(0)), "1", message.getKey(), message.getValue());
+    }
+
     SecondStreamIME(Integer key, Integer message) {
-      super(new SystemStreamPartition("insystem2", "instream2", new Partition(0)), "1", key, message);
+      this(KV.of(key, message));
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java b/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
index 61224f2..c6554bc 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
@@ -32,6 +32,7 @@ import org.apache.samza.operators.spec.OperatorSpec;
 import org.apache.samza.operators.spec.OperatorSpec.OpCode;
 import org.apache.samza.operators.spec.OutputOperatorSpec;
 import org.apache.samza.operators.spec.OutputStreamImpl;
+import org.apache.samza.operators.spec.PartitionByOperatorSpec;
 import org.apache.samza.operators.spec.SinkOperatorSpec;
 import org.apache.samza.operators.spec.StreamOperatorSpec;
 import org.apache.samza.operators.spec.WindowOperatorSpec;
@@ -39,20 +40,19 @@ import org.apache.samza.operators.stream.IntermediateMessageStreamImpl;
 import org.apache.samza.operators.windows.Window;
 import org.apache.samza.operators.windows.WindowPane;
 import org.apache.samza.operators.windows.Windows;
+import org.apache.samza.serializers.KVSerde;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 
 import java.time.Duration;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.function.BiFunction;
 import java.util.function.Function;
 import java.util.function.Supplier;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyObject;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
@@ -172,9 +172,8 @@ public class TestMessageStreamImpl {
     StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
     OperatorSpec mockOpSpec = mock(OperatorSpec.class);
     MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph, mockOpSpec);
-
-    OutputStreamImpl mockOutputOpSpec = mock(OutputStreamImpl.class);
-    inputStream.sendTo(mockOutputOpSpec);
+    OutputStreamImpl<TestMessageEnvelope> mockOutputStreamImpl = mock(OutputStreamImpl.class);
+    inputStream.sendTo(mockOutputStreamImpl);
 
     ArgumentCaptor<OperatorSpec> registeredOpCaptor = ArgumentCaptor.forClass(OperatorSpec.class);
     verify(mockOpSpec).registerNextOperatorSpec(registeredOpCaptor.capture());
@@ -182,33 +181,75 @@ public class TestMessageStreamImpl {
 
     assertTrue(registeredOpSpec instanceof OutputOperatorSpec);
     assertEquals(OpCode.SEND_TO, registeredOpSpec.getOpCode());
-    assertEquals(mockOutputOpSpec, ((OutputOperatorSpec) registeredOpSpec).getOutputStream());
+    assertEquals(mockOutputStreamImpl, ((OutputOperatorSpec) registeredOpSpec).getOutputStream());
+
+    // same behavior as above so nothing new to assert. but ensures that this variant compiles.
+    MessageStreamImpl<KV<String, TestMessageEnvelope>> keyedInputStream = new MessageStreamImpl<>(mockGraph, mockOpSpec);
+    OutputStreamImpl<KV<String, TestMessageEnvelope>> mockKeyedOutputStreamImpl = mock(OutputStreamImpl.class);
+    keyedInputStream.sendTo(mockKeyedOutputStreamImpl);
+
+    // can't unit test it, but the following variants should not compile
+//    inputStream.sendTo(mockKeyedOutputStreamImpl);
+//    keyedInputStream.sendTo(mockOutputStreamImpl);
   }
 
   @Test
-  public void testPartitionBy() {
+  public void testRepartition() {
     StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
     OperatorSpec mockOpSpec = mock(OperatorSpec.class);
 
     String streamName = String.format("%s-%s", OperatorSpec.OpCode.PARTITION_BY.name().toLowerCase(), 0);
-    Function<TestMessageEnvelope, String> mockKeyFn = mock(Function.class);
-    OutputStreamImpl mockOutputOpSpec = mock(OutputStreamImpl.class);
+    OutputStreamImpl mockOutputStreamImpl = mock(OutputStreamImpl.class);
+    KVSerde mockKVSerde = mock(KVSerde.class);
     IntermediateMessageStreamImpl mockIntermediateStream = mock(IntermediateMessageStreamImpl.class);
-    when(mockGraph.getIntermediateStream(eq(streamName), eq(mockKeyFn), any(Function.class), any(BiFunction.class)))
+    when(mockGraph.getIntermediateStream(eq(streamName), eq(mockKVSerde)))
         .thenReturn(mockIntermediateStream);
     when(mockIntermediateStream.getOutputStream())
-        .thenReturn(mockOutputOpSpec);
+        .thenReturn(mockOutputStreamImpl);
 
     MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph, mockOpSpec);
-    inputStream.partitionBy(mockKeyFn);
+    Function mockKeyFunction = mock(Function.class);
+    Function mockValueFunction = mock(Function.class);
+    inputStream.partitionBy(mockKeyFunction, mockValueFunction, mockKVSerde);
 
     ArgumentCaptor<OperatorSpec> registeredOpCaptor = ArgumentCaptor.forClass(OperatorSpec.class);
     verify(mockOpSpec).registerNextOperatorSpec(registeredOpCaptor.capture());
     OperatorSpec<?, TestMessageEnvelope> registeredOpSpec = registeredOpCaptor.getValue();
 
-    assertTrue(registeredOpSpec instanceof OutputOperatorSpec);
+    assertTrue(registeredOpSpec instanceof PartitionByOperatorSpec);
+    assertEquals(OpCode.PARTITION_BY, registeredOpSpec.getOpCode());
+    assertEquals(mockOutputStreamImpl, ((PartitionByOperatorSpec) registeredOpSpec).getOutputStream());
+    assertEquals(mockKeyFunction, ((PartitionByOperatorSpec) registeredOpSpec).getKeyFunction());
+    assertEquals(mockValueFunction, ((PartitionByOperatorSpec) registeredOpSpec).getValueFunction());
+  }
+
+  @Test
+  public void testRepartitionWithoutSerde() {
+    StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+    OperatorSpec mockOpSpec = mock(OperatorSpec.class);
+
+    String streamName = String.format("%s-%s", OperatorSpec.OpCode.PARTITION_BY.name().toLowerCase(), 0);
+    OutputStreamImpl mockOutputStreamImpl = mock(OutputStreamImpl.class);
+    IntermediateMessageStreamImpl mockIntermediateStream = mock(IntermediateMessageStreamImpl.class);
+    when(mockGraph.getIntermediateStream(eq(streamName), eq(null)))
+        .thenReturn(mockIntermediateStream);
+    when(mockIntermediateStream.getOutputStream())
+        .thenReturn(mockOutputStreamImpl);
+
+    MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph, mockOpSpec);
+    Function mockKeyFunction = mock(Function.class);
+    Function mockValueFunction = mock(Function.class);
+    inputStream.partitionBy(mockKeyFunction, mockValueFunction);
+
+    ArgumentCaptor<OperatorSpec> registeredOpCaptor = ArgumentCaptor.forClass(OperatorSpec.class);
+    verify(mockOpSpec).registerNextOperatorSpec(registeredOpCaptor.capture());
+    OperatorSpec<?, TestMessageEnvelope> registeredOpSpec = registeredOpCaptor.getValue();
+
+    assertTrue(registeredOpSpec instanceof PartitionByOperatorSpec);
     assertEquals(OpCode.PARTITION_BY, registeredOpSpec.getOpCode());
-    assertEquals(mockOutputOpSpec, ((OutputOperatorSpec) registeredOpSpec).getOutputStream());
+    assertEquals(mockOutputStreamImpl, ((PartitionByOperatorSpec) registeredOpSpec).getOutputStream());
+    assertEquals(mockKeyFunction, ((PartitionByOperatorSpec) registeredOpSpec).getKeyFunction());
+    assertEquals(mockValueFunction, ((PartitionByOperatorSpec) registeredOpSpec).getValueFunction());
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java b/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java
index 1fc60bd..45583c2 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java
@@ -27,36 +27,136 @@ import org.apache.samza.operators.spec.OperatorSpec.OpCode;
 import org.apache.samza.operators.spec.OutputStreamImpl;
 import org.apache.samza.operators.stream.IntermediateMessageStreamImpl;
 import org.apache.samza.runtime.ApplicationRunner;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.NoOpSerde;
+import org.apache.samza.serializers.Serde;
 import org.apache.samza.system.StreamSpec;
 import org.junit.Test;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.function.BiFunction;
-import java.util.function.Function;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 public class TestStreamGraphImpl {
 
   @Test
-  public void testGetInputStream() {
+  public void testGetInputStreamWithValueSerde() {
     ApplicationRunner mockRunner = mock(ApplicationRunner.class);
     StreamSpec mockStreamSpec = mock(StreamSpec.class);
     when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
     StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
-    BiFunction<String, String, TestMessageEnvelope> mockMsgBuilder = mock(BiFunction.class);
 
-    MessageStream<TestMessageEnvelope> inputStream = graph.getInputStream("test-stream-1", mockMsgBuilder);
+    Serde mockValueSerde = mock(Serde.class);
+    MessageStream<TestMessageEnvelope> inputStream = graph.getInputStream("test-stream-1", mockValueSerde);
 
-    InputOperatorSpec<String, String, TestMessageEnvelope> inputOpSpec =
+    InputOperatorSpec<String, TestMessageEnvelope> inputOpSpec =
         (InputOperatorSpec) ((MessageStreamImpl<TestMessageEnvelope>) inputStream).getOperatorSpec();
     assertEquals(OpCode.INPUT, inputOpSpec.getOpCode());
     assertEquals(graph.getInputOperators().get(mockStreamSpec), inputOpSpec);
-    assertEquals(mockMsgBuilder, inputOpSpec.getMsgBuilder());
     assertEquals(mockStreamSpec, inputOpSpec.getStreamSpec());
+    assertTrue(inputOpSpec.getKeySerde() instanceof NoOpSerde);
+    assertEquals(mockValueSerde, inputOpSpec.getValueSerde());
+  }
+
+  @Test
+  public void testGetInputStreamWithKeyValueSerde() {
+    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+    StreamSpec mockStreamSpec = mock(StreamSpec.class);
+    when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
+    StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
+
+    KVSerde mockKVSerde = mock(KVSerde.class);
+    Serde mockKeySerde = mock(Serde.class);
+    Serde mockValueSerde = mock(Serde.class);
+    doReturn(mockKeySerde).when(mockKVSerde).getKeySerde();
+    doReturn(mockValueSerde).when(mockKVSerde).getValueSerde();
+    MessageStream<TestMessageEnvelope> inputStream = graph.getInputStream("test-stream-1", mockKVSerde);
+
+    InputOperatorSpec<String, TestMessageEnvelope> inputOpSpec =
+        (InputOperatorSpec) ((MessageStreamImpl<TestMessageEnvelope>) inputStream).getOperatorSpec();
+    assertEquals(OpCode.INPUT, inputOpSpec.getOpCode());
+    assertEquals(graph.getInputOperators().get(mockStreamSpec), inputOpSpec);
+    assertEquals(mockStreamSpec, inputOpSpec.getStreamSpec());
+    assertEquals(mockKeySerde, inputOpSpec.getKeySerde());
+    assertEquals(mockValueSerde, inputOpSpec.getValueSerde());
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void testGetInputStreamWithNullSerde() {
+    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+    StreamSpec mockStreamSpec = mock(StreamSpec.class);
+    when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
+    StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
+
+    graph.getInputStream("test-stream-1", null);
+  }
+
+  @Test
+  public void testGetInputStreamWithDefaultValueSerde() {
+    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+    StreamSpec mockStreamSpec = mock(StreamSpec.class);
+    when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
+    StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
+
+    Serde mockValueSerde = mock(Serde.class);
+    graph.setDefaultSerde(mockValueSerde);
+    MessageStream<TestMessageEnvelope> inputStream = graph.getInputStream("test-stream-1");
+
+    InputOperatorSpec<String, TestMessageEnvelope> inputOpSpec =
+        (InputOperatorSpec) ((MessageStreamImpl<TestMessageEnvelope>) inputStream).getOperatorSpec();
+    assertEquals(OpCode.INPUT, inputOpSpec.getOpCode());
+    assertEquals(graph.getInputOperators().get(mockStreamSpec), inputOpSpec);
+    assertEquals(mockStreamSpec, inputOpSpec.getStreamSpec());
+    assertTrue(inputOpSpec.getKeySerde() instanceof NoOpSerde);
+    assertEquals(mockValueSerde, inputOpSpec.getValueSerde());
+  }
+
+  @Test
+  public void testGetInputStreamWithDefaultKeyValueSerde() {
+    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+    StreamSpec mockStreamSpec = mock(StreamSpec.class);
+    when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
+    StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
+
+    KVSerde mockKVSerde = mock(KVSerde.class);
+    Serde mockKeySerde = mock(Serde.class);
+    Serde mockValueSerde = mock(Serde.class);
+    doReturn(mockKeySerde).when(mockKVSerde).getKeySerde();
+    doReturn(mockValueSerde).when(mockKVSerde).getValueSerde();
+    graph.setDefaultSerde(mockKVSerde);
+    MessageStream<TestMessageEnvelope> inputStream = graph.getInputStream("test-stream-1");
+
+    InputOperatorSpec<String, TestMessageEnvelope> inputOpSpec =
+        (InputOperatorSpec) ((MessageStreamImpl<TestMessageEnvelope>) inputStream).getOperatorSpec();
+    assertEquals(OpCode.INPUT, inputOpSpec.getOpCode());
+    assertEquals(graph.getInputOperators().get(mockStreamSpec), inputOpSpec);
+    assertEquals(mockStreamSpec, inputOpSpec.getStreamSpec());
+    assertEquals(mockKeySerde, inputOpSpec.getKeySerde());
+    assertEquals(mockValueSerde, inputOpSpec.getValueSerde());
+  }
+
+  @Test
+  public void testGetInputStreamWithDefaultDefaultSerde() {
+    // default default serde == user hasn't provided a default serde
+    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+    StreamSpec mockStreamSpec = mock(StreamSpec.class);
+    when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
+    StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
+
+    MessageStream<TestMessageEnvelope> inputStream = graph.getInputStream("test-stream-1");
+
+    InputOperatorSpec<String, TestMessageEnvelope> inputOpSpec =
+        (InputOperatorSpec) ((MessageStreamImpl<TestMessageEnvelope>) inputStream).getOperatorSpec();
+    assertEquals(OpCode.INPUT, inputOpSpec.getOpCode());
+    assertEquals(graph.getInputOperators().get(mockStreamSpec), inputOpSpec);
+    assertEquals(mockStreamSpec, inputOpSpec.getStreamSpec());
+    assertTrue(inputOpSpec.getKeySerde() instanceof NoOpSerde);
+    assertTrue(inputOpSpec.getValueSerde() instanceof NoOpSerde);
   }
 
   @Test
@@ -65,15 +165,13 @@ public class TestStreamGraphImpl {
     StreamSpec mockStreamSpec = mock(StreamSpec.class);
     when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
     StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
-    BiFunction<String, String, TestMessageEnvelope> mockMsgBuilder = mock(BiFunction.class);
 
-    MessageStream<TestMessageEnvelope> inputStream = graph.getInputStream("test-stream-1", mockMsgBuilder);
+    MessageStream<TestMessageEnvelope> inputStream = graph.getInputStream("test-stream-1");
 
-    InputOperatorSpec<String, String, TestMessageEnvelope> inputOpSpec =
+    InputOperatorSpec<String, TestMessageEnvelope> inputOpSpec =
         (InputOperatorSpec) ((MessageStreamImpl<TestMessageEnvelope>) inputStream).getOperatorSpec();
     assertEquals(OpCode.INPUT, inputOpSpec.getOpCode());
     assertEquals(graph.getInputOperators().get(mockStreamSpec), inputOpSpec);
-    assertEquals(mockMsgBuilder, inputOpSpec.getMsgBuilder());
     assertEquals(mockStreamSpec, inputOpSpec.getStreamSpec());
   }
 
@@ -86,12 +184,12 @@ public class TestStreamGraphImpl {
     when(mockRunner.getStreamSpec("test-stream-2")).thenReturn(mockStreamSpec2);
 
     StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
-    MessageStream<Object> inputStream1 = graph.getInputStream("test-stream-1", mock(BiFunction.class));
-    MessageStream<Object> inputStream2 = graph.getInputStream("test-stream-2", mock(BiFunction.class));
+    MessageStream<Object> inputStream1 = graph.getInputStream("test-stream-1");
+    MessageStream<Object> inputStream2 = graph.getInputStream("test-stream-2");
 
-    InputOperatorSpec<String, String, TestMessageEnvelope> inputOpSpec1 =
+    InputOperatorSpec<String, TestMessageEnvelope> inputOpSpec1 =
         (InputOperatorSpec) ((MessageStreamImpl<Object>) inputStream1).getOperatorSpec();
-    InputOperatorSpec<String, String, TestMessageEnvelope> inputOpSpec2 =
+    InputOperatorSpec<String, TestMessageEnvelope> inputOpSpec2 =
         (InputOperatorSpec) ((MessageStreamImpl<Object>) inputStream2).getOperatorSpec();
 
     assertEquals(graph.getInputOperators().size(), 2);
@@ -105,29 +203,149 @@ public class TestStreamGraphImpl {
     when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mock(StreamSpec.class));
 
     StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
-    graph.getInputStream("test-stream-1", mock(BiFunction.class));
-    graph.getInputStream("test-stream-1", mock(BiFunction.class)); // should throw exception
+    graph.getInputStream("test-stream-1");
+    // should throw exception
+    graph.getInputStream("test-stream-1");
+  }
+
+  @Test
+  public void testGetOutputStreamWithValueSerde() {
+    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+    StreamSpec mockStreamSpec = mock(StreamSpec.class);
+    when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
+
+    StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
+
+    Serde mockValueSerde = mock(Serde.class);
+    OutputStream<TestMessageEnvelope> outputStream =
+        graph.getOutputStream("test-stream-1", mockValueSerde);
+
+    OutputStreamImpl<TestMessageEnvelope> outputStreamImpl = (OutputStreamImpl) outputStream;
+    assertEquals(graph.getOutputStreams().get(mockStreamSpec), outputStreamImpl);
+    assertEquals(mockStreamSpec, outputStreamImpl.getStreamSpec());
+    assertTrue(outputStreamImpl.getKeySerde() instanceof NoOpSerde);
+    assertEquals(mockValueSerde, outputStreamImpl.getValueSerde());
+  }
+
+  @Test
+  public void testGetOutputStreamWithKeyValueSerde() {
+    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+    StreamSpec mockStreamSpec = mock(StreamSpec.class);
+    when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
+
+    StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
+    KVSerde mockKVSerde = mock(KVSerde.class);
+    Serde mockKeySerde = mock(Serde.class);
+    Serde mockValueSerde = mock(Serde.class);
+    doReturn(mockKeySerde).when(mockKVSerde).getKeySerde();
+    doReturn(mockValueSerde).when(mockKVSerde).getValueSerde();
+    graph.setDefaultSerde(mockKVSerde);
+    OutputStream<TestMessageEnvelope> outputStream = graph.getOutputStream("test-stream-1", mockKVSerde);
+
+    OutputStreamImpl<TestMessageEnvelope> outputStreamImpl = (OutputStreamImpl) outputStream;
+    assertEquals(graph.getOutputStreams().get(mockStreamSpec), outputStreamImpl);
+    assertEquals(mockStreamSpec, outputStreamImpl.getStreamSpec());
+    assertEquals(mockKeySerde, outputStreamImpl.getKeySerde());
+    assertEquals(mockValueSerde, outputStreamImpl.getValueSerde());
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void testGetOutputStreamWithNullSerde() {
+    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+    StreamSpec mockStreamSpec = mock(StreamSpec.class);
+    when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
+
+    StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
+
+    graph.getOutputStream("test-stream-1", null);
+  }
+
+  @Test
+  public void testGetOutputStreamWithDefaultValueSerde() {
+    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+    StreamSpec mockStreamSpec = mock(StreamSpec.class);
+    when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
+
+    Serde mockValueSerde = mock(Serde.class);
+    StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
+    graph.setDefaultSerde(mockValueSerde);
+    OutputStream<TestMessageEnvelope> outputStream = graph.getOutputStream("test-stream-1");
+
+    OutputStreamImpl<TestMessageEnvelope> outputStreamImpl = (OutputStreamImpl) outputStream;
+    assertEquals(graph.getOutputStreams().get(mockStreamSpec), outputStreamImpl);
+    assertEquals(mockStreamSpec, outputStreamImpl.getStreamSpec());
+    assertTrue(outputStreamImpl.getKeySerde() instanceof NoOpSerde);
+    assertEquals(mockValueSerde, outputStreamImpl.getValueSerde());
+  }
+
+  @Test
+  public void testGetOutputStreamWithDefaultKeyValueSerde() {
+    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+    StreamSpec mockStreamSpec = mock(StreamSpec.class);
+    when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
+
+    StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
+    KVSerde mockKVSerde = mock(KVSerde.class);
+    Serde mockKeySerde = mock(Serde.class);
+    Serde mockValueSerde = mock(Serde.class);
+    doReturn(mockKeySerde).when(mockKVSerde).getKeySerde();
+    doReturn(mockValueSerde).when(mockKVSerde).getValueSerde();
+    graph.setDefaultSerde(mockKVSerde);
+
+    OutputStream<TestMessageEnvelope> outputStream = graph.getOutputStream("test-stream-1");
+
+    OutputStreamImpl<TestMessageEnvelope> outputStreamImpl = (OutputStreamImpl) outputStream;
+    assertEquals(graph.getOutputStreams().get(mockStreamSpec), outputStreamImpl);
+    assertEquals(mockStreamSpec, outputStreamImpl.getStreamSpec());
+    assertEquals(mockKeySerde, outputStreamImpl.getKeySerde());
+    assertEquals(mockValueSerde, outputStreamImpl.getValueSerde());
   }
 
   @Test
-  public void testGetOutputStream() {
+  public void testGetOutputStreamWithDefaultDefaultSerde() {
     ApplicationRunner mockRunner = mock(ApplicationRunner.class);
     StreamSpec mockStreamSpec = mock(StreamSpec.class);
     when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
 
+    StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
+
+    OutputStream<TestMessageEnvelope> outputStream = graph.getOutputStream("test-stream-1");
+
+    OutputStreamImpl<TestMessageEnvelope> outputStreamImpl = (OutputStreamImpl) outputStream;
+    assertEquals(graph.getOutputStreams().get(mockStreamSpec), outputStreamImpl);
+    assertEquals(mockStreamSpec, outputStreamImpl.getStreamSpec());
+    assertTrue(outputStreamImpl.getKeySerde() instanceof NoOpSerde);
+    assertTrue(outputStreamImpl.getValueSerde() instanceof NoOpSerde);
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testSetDefaultSerdeAfterGettingStreams() {
+    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+    when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mock(StreamSpec.class));
+
+    StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
+    graph.getInputStream("test-stream-1");
+    graph.setDefaultSerde(mock(Serde.class)); // should throw exception
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testSetDefaultSerdeAfterGettingOutputStream() {
+    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+    when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mock(StreamSpec.class));
 
     StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
-    Function<TestMessageEnvelope, String> mockKeyExtractor = mock(Function.class);
-    Function<TestMessageEnvelope, String> mockMsgExtractor = mock(Function.class);
+    graph.getOutputStream("test-stream-1");
+    graph.setDefaultSerde(mock(Serde.class)); // should throw exception
+  }
 
-    OutputStream<String, String, TestMessageEnvelope> outputStream =
-        graph.getOutputStream("test-stream-1", mockKeyExtractor, mockMsgExtractor);
+  @Test(expected = IllegalStateException.class)
+  public void testSetDefaultSerdeAfterGettingIntermediateStream() {
+    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+    when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mock(StreamSpec.class));
 
-    OutputStreamImpl<String, String, TestMessageEnvelope> outputOpSpec = (OutputStreamImpl) outputStream;
-    assertEquals(graph.getOutputStreams().get(mockStreamSpec), outputOpSpec);
-    assertEquals(mockKeyExtractor, outputOpSpec.getKeyExtractor());
-    assertEquals(mockMsgExtractor, outputOpSpec.getMsgExtractor());
-    assertEquals(mockStreamSpec, outputOpSpec.getStreamSpec());
+    StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
+    graph.getIntermediateStream("test-stream-1", null);
+    graph.setDefaultSerde(mock(Serde.class)); // should throw exception
   }
 
   @Test(expected = IllegalStateException.class)
@@ -136,12 +354,89 @@ public class TestStreamGraphImpl {
     when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mock(StreamSpec.class));
 
     StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
-    graph.getOutputStream("test-stream-1", mock(Function.class), mock(Function.class));
-    graph.getOutputStream("test-stream-1", mock(Function.class), mock(Function.class)); // should throw exception
+    graph.getOutputStream("test-stream-1");
+    graph.getOutputStream("test-stream-1"); // should throw exception
+  }
+
+  @Test
+  public void testGetIntermediateStreamWithValueSerde() {
+    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+    Config mockConfig = mock(Config.class);
+    StreamSpec mockStreamSpec = mock(StreamSpec.class);
+    when(mockConfig.get(JobConfig.JOB_NAME())).thenReturn("myJob");
+    when(mockConfig.get(JobConfig.JOB_ID(), "1")).thenReturn("i001");
+    when(mockRunner.getStreamSpec("myJob-i001-test-stream-1")).thenReturn(mockStreamSpec);
+
+    StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig);
+
+    Serde mockValueSerde = mock(Serde.class);
+    IntermediateMessageStreamImpl<TestMessageEnvelope> intermediateStreamImpl =
+        graph.getIntermediateStream("test-stream-1", mockValueSerde);
+
+    assertEquals(graph.getInputOperators().get(mockStreamSpec), intermediateStreamImpl.getOperatorSpec());
+    assertEquals(graph.getOutputStreams().get(mockStreamSpec), intermediateStreamImpl.getOutputStream());
+    assertEquals(mockStreamSpec, intermediateStreamImpl.getStreamSpec());
+    assertTrue(intermediateStreamImpl.getOutputStream().getKeySerde() instanceof NoOpSerde);
+    assertEquals(mockValueSerde, intermediateStreamImpl.getOutputStream().getValueSerde());
+    assertTrue(((InputOperatorSpec) intermediateStreamImpl.getOperatorSpec()).getKeySerde() instanceof NoOpSerde);
+    assertEquals(mockValueSerde, ((InputOperatorSpec) intermediateStreamImpl.getOperatorSpec()).getValueSerde());
+  }
+
+  @Test
+  public void testGetIntermediateStreamWithKeyValueSerde() {
+    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+    Config mockConfig = mock(Config.class);
+    StreamSpec mockStreamSpec = mock(StreamSpec.class);
+    when(mockConfig.get(JobConfig.JOB_NAME())).thenReturn("myJob");
+    when(mockConfig.get(JobConfig.JOB_ID(), "1")).thenReturn("i001");
+    when(mockRunner.getStreamSpec("myJob-i001-test-stream-1")).thenReturn(mockStreamSpec);
+
+    StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig);
+
+    KVSerde mockKVSerde = mock(KVSerde.class);
+    Serde mockKeySerde = mock(Serde.class);
+    Serde mockValueSerde = mock(Serde.class);
+    doReturn(mockKeySerde).when(mockKVSerde).getKeySerde();
+    doReturn(mockValueSerde).when(mockKVSerde).getValueSerde();
+    IntermediateMessageStreamImpl<TestMessageEnvelope> intermediateStreamImpl =
+        graph.getIntermediateStream("test-stream-1", mockKVSerde);
+
+    assertEquals(graph.getInputOperators().get(mockStreamSpec), intermediateStreamImpl.getOperatorSpec());
+    assertEquals(graph.getOutputStreams().get(mockStreamSpec), intermediateStreamImpl.getOutputStream());
+    assertEquals(mockStreamSpec, intermediateStreamImpl.getStreamSpec());
+    assertEquals(mockKeySerde, intermediateStreamImpl.getOutputStream().getKeySerde());
+    assertEquals(mockValueSerde, intermediateStreamImpl.getOutputStream().getValueSerde());
+    assertEquals(mockKeySerde, ((InputOperatorSpec) intermediateStreamImpl.getOperatorSpec()).getKeySerde());
+    assertEquals(mockValueSerde, ((InputOperatorSpec) intermediateStreamImpl.getOperatorSpec()).getValueSerde());
+  }
+
+  @Test
+  public void testGetIntermediateStreamWithDefaultValueSerde() {
+    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+    Config mockConfig = mock(Config.class);
+    StreamSpec mockStreamSpec = mock(StreamSpec.class);
+    when(mockConfig.get(JobConfig.JOB_NAME())).thenReturn("myJob");
+    when(mockConfig.get(JobConfig.JOB_ID(), "1")).thenReturn("i001");
+    when(mockRunner.getStreamSpec("myJob-i001-test-stream-1")).thenReturn(mockStreamSpec);
+
+    StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig);
+
+    Serde mockValueSerde = mock(Serde.class);
+    graph.setDefaultSerde(mockValueSerde);
+    IntermediateMessageStreamImpl<TestMessageEnvelope> intermediateStreamImpl =
+        graph.getIntermediateStream("test-stream-1", null);
+
+    assertEquals(graph.getInputOperators().get(mockStreamSpec), intermediateStreamImpl.getOperatorSpec());
+    assertEquals(graph.getOutputStreams().get(mockStreamSpec), intermediateStreamImpl.getOutputStream());
+    assertEquals(mockStreamSpec, intermediateStreamImpl.getStreamSpec());
+    assertTrue(intermediateStreamImpl.getOutputStream().getKeySerde() instanceof NoOpSerde);
+    assertEquals(mockValueSerde, intermediateStreamImpl.getOutputStream().getValueSerde());
+    assertTrue(((InputOperatorSpec) intermediateStreamImpl.getOperatorSpec()).getKeySerde() instanceof NoOpSerde);
+    assertEquals(mockValueSerde, ((InputOperatorSpec) intermediateStreamImpl.getOperatorSpec()).getValueSerde());
   }
 
   @Test
-  public void testGetIntermediateStream() {
+  public void testGetIntermediateStreamWithDefaultKeyValueSerde() {
     ApplicationRunner mockRunner = mock(ApplicationRunner.class);
     Config mockConfig = mock(Config.class);
     StreamSpec mockStreamSpec = mock(StreamSpec.class);
@@ -150,19 +445,45 @@ public class TestStreamGraphImpl {
     when(mockRunner.getStreamSpec("myJob-i001-test-stream-1")).thenReturn(mockStreamSpec);
 
     StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig);
-    Function<TestMessageEnvelope, String> mockKeyExtractor = mock(Function.class);
-    Function<TestMessageEnvelope, String> mockMsgExtractor = mock(Function.class);
-    BiFunction<String, String, TestMessageEnvelope> mockMsgBuilder = mock(BiFunction.class);
 
-    IntermediateMessageStreamImpl<?, ?, TestMessageEnvelope> intermediateStreamImpl =
-        graph.getIntermediateStream("test-stream-1", mockKeyExtractor, mockMsgExtractor, mockMsgBuilder);
+    KVSerde mockKVSerde = mock(KVSerde.class);
+    Serde mockKeySerde = mock(Serde.class);
+    Serde mockValueSerde = mock(Serde.class);
+    doReturn(mockKeySerde).when(mockKVSerde).getKeySerde();
+    doReturn(mockValueSerde).when(mockKVSerde).getValueSerde();
+    graph.setDefaultSerde(mockKVSerde);
+    IntermediateMessageStreamImpl<TestMessageEnvelope> intermediateStreamImpl =
+        graph.getIntermediateStream("test-stream-1", null);
+
+    assertEquals(graph.getInputOperators().get(mockStreamSpec), intermediateStreamImpl.getOperatorSpec());
+    assertEquals(graph.getOutputStreams().get(mockStreamSpec), intermediateStreamImpl.getOutputStream());
+    assertEquals(mockStreamSpec, intermediateStreamImpl.getStreamSpec());
+    assertEquals(mockKeySerde, intermediateStreamImpl.getOutputStream().getKeySerde());
+    assertEquals(mockValueSerde, intermediateStreamImpl.getOutputStream().getValueSerde());
+    assertEquals(mockKeySerde, ((InputOperatorSpec) intermediateStreamImpl.getOperatorSpec()).getKeySerde());
+    assertEquals(mockValueSerde, ((InputOperatorSpec) intermediateStreamImpl.getOperatorSpec()).getValueSerde());
+  }
+
+  @Test
+  public void testGetIntermediateStreamWithDefaultDefaultSerde() {
+    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+    Config mockConfig = mock(Config.class);
+    StreamSpec mockStreamSpec = mock(StreamSpec.class);
+    when(mockConfig.get(JobConfig.JOB_NAME())).thenReturn("myJob");
+    when(mockConfig.get(JobConfig.JOB_ID(), "1")).thenReturn("i001");
+    when(mockRunner.getStreamSpec("myJob-i001-test-stream-1")).thenReturn(mockStreamSpec);
+
+    StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig);
+    IntermediateMessageStreamImpl<TestMessageEnvelope> intermediateStreamImpl =
+        graph.getIntermediateStream("test-stream-1", null);
 
     assertEquals(graph.getInputOperators().get(mockStreamSpec), intermediateStreamImpl.getOperatorSpec());
     assertEquals(graph.getOutputStreams().get(mockStreamSpec), intermediateStreamImpl.getOutputStream());
     assertEquals(mockStreamSpec, intermediateStreamImpl.getStreamSpec());
-    assertEquals(mockKeyExtractor, intermediateStreamImpl.getOutputStream().getKeyExtractor());
-    assertEquals(mockMsgExtractor, intermediateStreamImpl.getOutputStream().getMsgExtractor());
-    assertEquals(mockMsgBuilder, ((InputOperatorSpec) intermediateStreamImpl.getOperatorSpec()).getMsgBuilder());
+    assertTrue(intermediateStreamImpl.getOutputStream().getKeySerde() instanceof NoOpSerde);
+    assertTrue(intermediateStreamImpl.getOutputStream().getValueSerde() instanceof NoOpSerde);
+    assertTrue(((InputOperatorSpec) intermediateStreamImpl.getOperatorSpec()).getKeySerde() instanceof NoOpSerde);
+    assertTrue(((InputOperatorSpec) intermediateStreamImpl.getOperatorSpec()).getValueSerde() instanceof NoOpSerde);
   }
 
   @Test(expected = IllegalStateException.class)
@@ -171,8 +492,8 @@ public class TestStreamGraphImpl {
     when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mock(StreamSpec.class));
 
     StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
-    graph.getIntermediateStream("test-stream-1", mock(Function.class), mock(Function.class), mock(BiFunction.class));
-    graph.getIntermediateStream("test-stream-1", mock(Function.class), mock(Function.class), mock(BiFunction.class));
+    graph.getIntermediateStream("test-stream-1", mock(Serde.class));
+    graph.getIntermediateStream("test-stream-1", mock(Serde.class));
   }
 
   @Test
@@ -199,9 +520,9 @@ public class TestStreamGraphImpl {
     StreamSpec testStreamSpec3 = new StreamSpec("test-stream-3", "physical-stream-3", "test-system");
     when(mockRunner.getStreamSpec("test-stream-3")).thenReturn(testStreamSpec3);
 
-    graph.getInputStream("test-stream-1", (k, v) -> v);
-    graph.getInputStream("test-stream-2", (k, v) -> v);
-    graph.getInputStream("test-stream-3", (k, v) -> v);
+    graph.getInputStream("test-stream-1");
+    graph.getInputStream("test-stream-2");
+    graph.getInputStream("test-stream-3");
 
     List<InputOperatorSpec> inputSpecs = new ArrayList<>(graph.getInputOperators().values());
     Assert.assertEquals(inputSpecs.size(), 3);

http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java b/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java
index ca8a151..ee44cf9 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java
@@ -34,6 +34,8 @@ import org.apache.samza.operators.windows.AccumulationMode;
 import org.apache.samza.operators.windows.WindowPane;
 import org.apache.samza.operators.windows.Windows;
 import org.apache.samza.runtime.ApplicationRunner;
+import org.apache.samza.serializers.IntegerSerde;
+import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.system.OutgoingMessageEnvelope;
 import org.apache.samza.system.StreamSpec;
@@ -389,8 +391,9 @@ public class TestWindowOperator {
 
     @Override
     public void init(StreamGraph graph, Config config) {
-      MessageStream<IntegerEnvelope> inStream = graph.getInputStream("integers",
-          (k, m) -> new IntegerEnvelope((Integer) k));
+      MessageStream<IntegerEnvelope> inStream =
+          graph.getInputStream("integers", KVSerde.of(new IntegerSerde(), new IntegerSerde()))
+              .map(kv -> new IntegerEnvelope(kv.getKey()));
       Function<IntegerEnvelope, Integer> keyFn = m -> (Integer) m.getKey();
       inStream
         .map(m -> m)
@@ -418,8 +421,9 @@ public class TestWindowOperator {
 
     @Override
     public void init(StreamGraph graph, Config config) {
-      MessageStream<IntegerEnvelope> inStream = graph.getInputStream("integers",
-          (k, m) -> new IntegerEnvelope((Integer) k));
+      MessageStream<IntegerEnvelope> inStream =
+          graph.getInputStream("integers", KVSerde.of(new IntegerSerde(), new IntegerSerde()))
+              .map(kv -> new IntegerEnvelope(kv.getKey()));
       Function<IntegerEnvelope, Integer> keyFn = m -> (Integer) m.getKey();
       inStream
           .map(m -> m)
@@ -444,8 +448,9 @@ public class TestWindowOperator {
 
     @Override
     public void init(StreamGraph graph, Config config) {
-      MessageStream<IntegerEnvelope> inStream = graph.getInputStream("integers",
-          (k, m) -> new IntegerEnvelope((Integer) k));
+      MessageStream<IntegerEnvelope> inStream =
+          graph.getInputStream("integers", KVSerde.of(new IntegerSerde(), new IntegerSerde()))
+              .map(kv -> new IntegerEnvelope(kv.getKey()));
       Function<IntegerEnvelope, Integer> keyFn = m -> (Integer) m.getKey();
 
       inStream

http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
index 4505eef..68b4ce0 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
@@ -19,9 +19,9 @@
 
 package org.apache.samza.operators.impl;
 
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.samza.config.Config;
 import org.apache.samza.metrics.MetricsRegistryMap;
+import org.apache.samza.operators.KV;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.OutputStream;
 import org.apache.samza.operators.StreamGraphImpl;
@@ -30,6 +30,10 @@ import org.apache.samza.operators.functions.JoinFunction;
 import org.apache.samza.operators.functions.MapFunction;
 import org.apache.samza.operators.spec.OperatorSpec.OpCode;
 import org.apache.samza.runtime.ApplicationRunner;
+import org.apache.samza.serializers.IntegerSerde;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.NoOpSerde;
+import org.apache.samza.serializers.StringSerde;
 import org.apache.samza.system.StreamSpec;
 import org.apache.samza.system.SystemStream;
 import org.apache.samza.task.MessageCollector;
@@ -43,8 +47,6 @@ import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.function.BiFunction;
-import java.util.function.Function;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotSame;
@@ -73,9 +75,8 @@ public class TestOperatorImplGraph {
     when(mockRunner.getStreamSpec(eq("output"))).thenReturn(mock(StreamSpec.class));
     StreamGraphImpl streamGraph = new StreamGraphImpl(mockRunner, mock(Config.class));
 
-    MessageStream<Object> inputStream = streamGraph.getInputStream("input", mock(BiFunction.class));
-    OutputStream<Object, Object, Object> outputStream =
-        streamGraph.getOutputStream("output", mock(Function.class), mock(Function.class));
+    MessageStream<Object> inputStream = streamGraph.getInputStream("input");
+    OutputStream<Object> outputStream = streamGraph.getOutputStream("output");
 
     inputStream
         .filter(mock(FilterFunction.class))
@@ -104,12 +105,49 @@ public class TestOperatorImplGraph {
   }
 
   @Test
+  public void testPartitionByChain() {
+    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+    when(mockRunner.getStreamSpec(eq("input"))).thenReturn(new StreamSpec("input", "input-stream", "input-system"));
+    when(mockRunner.getStreamSpec(eq("output"))).thenReturn(new StreamSpec("output", "output-stream", "output-system"));
+    when(mockRunner.getStreamSpec(eq("null-null-partition_by-1")))
+        .thenReturn(new StreamSpec("intermediate", "intermediate-stream", "intermediate-system"));
+    StreamGraphImpl streamGraph = new StreamGraphImpl(mockRunner, mock(Config.class));
+    MessageStream<Object> inputStream = streamGraph.getInputStream("input");
+    OutputStream<KV<Integer, String>> outputStream = streamGraph
+        .getOutputStream("output", KVSerde.of(mock(IntegerSerde.class), mock(StringSerde.class)));
+
+    inputStream
+        .partitionBy(Object::hashCode, Object::toString, KVSerde.of(mock(IntegerSerde.class), mock(StringSerde.class)))
+        .sendTo(outputStream);
+
+    TaskContext mockTaskContext = mock(TaskContext.class);
+    when(mockTaskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
+    OperatorImplGraph opImplGraph =
+        new OperatorImplGraph(streamGraph, mock(Config.class), mockTaskContext, mock(Clock.class));
+
+    InputOperatorImpl inputOpImpl = opImplGraph.getInputOperator(new SystemStream("input-system", "input-stream"));
+    assertEquals(1, inputOpImpl.registeredOperators.size());
+
+    OperatorImpl partitionByOpImpl = (PartitionByOperatorImpl) inputOpImpl.registeredOperators.iterator().next();
+    assertEquals(0, partitionByOpImpl.registeredOperators.size()); // is terminal but paired with an input operator
+    assertEquals(OpCode.PARTITION_BY, partitionByOpImpl.getOperatorSpec().getOpCode());
+
+    InputOperatorImpl repartitionedInputOpImpl =
+        opImplGraph.getInputOperator(new SystemStream("intermediate-system", "intermediate-stream"));
+    assertEquals(1, repartitionedInputOpImpl.registeredOperators.size());
+
+    OperatorImpl sendToOpImpl = (OutputOperatorImpl) repartitionedInputOpImpl.registeredOperators.iterator().next();
+    assertEquals(0, sendToOpImpl.registeredOperators.size());
+    assertEquals(OpCode.SEND_TO, sendToOpImpl.getOperatorSpec().getOpCode());
+  }
+
+  @Test
   public void testBroadcastChain() {
     ApplicationRunner mockRunner = mock(ApplicationRunner.class);
     when(mockRunner.getStreamSpec(eq("input"))).thenReturn(new StreamSpec("input", "input-stream", "input-system"));
     StreamGraphImpl streamGraph = new StreamGraphImpl(mockRunner, mock(Config.class));
 
-    MessageStream<Object> inputStream = streamGraph.getInputStream("input", mock(BiFunction.class));
+    MessageStream<Object> inputStream = streamGraph.getInputStream("input");
     inputStream.filter(mock(FilterFunction.class));
     inputStream.map(mock(MapFunction.class));
 
@@ -132,7 +170,7 @@ public class TestOperatorImplGraph {
     when(mockRunner.getStreamSpec(eq("input"))).thenReturn(new StreamSpec("input", "input-stream", "input-system"));
     StreamGraphImpl streamGraph = new StreamGraphImpl(mockRunner, mock(Config.class));
 
-    MessageStream<Object> inputStream = streamGraph.getInputStream("input", mock(BiFunction.class));
+    MessageStream<Object> inputStream = streamGraph.getInputStream("input");
     MessageStream<Object> stream1 = inputStream.filter(mock(FilterFunction.class));
     MessageStream<Object> stream2 = inputStream.map(mock(MapFunction.class));
     MessageStream<Object> mergedStream = stream1.merge(Collections.singleton(stream2));
@@ -156,8 +194,8 @@ public class TestOperatorImplGraph {
     StreamGraphImpl streamGraph = new StreamGraphImpl(mockRunner, mock(Config.class));
 
     JoinFunction mockJoinFunction = mock(JoinFunction.class);
-    MessageStream<Object> inputStream1 = streamGraph.getInputStream("input1", (k, v) -> v);
-    MessageStream<Object> inputStream2 = streamGraph.getInputStream("input2", (k, v) -> v);
+    MessageStream<Object> inputStream1 = streamGraph.getInputStream("input1", new NoOpSerde<>());
+    MessageStream<Object> inputStream2 = streamGraph.getInputStream("input2", new NoOpSerde<>());
     inputStream1.join(inputStream2, mockJoinFunction, Duration.ofHours(1));
 
     TaskContext mockTaskContext = mock(TaskContext.class);
@@ -182,13 +220,13 @@ public class TestOperatorImplGraph {
     // verify that left partial join operator calls getFirstKey
     Object mockLeftMessage = mock(Object.class);
     when(mockJoinFunction.getFirstKey(eq(mockLeftMessage))).thenReturn(joinKey);
-    inputOpImpl1.onMessage(Pair.of("", mockLeftMessage), mock(MessageCollector.class), mock(TaskCoordinator.class));
+    inputOpImpl1.onMessage(KV.of("", mockLeftMessage), mock(MessageCollector.class), mock(TaskCoordinator.class));
     verify(mockJoinFunction, times(1)).getFirstKey(mockLeftMessage);
 
     // verify that right partial join operator calls getSecondKey
     Object mockRightMessage = mock(Object.class);
     when(mockJoinFunction.getSecondKey(eq(mockRightMessage))).thenReturn(joinKey);
-    inputOpImpl2.onMessage(Pair.of("", mockRightMessage), mock(MessageCollector.class), mock(TaskCoordinator.class));
+    inputOpImpl2.onMessage(KV.of("", mockRightMessage), mock(MessageCollector.class), mock(TaskCoordinator.class));
     verify(mockJoinFunction, times(1)).getSecondKey(mockRightMessage);
 
     // verify that the join function apply is called with the correct messages on match
@@ -205,8 +243,8 @@ public class TestOperatorImplGraph {
     when(mockContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
     StreamGraphImpl streamGraph = new StreamGraphImpl(mockRunner, mockConfig);
 
-    MessageStream<Object> inputStream1 = streamGraph.getInputStream("input1", (k, v) -> v);
-    MessageStream<Object> inputStream2 = streamGraph.getInputStream("input2", (k, v) -> v);
+    MessageStream<Object> inputStream1 = streamGraph.getInputStream("input1");
+    MessageStream<Object> inputStream2 = streamGraph.getInputStream("input2");
 
     List<String> initializedOperators = new ArrayList<>();
     List<String> closedOperators = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java
index e183d87..a91c1af 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java
@@ -41,7 +41,7 @@ public class TestStreamOperatorImpl {
 
   @Test
   @SuppressWarnings("unchecked")
-  public void testSimpleOperator() {
+  public void testStreamOperator() {
     StreamOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope> mockOp = mock(StreamOperatorSpec.class);
     FlatMapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> txfmFn = mock(FlatMapFunction.class);
     when(mockOp.getTransformFn()).thenReturn(txfmFn);
@@ -61,7 +61,7 @@ public class TestStreamOperatorImpl {
   }
 
   @Test
-  public void testSimpleOperatorClose() {
+  public void testStreamOperatorClose() {
     StreamOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope> mockOp = mock(StreamOperatorSpec.class);
     FlatMapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> txfmFn = mock(FlatMapFunction.class);
     when(mockOp.getTransformFn()).thenReturn(txfmFn);

http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/test/scala/org/apache/samza/serializers/TestByteBufferSerde.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/serializers/TestByteBufferSerde.scala b/samza-core/src/test/scala/org/apache/samza/serializers/TestByteBufferSerde.scala
deleted file mode 100644
index eddfb0a..0000000
--- a/samza-core/src/test/scala/org/apache/samza/serializers/TestByteBufferSerde.scala
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.serializers
-
-import java.util.Arrays
-import org.junit.Assert._
-import org.junit.Test
-import java.nio.ByteBuffer
-
-class TestByteBufferSerde {
-  @Test
-  def testSerde {
-    val serde = new ByteBufferSerde
-    assertNull(serde.toBytes(null))
-    assertNull(serde.fromBytes(null))
-
-    val bytes = "A lazy way of creating a byte array".getBytes()
-    val byteBuffer = ByteBuffer.wrap(bytes)
-    byteBuffer.mark()
-    assertArrayEquals(serde.toBytes(byteBuffer), bytes)
-    byteBuffer.reset()
-    assertEquals(serde.fromBytes(bytes), byteBuffer)
-  }
-
-  @Test
-  def testSerializationPreservesInput {
-    val serde = new ByteBufferSerde
-    val bytes = "A lazy way of creating a byte array".getBytes()
-    val byteBuffer = ByteBuffer.wrap(bytes)
-    byteBuffer.get() // advance position by 1
-    serde.toBytes(byteBuffer)
-
-    assertEquals(byteBuffer.capacity(), byteBuffer.limit())
-    assertEquals(1, byteBuffer.position())
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/test/scala/org/apache/samza/serializers/TestByteSerde.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/serializers/TestByteSerde.scala b/samza-core/src/test/scala/org/apache/samza/serializers/TestByteSerde.scala
deleted file mode 100644
index f605762..0000000
--- a/samza-core/src/test/scala/org/apache/samza/serializers/TestByteSerde.scala
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.serializers
-
-import java.util.Arrays
-
-import org.junit.Assert._
-import org.junit.Test
-
-class TestByteSerde {
-  @Test
-  def testByteSerde {
-    val serde = new ByteSerde
-    assertNull(serde.toBytes(null))
-    assertNull(serde.fromBytes(null))
-
-    val testBytes = "A lazy way of creating a byte array".getBytes()
-    assertArrayEquals(serde.toBytes(testBytes), testBytes)
-    assertArrayEquals(serde.fromBytes(testBytes), testBytes)
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/test/scala/org/apache/samza/serializers/TestDoubleSerde.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/serializers/TestDoubleSerde.scala b/samza-core/src/test/scala/org/apache/samza/serializers/TestDoubleSerde.scala
deleted file mode 100644
index 60241b5..0000000
--- a/samza-core/src/test/scala/org/apache/samza/serializers/TestDoubleSerde.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.serializers
-
-import java.util.Arrays
-
-import org.junit.Assert._
-import org.junit.Test
-
-class TestDoubleSerde {
-  @Test
-  def testDoubleSerde {
-    val serde = new DoubleSerde
-    assertEquals(null, serde.toBytes(null))
-    assertEquals(null, serde.fromBytes(null))
-
-    val fooBar = 9.156013e-002
-    val fooBarBytes = serde.toBytes(fooBar)
-    fooBarBytes.foreach(System.err.println)
-    assertArrayEquals(Array[Byte](63, -73, 112, 124, 19, -9, -82, -93), fooBarBytes)
-    assertEquals(fooBar, serde.fromBytes(fooBarBytes))
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/test/scala/org/apache/samza/serializers/TestIntegerSerde.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/serializers/TestIntegerSerde.scala b/samza-core/src/test/scala/org/apache/samza/serializers/TestIntegerSerde.scala
deleted file mode 100644
index a3e7e1f..0000000
--- a/samza-core/src/test/scala/org/apache/samza/serializers/TestIntegerSerde.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.serializers
-
-import org.junit.Assert._
-import org.junit.Test
-
-class TestIntegerSerde {
-  @Test
-  def testIntegerSerde {
-    val serde = new IntegerSerde
-    assertEquals(null, serde.toBytes(null))
-    assertEquals(null, serde.fromBytes(null))
-
-    val fooBar = 37
-    val fooBarBytes = serde.toBytes(fooBar)
-    assertArrayEquals(Array[Byte](0, 0, 0, 37), fooBarBytes)
-    assertEquals(fooBar, serde.fromBytes(fooBarBytes))
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/test/scala/org/apache/samza/serializers/TestLongSerde.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/serializers/TestLongSerde.scala b/samza-core/src/test/scala/org/apache/samza/serializers/TestLongSerde.scala
deleted file mode 100644
index 77a7498..0000000
--- a/samza-core/src/test/scala/org/apache/samza/serializers/TestLongSerde.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.serializers
-
-import java.util.Arrays
-
-import org.junit.Assert._
-import org.junit.Test
-
-class TestLongSerde {
-  @Test
-  def testLongSerde {
-    val serde = new LongSerde
-    assertEquals(null, serde.toBytes(null))
-    assertEquals(null, serde.fromBytes(null))
-
-    val fooBar = 1234123412341234L
-    val fooBarBytes = serde.toBytes(fooBar)
-    fooBarBytes.foreach(System.err.println)
-    assertArrayEquals(Array[Byte](0, 4, 98, 109, -65, -102, 1, -14), fooBarBytes)
-    assertEquals(fooBar, serde.fromBytes(fooBarBytes))
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/test/scala/org/apache/samza/serializers/TestSerializableSerde.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/serializers/TestSerializableSerde.scala b/samza-core/src/test/scala/org/apache/samza/serializers/TestSerializableSerde.scala
deleted file mode 100644
index 8e899d0..0000000
--- a/samza-core/src/test/scala/org/apache/samza/serializers/TestSerializableSerde.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.serializers
-
-import org.junit.Assert._
-import org.junit.Test
-
-class TestSerializableSerde {
-  @Test
-  def testSerializableSerde {
-    val serde = new SerializableSerde[String]
-    assertNull(serde.toBytes(null))
-    assertNull(serde.fromBytes(null))
-    
-    val obj = "String is serializable"
-
-    // Serialized string is prefix + string itself
-    val prefix = Array(0xAC, 0xED, 0x00, 0x05, 0x74, 0x00, 0x16).map(_.toByte)
-    val expected = (prefix ++ obj.getBytes("UTF-8"))
-    
-    val bytes = serde.toBytes(obj)
-
-    assertArrayEquals(expected, bytes)
-
-    val objRoundTrip:String = serde.fromBytes(bytes)
-    assertEquals(obj, objRoundTrip)
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/test/scala/org/apache/samza/serializers/TestStringSerde.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/serializers/TestStringSerde.scala b/samza-core/src/test/scala/org/apache/samza/serializers/TestStringSerde.scala
deleted file mode 100644
index a1e8e88..0000000
--- a/samza-core/src/test/scala/org/apache/samza/serializers/TestStringSerde.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.serializers
-
-import org.junit.Assert._
-import org.junit.Test
-
-class TestStringSerde {
-  @Test
-  def testStringSerde {
-    val serde = new StringSerde("UTF-8")
-    assertEquals(null, serde.toBytes(null))
-    assertEquals(null, serde.fromBytes(null))
-
-    val fooBar = "foo bar"
-    val fooBarBytes = serde.toBytes(fooBar)
-    assertArrayEquals(fooBar.getBytes("UTF-8"), fooBarBytes)
-    assertEquals(fooBar, serde.fromBytes(fooBarBytes))
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/test/scala/org/apache/samza/serializers/TestUUIDSerde.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/serializers/TestUUIDSerde.scala b/samza-core/src/test/scala/org/apache/samza/serializers/TestUUIDSerde.scala
deleted file mode 100644
index 04ddcdb..0000000
--- a/samza-core/src/test/scala/org/apache/samza/serializers/TestUUIDSerde.scala
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.serializers
-
-import java.nio.BufferUnderflowException
-import java.util.UUID
-
-import org.junit.Assert._
-import org.junit.Test
-
-class TestUUIDSerde {
-  private val serde = new UUIDSerde
-
-  @Test
-  def testUUIDSerde {
-    val uuid = new UUID(13, 42)
-    val bytes = serde.toBytes(uuid)
-    assertArrayEquals(Array[Byte](0, 0, 0, 0, 0, 0, 0, 13, 0, 0, 0, 0, 0, 0, 0, 42), bytes)
-    assertEquals(uuid, serde.fromBytes(bytes))
-  }
-
-  @Test
-  def testToBytesWhenNull {
-    assertEquals(null, serde.toBytes(null))
-  }
-
-  @Test
-  def testFromBytesWhenNull {
-    assertEquals(null, serde.fromBytes(null))
-  }
-
-  @Test(expected = classOf[BufferUnderflowException])
-  def testFromBytesWhenInvalid {
-    assertEquals(null, serde.fromBytes(Array[Byte](0)))
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java
----------------------------------------------------------------------
diff --git a/samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java b/samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java
index 0f0d792..5824489 100644
--- a/samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java
+++ b/samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java
@@ -77,7 +77,7 @@ public class Log4jSystemConfig extends JavaSystemConfig {
    *         supplied serde name.
    */
   public String getSerdeClass(String name) {
-    return get(String.format(SerializerConfig.SERDE(), name), null);
+    return get(String.format(SerializerConfig.SERDE_FACTORY_CLASS(), name), null);
   }
 
   public String getStreamSerdeName(String systemName, String streamName) {

http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
----------------------------------------------------------------------
diff --git a/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
index e442599..8436835 100644
--- a/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
+++ b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
@@ -246,7 +246,7 @@ public class StreamAppender extends AppenderSkeleton {
       SerdeFactory<LoggingEvent> serdeFactory = Util.<SerdeFactory<LoggingEvent>>getObj(serdeClass);
       serde = serdeFactory.getSerde(systemName, config);
     } else {
-      String serdeKey = String.format(SerializerConfig.SERDE(), serdeName);
+      String serdeKey = String.format(SerializerConfig.SERDE_FACTORY_CLASS(), serdeName);
       throw new SamzaException("Can not find serializers class for key '" + serdeName + "'. Please specify " +
           serdeKey + " property");
     }

http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-test/src/main/resources/log4j.xml
----------------------------------------------------------------------
diff --git a/samza-test/src/main/resources/log4j.xml b/samza-test/src/main/resources/log4j.xml
index 7b4fb82..ab74ebd 100644
--- a/samza-test/src/main/resources/log4j.xml
+++ b/samza-test/src/main/resources/log4j.xml
@@ -12,43 +12,39 @@
 <!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
 
 <log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
-  <appender name="RollingAppender" class="org.apache.log4j.RollingFileAppender">
-    <param name="File" value="${samza.log.dir}/${samza.container.name}.log" />
-    <param name="MaxFileSize" value="256MB" />
-    <param name="MaxBackupIndex" value="20" />
-    <layout class="org.apache.log4j.PatternLayout">
-        <param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %c{1} [%p] %m%n" />
-    </layout>
-  </appender>
-
-  <appender name="StartupAppender" class="org.apache.log4j.RollingFileAppender">
-    <param name="File" value="${samza.log.dir}/${samza.container.name}-startup.log" />
-    <param name="MaxFileSize" value="256MB" />
-    <param name="MaxBackupIndex" value="1" />
-    <layout class="org.apache.log4j.PatternLayout">
-      <param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %c{1} [%p] %m%n" />
-    </layout>
-  </appender>
-
   <appender name="console" class="org.apache.log4j.ConsoleAppender">
     <param name="Target" value="System.out" />
     <layout class="org.apache.log4j.PatternLayout">
-      <param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %c{1} [%p] %m%n" />
+      <param name="ConversionPattern" value="[%t] %c{1} [%p] %m%n" />
     </layout>
   </appender>
 
   <root>
     <priority value="info" />
-    <appender-ref ref="console" /> 
-    <appender-ref ref="RollingAppender" />
+    <appender-ref ref="console" />
   </root>
 
   <logger name="STARTUP_LOGGER" additivity="false">
     <level value="info" />
-    <appender-ref ref="StartupAppender"/>
+    <appender-ref ref="console"/>
   </logger>
 
   <logger name="org.apache.hadoop">
-    <level value="off"/>
+    <level value="ERROR"/>
+  </logger>
+  <logger name="org.I0Itec.zkclient">
+    <level value="ERROR"/>
+  </logger>
+  <logger name="org.apache.zookeeper">
+    <level value="ERROR"/>
+  </logger>
+  <logger name="org.apache.samza.system.kafka">
+    <level value="ERROR"/>
+  </logger>
+  <logger name="org.apache.kafka">
+    <level value="ERROR"/>
+  </logger>
+  <logger name="kafka">
+    <level value="ERROR"/>
   </logger>
-</log4j:configuration>
+</log4j:configuration>
\ No newline at end of file