You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2017/09/28 00:08:09 UTC
[2/4] samza git commit: SAMZA-1109;
Allow setting Serdes for fluent API operators in code
http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
index 2c8f682..8c75bca 100644
--- a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
@@ -24,6 +24,7 @@ import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.MapConfig;
import org.apache.samza.config.TaskConfig;
+import org.apache.samza.operators.KV;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.OutputStream;
import org.apache.samza.operators.StreamGraphImpl;
@@ -45,8 +46,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.function.BiFunction;
-import java.util.function.Function;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -122,11 +121,11 @@ public class TestExecutionPlanner {
*
*/
StreamGraphImpl streamGraph = new StreamGraphImpl(runner, config);
- Function mockFn = mock(Function.class);
- OutputStream<Object, Object, Object> output1 = streamGraph.getOutputStream("output1", mockFn, mockFn);
- BiFunction mockBuilder = mock(BiFunction.class);
- streamGraph.getInputStream("input1", mockBuilder)
- .partitionBy(m -> "yes!!!").map(m -> m)
+ MessageStream<KV<Object, Object>> input1 = streamGraph.getInputStream("input1");
+ OutputStream<KV<Object, Object>> output1 = streamGraph.getOutputStream("output1");
+ input1
+ .partitionBy(m -> m.key, m -> m.value)
+ .map(kv -> kv)
.sendTo(output1);
return streamGraph;
}
@@ -145,13 +144,20 @@ public class TestExecutionPlanner {
*/
StreamGraphImpl streamGraph = new StreamGraphImpl(runner, config);
- BiFunction msgBuilder = mock(BiFunction.class);
- MessageStream m1 = streamGraph.getInputStream("input1", msgBuilder).map(m -> m);
- MessageStream m2 = streamGraph.getInputStream("input2", msgBuilder).partitionBy(m -> "haha").filter(m -> true);
- MessageStream m3 = streamGraph.getInputStream("input3", msgBuilder).filter(m -> true).partitionBy(m -> "hehe").map(m -> m);
- Function mockFn = mock(Function.class);
- OutputStream<Object, Object, Object> output1 = streamGraph.getOutputStream("output1", mockFn, mockFn);
- OutputStream<Object, Object, Object> output2 = streamGraph.getOutputStream("output2", mockFn, mockFn);
+ MessageStream<KV<Object, Object>> m1 =
+ streamGraph.<KV<Object, Object>>getInputStream("input1")
+ .map(m -> m);
+ MessageStream<KV<Object, Object>> m2 =
+ streamGraph.<KV<Object, Object>>getInputStream("input2")
+ .partitionBy(m -> m.key, m -> m.value)
+ .filter(m -> true);
+ MessageStream<KV<Object, Object>> m3 =
+ streamGraph.<KV<Object, Object>>getInputStream("input3")
+ .filter(m -> true)
+ .partitionBy(m -> m.key, m -> m.value)
+ .map(m -> m);
+ OutputStream<KV<Object, Object>> output1 = streamGraph.getOutputStream("output1");
+ OutputStream<KV<Object, Object>> output2 = streamGraph.getOutputStream("output2");
m1.join(m2, mock(JoinFunction.class), Duration.ofHours(2)).sendTo(output1);
m3.join(m2, mock(JoinFunction.class), Duration.ofHours(1)).sendTo(output2);
@@ -162,21 +168,28 @@ public class TestExecutionPlanner {
private StreamGraphImpl createStreamGraphWithJoinAndWindow() {
StreamGraphImpl streamGraph = new StreamGraphImpl(runner, config);
- BiFunction msgBuilder = mock(BiFunction.class);
- MessageStream m1 = streamGraph.getInputStream("input1", msgBuilder).map(m -> m);
- MessageStream m2 = streamGraph.getInputStream("input2", msgBuilder).partitionBy(m -> "haha").filter(m -> true);
- MessageStream m3 = streamGraph.getInputStream("input3", msgBuilder).filter(m -> true).partitionBy(m -> "hehe").map(m -> m);
- Function mockFn = mock(Function.class);
- OutputStream<Object, Object, Object> output1 = streamGraph.getOutputStream("output1", mockFn, mockFn);
- OutputStream<Object, Object, Object> output2 = streamGraph.getOutputStream("output2", mockFn, mockFn);
+ MessageStream<KV<Object, Object>> m1 =
+ streamGraph.<KV<Object, Object>>getInputStream("input1")
+ .map(m -> m);
+ MessageStream<KV<Object, Object>> m2 =
+ streamGraph.<KV<Object, Object>>getInputStream("input2")
+ .partitionBy(m -> m.key, m -> m.value)
+ .filter(m -> true);
+ MessageStream<KV<Object, Object>> m3 =
+ streamGraph.<KV<Object, Object>>getInputStream("input3")
+ .filter(m -> true)
+ .partitionBy(m -> m.key, m -> m.value)
+ .map(m -> m);
+ OutputStream<KV<Object, Object>> output1 = streamGraph.getOutputStream("output1");
+ OutputStream<KV<Object, Object>> output2 = streamGraph.getOutputStream("output2");
m1.map(m -> m)
.filter(m->true)
- .window(Windows.<Object, Object>keyedTumblingWindow(m -> m, Duration.ofMillis(8)));
+ .window(Windows.keyedTumblingWindow(m -> m, Duration.ofMillis(8)));
m2.map(m -> m)
.filter(m->true)
- .window(Windows.<Object, Object>keyedTumblingWindow(m -> m, Duration.ofMillis(16)));
+ .window(Windows.keyedTumblingWindow(m -> m, Duration.ofMillis(16)));
m1.join(m2, mock(JoinFunction.class), Duration.ofMillis(1600)).sendTo(output1);
m3.join(m2, mock(JoinFunction.class), Duration.ofMillis(100)).sendTo(output2);
http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java b/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
index 4bda86b..095e407 100644
--- a/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
@@ -19,25 +19,26 @@
package org.apache.samza.execution;
-import java.time.Duration;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.function.BiFunction;
-import java.util.function.Function;
-
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.MapConfig;
+import org.apache.samza.operators.KV;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.OutputStream;
import org.apache.samza.operators.StreamGraphImpl;
import org.apache.samza.operators.functions.JoinFunction;
import org.apache.samza.runtime.ApplicationRunner;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.NoOpSerde;
import org.apache.samza.system.StreamSpec;
import org.apache.samza.system.SystemAdmin;
import org.codehaus.jackson.map.ObjectMapper;
import org.junit.Test;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+
import static org.apache.samza.execution.TestExecutionPlanner.createSystemAdmin;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
@@ -105,13 +106,21 @@ public class TestJobGraphJsonGenerator {
StreamManager streamManager = new StreamManager(systemAdmins);
StreamGraphImpl streamGraph = new StreamGraphImpl(runner, config);
- BiFunction mockBuilder = mock(BiFunction.class);
- MessageStream m1 = streamGraph.getInputStream("input1", mockBuilder).map(m -> m);
- MessageStream m2 = streamGraph.getInputStream("input2", mockBuilder).partitionBy(m -> "haha").filter(m -> true);
- MessageStream m3 = streamGraph.getInputStream("input3", mockBuilder).filter(m -> true).partitionBy(m -> "hehe").map(m -> m);
- Function mockFn = mock(Function.class);
- OutputStream<Object, Object, Object> outputStream1 = streamGraph.getOutputStream("output1", mockFn, mockFn);
- OutputStream<Object, Object, Object> outputStream2 = streamGraph.getOutputStream("output2", mockFn, mockFn);
+ streamGraph.setDefaultSerde(KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>()));
+ MessageStream<KV<Object, Object>> m1 =
+ streamGraph.<KV<Object, Object>>getInputStream("input1")
+ .map(m -> m);
+ MessageStream<KV<Object, Object>> m2 =
+ streamGraph.<KV<Object, Object>>getInputStream("input2")
+ .partitionBy(m -> m.key, m -> m.value)
+ .filter(m -> true);
+ MessageStream<KV<Object, Object>> m3 =
+ streamGraph.<KV<Object, Object>>getInputStream("input3")
+ .filter(m -> true)
+ .partitionBy(m -> m.key, m -> m.value)
+ .map(m -> m);
+ OutputStream<KV<Object, Object>> outputStream1 = streamGraph.getOutputStream("output1");
+ OutputStream<KV<Object, Object>> outputStream2 = streamGraph.getOutputStream("output2");
m1.join(m2, mock(JoinFunction.class), Duration.ofHours(2)).sendTo(outputStream1);
m2.sink((message, collector, coordinator) -> { });
http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/test/java/org/apache/samza/execution/TestJobNode.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestJobNode.java b/samza-core/src/test/java/org/apache/samza/execution/TestJobNode.java
new file mode 100644
index 0000000..c59c0cc
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestJobNode.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.execution;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.SerializerConfig;
+import org.apache.samza.operators.KV;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.StreamGraphImpl;
+import org.apache.samza.runtime.ApplicationRunner;
+import org.apache.samza.serializers.JsonSerdeV2;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.serializers.SerializableSerde;
+import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.system.StreamSpec;
+import org.junit.Test;
+
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+
+public class TestJobNode {
+
+ @Test
+ public void testAddSerdeConfigs() {
+ ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+ StreamSpec inputSpec = new StreamSpec("input", "input", "input-system");
+ StreamSpec outputSpec = new StreamSpec("output", "output", "output-system");
+ StreamSpec partitionBySpec = new StreamSpec("null-null-partition_by-1", "partition_by-1", "intermediate-system");
+ doReturn(inputSpec).when(mockRunner).getStreamSpec("input");
+ doReturn(outputSpec).when(mockRunner).getStreamSpec("output");
+ doReturn(partitionBySpec).when(mockRunner).getStreamSpec("null-null-partition_by-1");
+
+ StreamGraphImpl streamGraph = new StreamGraphImpl(mockRunner, mock(Config.class));
+ streamGraph.setDefaultSerde(KVSerde.of(new StringSerde(), new JsonSerdeV2<>()));
+ MessageStream<KV<String, Object>> input = streamGraph.getInputStream("input");
+ OutputStream<KV<String, Object>> output = streamGraph.getOutputStream("output");
+ input.partitionBy(KV::getKey, KV::getValue).sendTo(output);
+
+ JobNode jobNode = new JobNode("jobName", "jobId", streamGraph, mock(Config.class));
+ StreamEdge inputEdge = new StreamEdge(inputSpec);
+ StreamEdge outputEdge = new StreamEdge(outputSpec);
+ StreamEdge repartitionEdge = new StreamEdge(partitionBySpec, true);
+ jobNode.addInEdge(inputEdge);
+ jobNode.addOutEdge(outputEdge);
+ jobNode.addInEdge(repartitionEdge);
+ jobNode.addOutEdge(repartitionEdge);
+
+ Map<String, String> configs = new HashMap<>();
+ jobNode.addSerdeConfigs(configs);
+
+ MapConfig mapConfig = new MapConfig(configs);
+ Config serializers = mapConfig.subset("serializers.registry.", true);
+
+ // make sure that the serializers deserialize correctly
+ SerializableSerde<Serde> serializableSerde = new SerializableSerde<>();
+ Map<String, Serde> deserializedSerdes = serializers.entrySet().stream().collect(Collectors.toMap(
+ e -> e.getKey().replace(SerializerConfig.SERIALIZED_INSTANCE_SUFFIX(), ""),
+ e -> serializableSerde.fromBytes(Base64.getDecoder().decode(e.getValue().getBytes()))
+ ));
+ assertEquals(2, serializers.size());
+
+ String inputKeySerde = mapConfig.get("streams.input.samza.key.serde");
+ String inputMsgSerde = mapConfig.get("streams.input.samza.msg.serde");
+ assertTrue(deserializedSerdes.containsKey(inputKeySerde));
+ assertTrue(inputKeySerde.startsWith(StringSerde.class.getSimpleName()));
+ assertTrue(deserializedSerdes.containsKey(inputMsgSerde));
+ assertTrue(inputMsgSerde.startsWith(JsonSerdeV2.class.getSimpleName()));
+
+ String outputKeySerde = mapConfig.get("streams.output.samza.key.serde");
+ String outputMsgSerde = mapConfig.get("streams.output.samza.msg.serde");
+ assertTrue(deserializedSerdes.containsKey(outputKeySerde));
+ assertTrue(outputKeySerde.startsWith(StringSerde.class.getSimpleName()));
+ assertTrue(deserializedSerdes.containsKey(outputMsgSerde));
+ assertTrue(outputMsgSerde.startsWith(JsonSerdeV2.class.getSimpleName()));
+
+ String partitionByKeySerde = mapConfig.get("streams.null-null-partition_by-1.samza.key.serde");
+ String partitionByMsgSerde = mapConfig.get("streams.null-null-partition_by-1.samza.msg.serde");
+ assertTrue(deserializedSerdes.containsKey(partitionByKeySerde));
+ assertTrue(partitionByKeySerde.startsWith(StringSerde.class.getSimpleName()));
+ assertTrue(deserializedSerdes.containsKey(partitionByMsgSerde));
+ assertTrue(partitionByMsgSerde.startsWith(JsonSerdeV2.class.getSimpleName()));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java b/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
index c51b1ea..004c5cf 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
@@ -25,6 +25,8 @@ import org.apache.samza.config.Config;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.operators.functions.JoinFunction;
import org.apache.samza.runtime.ApplicationRunner;
+import org.apache.samza.serializers.IntegerSerde;
+import org.apache.samza.serializers.KVSerde;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.StreamSpec;
@@ -276,10 +278,11 @@ public class TestJoinOperator {
@Override
public void init(StreamGraph graph, Config config) {
+ KVSerde<Integer, Integer> kvSerde = KVSerde.of(new IntegerSerde(), new IntegerSerde());
MessageStream<FirstStreamIME> inStream =
- graph.getInputStream("instream", FirstStreamIME::new);
+ graph.getInputStream("instream", kvSerde).map(FirstStreamIME::new);
MessageStream<SecondStreamIME> inStream2 =
- graph.getInputStream("instream2", SecondStreamIME::new);
+ graph.getInputStream("instream2", kvSerde).map(SecondStreamIME::new);
SystemStream outputSystemStream = new SystemStream("outputSystem", "outputStream");
inStream
@@ -330,14 +333,24 @@ public class TestJoinOperator {
}
private static class FirstStreamIME extends IncomingMessageEnvelope {
+ FirstStreamIME(KV<Integer, Integer> message) {
+ super(new SystemStreamPartition(
+ "insystem", "instream", new Partition(0)), "1", message.getKey(), message.getValue());
+ }
+
FirstStreamIME(Integer key, Integer message) {
- super(new SystemStreamPartition("insystem", "instream", new Partition(0)), "1", key, message);
+ this(KV.of(key, message));
}
}
private static class SecondStreamIME extends IncomingMessageEnvelope {
+ SecondStreamIME(KV<Integer, Integer> message) {
+ super(new SystemStreamPartition(
+ "insystem2", "instream2", new Partition(0)), "1", message.getKey(), message.getValue());
+ }
+
SecondStreamIME(Integer key, Integer message) {
- super(new SystemStreamPartition("insystem2", "instream2", new Partition(0)), "1", key, message);
+ this(KV.of(key, message));
}
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java b/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
index 61224f2..c6554bc 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
@@ -32,6 +32,7 @@ import org.apache.samza.operators.spec.OperatorSpec;
import org.apache.samza.operators.spec.OperatorSpec.OpCode;
import org.apache.samza.operators.spec.OutputOperatorSpec;
import org.apache.samza.operators.spec.OutputStreamImpl;
+import org.apache.samza.operators.spec.PartitionByOperatorSpec;
import org.apache.samza.operators.spec.SinkOperatorSpec;
import org.apache.samza.operators.spec.StreamOperatorSpec;
import org.apache.samza.operators.spec.WindowOperatorSpec;
@@ -39,20 +40,19 @@ import org.apache.samza.operators.stream.IntermediateMessageStreamImpl;
import org.apache.samza.operators.windows.Window;
import org.apache.samza.operators.windows.WindowPane;
import org.apache.samza.operators.windows.Windows;
+import org.apache.samza.serializers.KVSerde;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
-import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
@@ -172,9 +172,8 @@ public class TestMessageStreamImpl {
StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
OperatorSpec mockOpSpec = mock(OperatorSpec.class);
MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph, mockOpSpec);
-
- OutputStreamImpl mockOutputOpSpec = mock(OutputStreamImpl.class);
- inputStream.sendTo(mockOutputOpSpec);
+ OutputStreamImpl<TestMessageEnvelope> mockOutputStreamImpl = mock(OutputStreamImpl.class);
+ inputStream.sendTo(mockOutputStreamImpl);
ArgumentCaptor<OperatorSpec> registeredOpCaptor = ArgumentCaptor.forClass(OperatorSpec.class);
verify(mockOpSpec).registerNextOperatorSpec(registeredOpCaptor.capture());
@@ -182,33 +181,75 @@ public class TestMessageStreamImpl {
assertTrue(registeredOpSpec instanceof OutputOperatorSpec);
assertEquals(OpCode.SEND_TO, registeredOpSpec.getOpCode());
- assertEquals(mockOutputOpSpec, ((OutputOperatorSpec) registeredOpSpec).getOutputStream());
+ assertEquals(mockOutputStreamImpl, ((OutputOperatorSpec) registeredOpSpec).getOutputStream());
+
+ // same behavior as above so nothing new to assert. but ensures that this variant compiles.
+ MessageStreamImpl<KV<String, TestMessageEnvelope>> keyedInputStream = new MessageStreamImpl<>(mockGraph, mockOpSpec);
+ OutputStreamImpl<KV<String, TestMessageEnvelope>> mockKeyedOutputStreamImpl = mock(OutputStreamImpl.class);
+ keyedInputStream.sendTo(mockKeyedOutputStreamImpl);
+
+ // can't unit test it, but the following variants should not compile
+// inputStream.sendTo(mockKeyedOutputStreamImpl);
+// keyedInputStream.sendTo(mockOutputStreamImpl);
}
@Test
- public void testPartitionBy() {
+ public void testRepartition() {
StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
OperatorSpec mockOpSpec = mock(OperatorSpec.class);
String streamName = String.format("%s-%s", OperatorSpec.OpCode.PARTITION_BY.name().toLowerCase(), 0);
- Function<TestMessageEnvelope, String> mockKeyFn = mock(Function.class);
- OutputStreamImpl mockOutputOpSpec = mock(OutputStreamImpl.class);
+ OutputStreamImpl mockOutputStreamImpl = mock(OutputStreamImpl.class);
+ KVSerde mockKVSerde = mock(KVSerde.class);
IntermediateMessageStreamImpl mockIntermediateStream = mock(IntermediateMessageStreamImpl.class);
- when(mockGraph.getIntermediateStream(eq(streamName), eq(mockKeyFn), any(Function.class), any(BiFunction.class)))
+ when(mockGraph.getIntermediateStream(eq(streamName), eq(mockKVSerde)))
.thenReturn(mockIntermediateStream);
when(mockIntermediateStream.getOutputStream())
- .thenReturn(mockOutputOpSpec);
+ .thenReturn(mockOutputStreamImpl);
MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph, mockOpSpec);
- inputStream.partitionBy(mockKeyFn);
+ Function mockKeyFunction = mock(Function.class);
+ Function mockValueFunction = mock(Function.class);
+ inputStream.partitionBy(mockKeyFunction, mockValueFunction, mockKVSerde);
ArgumentCaptor<OperatorSpec> registeredOpCaptor = ArgumentCaptor.forClass(OperatorSpec.class);
verify(mockOpSpec).registerNextOperatorSpec(registeredOpCaptor.capture());
OperatorSpec<?, TestMessageEnvelope> registeredOpSpec = registeredOpCaptor.getValue();
- assertTrue(registeredOpSpec instanceof OutputOperatorSpec);
+ assertTrue(registeredOpSpec instanceof PartitionByOperatorSpec);
+ assertEquals(OpCode.PARTITION_BY, registeredOpSpec.getOpCode());
+ assertEquals(mockOutputStreamImpl, ((PartitionByOperatorSpec) registeredOpSpec).getOutputStream());
+ assertEquals(mockKeyFunction, ((PartitionByOperatorSpec) registeredOpSpec).getKeyFunction());
+ assertEquals(mockValueFunction, ((PartitionByOperatorSpec) registeredOpSpec).getValueFunction());
+ }
+
+ @Test
+ public void testRepartitionWithoutSerde() {
+ StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+ OperatorSpec mockOpSpec = mock(OperatorSpec.class);
+
+ String streamName = String.format("%s-%s", OperatorSpec.OpCode.PARTITION_BY.name().toLowerCase(), 0);
+ OutputStreamImpl mockOutputStreamImpl = mock(OutputStreamImpl.class);
+ IntermediateMessageStreamImpl mockIntermediateStream = mock(IntermediateMessageStreamImpl.class);
+ when(mockGraph.getIntermediateStream(eq(streamName), eq(null)))
+ .thenReturn(mockIntermediateStream);
+ when(mockIntermediateStream.getOutputStream())
+ .thenReturn(mockOutputStreamImpl);
+
+ MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph, mockOpSpec);
+ Function mockKeyFunction = mock(Function.class);
+ Function mockValueFunction = mock(Function.class);
+ inputStream.partitionBy(mockKeyFunction, mockValueFunction);
+
+ ArgumentCaptor<OperatorSpec> registeredOpCaptor = ArgumentCaptor.forClass(OperatorSpec.class);
+ verify(mockOpSpec).registerNextOperatorSpec(registeredOpCaptor.capture());
+ OperatorSpec<?, TestMessageEnvelope> registeredOpSpec = registeredOpCaptor.getValue();
+
+ assertTrue(registeredOpSpec instanceof PartitionByOperatorSpec);
assertEquals(OpCode.PARTITION_BY, registeredOpSpec.getOpCode());
- assertEquals(mockOutputOpSpec, ((OutputOperatorSpec) registeredOpSpec).getOutputStream());
+ assertEquals(mockOutputStreamImpl, ((PartitionByOperatorSpec) registeredOpSpec).getOutputStream());
+ assertEquals(mockKeyFunction, ((PartitionByOperatorSpec) registeredOpSpec).getKeyFunction());
+ assertEquals(mockValueFunction, ((PartitionByOperatorSpec) registeredOpSpec).getValueFunction());
}
@Test
http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java b/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java
index 1fc60bd..45583c2 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java
@@ -27,36 +27,136 @@ import org.apache.samza.operators.spec.OperatorSpec.OpCode;
import org.apache.samza.operators.spec.OutputStreamImpl;
import org.apache.samza.operators.stream.IntermediateMessageStreamImpl;
import org.apache.samza.runtime.ApplicationRunner;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.NoOpSerde;
+import org.apache.samza.serializers.Serde;
import org.apache.samza.system.StreamSpec;
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
-import java.util.function.BiFunction;
-import java.util.function.Function;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class TestStreamGraphImpl {
@Test
- public void testGetInputStream() {
+ public void testGetInputStreamWithValueSerde() {
ApplicationRunner mockRunner = mock(ApplicationRunner.class);
StreamSpec mockStreamSpec = mock(StreamSpec.class);
when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
- BiFunction<String, String, TestMessageEnvelope> mockMsgBuilder = mock(BiFunction.class);
- MessageStream<TestMessageEnvelope> inputStream = graph.getInputStream("test-stream-1", mockMsgBuilder);
+ Serde mockValueSerde = mock(Serde.class);
+ MessageStream<TestMessageEnvelope> inputStream = graph.getInputStream("test-stream-1", mockValueSerde);
- InputOperatorSpec<String, String, TestMessageEnvelope> inputOpSpec =
+ InputOperatorSpec<String, TestMessageEnvelope> inputOpSpec =
(InputOperatorSpec) ((MessageStreamImpl<TestMessageEnvelope>) inputStream).getOperatorSpec();
assertEquals(OpCode.INPUT, inputOpSpec.getOpCode());
assertEquals(graph.getInputOperators().get(mockStreamSpec), inputOpSpec);
- assertEquals(mockMsgBuilder, inputOpSpec.getMsgBuilder());
assertEquals(mockStreamSpec, inputOpSpec.getStreamSpec());
+ assertTrue(inputOpSpec.getKeySerde() instanceof NoOpSerde);
+ assertEquals(mockValueSerde, inputOpSpec.getValueSerde());
+ }
+
+ @Test
+ public void testGetInputStreamWithKeyValueSerde() {
+ ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+ StreamSpec mockStreamSpec = mock(StreamSpec.class);
+ when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
+ StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
+
+ KVSerde mockKVSerde = mock(KVSerde.class);
+ Serde mockKeySerde = mock(Serde.class);
+ Serde mockValueSerde = mock(Serde.class);
+ doReturn(mockKeySerde).when(mockKVSerde).getKeySerde();
+ doReturn(mockValueSerde).when(mockKVSerde).getValueSerde();
+ MessageStream<TestMessageEnvelope> inputStream = graph.getInputStream("test-stream-1", mockKVSerde);
+
+ InputOperatorSpec<String, TestMessageEnvelope> inputOpSpec =
+ (InputOperatorSpec) ((MessageStreamImpl<TestMessageEnvelope>) inputStream).getOperatorSpec();
+ assertEquals(OpCode.INPUT, inputOpSpec.getOpCode());
+ assertEquals(graph.getInputOperators().get(mockStreamSpec), inputOpSpec);
+ assertEquals(mockStreamSpec, inputOpSpec.getStreamSpec());
+ assertEquals(mockKeySerde, inputOpSpec.getKeySerde());
+ assertEquals(mockValueSerde, inputOpSpec.getValueSerde());
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void testGetInputStreamWithNullSerde() {
+ ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+ StreamSpec mockStreamSpec = mock(StreamSpec.class);
+ when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
+ StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
+
+ graph.getInputStream("test-stream-1", null);
+ }
+
+ @Test
+ public void testGetInputStreamWithDefaultValueSerde() {
+ ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+ StreamSpec mockStreamSpec = mock(StreamSpec.class);
+ when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
+ StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
+
+ Serde mockValueSerde = mock(Serde.class);
+ graph.setDefaultSerde(mockValueSerde);
+ MessageStream<TestMessageEnvelope> inputStream = graph.getInputStream("test-stream-1");
+
+ InputOperatorSpec<String, TestMessageEnvelope> inputOpSpec =
+ (InputOperatorSpec) ((MessageStreamImpl<TestMessageEnvelope>) inputStream).getOperatorSpec();
+ assertEquals(OpCode.INPUT, inputOpSpec.getOpCode());
+ assertEquals(graph.getInputOperators().get(mockStreamSpec), inputOpSpec);
+ assertEquals(mockStreamSpec, inputOpSpec.getStreamSpec());
+ assertTrue(inputOpSpec.getKeySerde() instanceof NoOpSerde);
+ assertEquals(mockValueSerde, inputOpSpec.getValueSerde());
+ }
+
+ @Test
+ public void testGetInputStreamWithDefaultKeyValueSerde() {
+ ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+ StreamSpec mockStreamSpec = mock(StreamSpec.class);
+ when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
+ StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
+
+ KVSerde mockKVSerde = mock(KVSerde.class);
+ Serde mockKeySerde = mock(Serde.class);
+ Serde mockValueSerde = mock(Serde.class);
+ doReturn(mockKeySerde).when(mockKVSerde).getKeySerde();
+ doReturn(mockValueSerde).when(mockKVSerde).getValueSerde();
+ graph.setDefaultSerde(mockKVSerde);
+ MessageStream<TestMessageEnvelope> inputStream = graph.getInputStream("test-stream-1");
+
+ InputOperatorSpec<String, TestMessageEnvelope> inputOpSpec =
+ (InputOperatorSpec) ((MessageStreamImpl<TestMessageEnvelope>) inputStream).getOperatorSpec();
+ assertEquals(OpCode.INPUT, inputOpSpec.getOpCode());
+ assertEquals(graph.getInputOperators().get(mockStreamSpec), inputOpSpec);
+ assertEquals(mockStreamSpec, inputOpSpec.getStreamSpec());
+ assertEquals(mockKeySerde, inputOpSpec.getKeySerde());
+ assertEquals(mockValueSerde, inputOpSpec.getValueSerde());
+ }
+
+ @Test
+ public void testGetInputStreamWithDefaultDefaultSerde() {
+ // default default serde == user hasn't provided a default serde
+ ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+ StreamSpec mockStreamSpec = mock(StreamSpec.class);
+ when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
+ StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
+
+ MessageStream<TestMessageEnvelope> inputStream = graph.getInputStream("test-stream-1");
+
+ InputOperatorSpec<String, TestMessageEnvelope> inputOpSpec =
+ (InputOperatorSpec) ((MessageStreamImpl<TestMessageEnvelope>) inputStream).getOperatorSpec();
+ assertEquals(OpCode.INPUT, inputOpSpec.getOpCode());
+ assertEquals(graph.getInputOperators().get(mockStreamSpec), inputOpSpec);
+ assertEquals(mockStreamSpec, inputOpSpec.getStreamSpec());
+ assertTrue(inputOpSpec.getKeySerde() instanceof NoOpSerde);
+ assertTrue(inputOpSpec.getValueSerde() instanceof NoOpSerde);
}
@Test
@@ -65,15 +165,13 @@ public class TestStreamGraphImpl {
StreamSpec mockStreamSpec = mock(StreamSpec.class);
when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
- BiFunction<String, String, TestMessageEnvelope> mockMsgBuilder = mock(BiFunction.class);
- MessageStream<TestMessageEnvelope> inputStream = graph.getInputStream("test-stream-1", mockMsgBuilder);
+ MessageStream<TestMessageEnvelope> inputStream = graph.getInputStream("test-stream-1");
- InputOperatorSpec<String, String, TestMessageEnvelope> inputOpSpec =
+ InputOperatorSpec<String, TestMessageEnvelope> inputOpSpec =
(InputOperatorSpec) ((MessageStreamImpl<TestMessageEnvelope>) inputStream).getOperatorSpec();
assertEquals(OpCode.INPUT, inputOpSpec.getOpCode());
assertEquals(graph.getInputOperators().get(mockStreamSpec), inputOpSpec);
- assertEquals(mockMsgBuilder, inputOpSpec.getMsgBuilder());
assertEquals(mockStreamSpec, inputOpSpec.getStreamSpec());
}
@@ -86,12 +184,12 @@ public class TestStreamGraphImpl {
when(mockRunner.getStreamSpec("test-stream-2")).thenReturn(mockStreamSpec2);
StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
- MessageStream<Object> inputStream1 = graph.getInputStream("test-stream-1", mock(BiFunction.class));
- MessageStream<Object> inputStream2 = graph.getInputStream("test-stream-2", mock(BiFunction.class));
+ MessageStream<Object> inputStream1 = graph.getInputStream("test-stream-1");
+ MessageStream<Object> inputStream2 = graph.getInputStream("test-stream-2");
- InputOperatorSpec<String, String, TestMessageEnvelope> inputOpSpec1 =
+ InputOperatorSpec<String, TestMessageEnvelope> inputOpSpec1 =
(InputOperatorSpec) ((MessageStreamImpl<Object>) inputStream1).getOperatorSpec();
- InputOperatorSpec<String, String, TestMessageEnvelope> inputOpSpec2 =
+ InputOperatorSpec<String, TestMessageEnvelope> inputOpSpec2 =
(InputOperatorSpec) ((MessageStreamImpl<Object>) inputStream2).getOperatorSpec();
assertEquals(graph.getInputOperators().size(), 2);
@@ -105,29 +203,149 @@ public class TestStreamGraphImpl {
when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mock(StreamSpec.class));
StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
- graph.getInputStream("test-stream-1", mock(BiFunction.class));
- graph.getInputStream("test-stream-1", mock(BiFunction.class)); // should throw exception
+ graph.getInputStream("test-stream-1");
+ // should throw exception
+ graph.getInputStream("test-stream-1");
+ }
+
+ @Test
+ public void testGetOutputStreamWithValueSerde() {
+ ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+ StreamSpec mockStreamSpec = mock(StreamSpec.class);
+ when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
+
+ StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
+
+ Serde mockValueSerde = mock(Serde.class);
+ OutputStream<TestMessageEnvelope> outputStream =
+ graph.getOutputStream("test-stream-1", mockValueSerde);
+
+ OutputStreamImpl<TestMessageEnvelope> outputStreamImpl = (OutputStreamImpl) outputStream;
+ assertEquals(graph.getOutputStreams().get(mockStreamSpec), outputStreamImpl);
+ assertEquals(mockStreamSpec, outputStreamImpl.getStreamSpec());
+ assertTrue(outputStreamImpl.getKeySerde() instanceof NoOpSerde);
+ assertEquals(mockValueSerde, outputStreamImpl.getValueSerde());
+ }
+
+ @Test
+ public void testGetOutputStreamWithKeyValueSerde() {
+ ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+ StreamSpec mockStreamSpec = mock(StreamSpec.class);
+ when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
+
+ StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
+ KVSerde mockKVSerde = mock(KVSerde.class);
+ Serde mockKeySerde = mock(Serde.class);
+ Serde mockValueSerde = mock(Serde.class);
+ doReturn(mockKeySerde).when(mockKVSerde).getKeySerde();
+ doReturn(mockValueSerde).when(mockKVSerde).getValueSerde();
+ graph.setDefaultSerde(mockKVSerde);
+ OutputStream<TestMessageEnvelope> outputStream = graph.getOutputStream("test-stream-1", mockKVSerde);
+
+ OutputStreamImpl<TestMessageEnvelope> outputStreamImpl = (OutputStreamImpl) outputStream;
+ assertEquals(graph.getOutputStreams().get(mockStreamSpec), outputStreamImpl);
+ assertEquals(mockStreamSpec, outputStreamImpl.getStreamSpec());
+ assertEquals(mockKeySerde, outputStreamImpl.getKeySerde());
+ assertEquals(mockValueSerde, outputStreamImpl.getValueSerde());
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void testGetOutputStreamWithNullSerde() {
+ ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+ StreamSpec mockStreamSpec = mock(StreamSpec.class);
+ when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
+
+ StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
+
+ graph.getOutputStream("test-stream-1", null);
+ }
+
+ @Test
+ public void testGetOutputStreamWithDefaultValueSerde() {
+ ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+ StreamSpec mockStreamSpec = mock(StreamSpec.class);
+ when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
+
+ Serde mockValueSerde = mock(Serde.class);
+ StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
+ graph.setDefaultSerde(mockValueSerde);
+ OutputStream<TestMessageEnvelope> outputStream = graph.getOutputStream("test-stream-1");
+
+ OutputStreamImpl<TestMessageEnvelope> outputStreamImpl = (OutputStreamImpl) outputStream;
+ assertEquals(graph.getOutputStreams().get(mockStreamSpec), outputStreamImpl);
+ assertEquals(mockStreamSpec, outputStreamImpl.getStreamSpec());
+ assertTrue(outputStreamImpl.getKeySerde() instanceof NoOpSerde);
+ assertEquals(mockValueSerde, outputStreamImpl.getValueSerde());
+ }
+
+ @Test
+ public void testGetOutputStreamWithDefaultKeyValueSerde() {
+ ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+ StreamSpec mockStreamSpec = mock(StreamSpec.class);
+ when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
+
+ StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
+ KVSerde mockKVSerde = mock(KVSerde.class);
+ Serde mockKeySerde = mock(Serde.class);
+ Serde mockValueSerde = mock(Serde.class);
+ doReturn(mockKeySerde).when(mockKVSerde).getKeySerde();
+ doReturn(mockValueSerde).when(mockKVSerde).getValueSerde();
+ graph.setDefaultSerde(mockKVSerde);
+
+ OutputStream<TestMessageEnvelope> outputStream = graph.getOutputStream("test-stream-1");
+
+ OutputStreamImpl<TestMessageEnvelope> outputStreamImpl = (OutputStreamImpl) outputStream;
+ assertEquals(graph.getOutputStreams().get(mockStreamSpec), outputStreamImpl);
+ assertEquals(mockStreamSpec, outputStreamImpl.getStreamSpec());
+ assertEquals(mockKeySerde, outputStreamImpl.getKeySerde());
+ assertEquals(mockValueSerde, outputStreamImpl.getValueSerde());
}
@Test
- public void testGetOutputStream() {
+ public void testGetOutputStreamWithDefaultDefaultSerde() {
ApplicationRunner mockRunner = mock(ApplicationRunner.class);
StreamSpec mockStreamSpec = mock(StreamSpec.class);
when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
+ StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
+
+ OutputStream<TestMessageEnvelope> outputStream = graph.getOutputStream("test-stream-1");
+
+ OutputStreamImpl<TestMessageEnvelope> outputStreamImpl = (OutputStreamImpl) outputStream;
+ assertEquals(graph.getOutputStreams().get(mockStreamSpec), outputStreamImpl);
+ assertEquals(mockStreamSpec, outputStreamImpl.getStreamSpec());
+ assertTrue(outputStreamImpl.getKeySerde() instanceof NoOpSerde);
+ assertTrue(outputStreamImpl.getValueSerde() instanceof NoOpSerde);
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testSetDefaultSerdeAfterGettingStreams() {
+ ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+ when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mock(StreamSpec.class));
+
+ StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
+ graph.getInputStream("test-stream-1");
+ graph.setDefaultSerde(mock(Serde.class)); // should throw exception
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testSetDefaultSerdeAfterGettingOutputStream() {
+ ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+ when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mock(StreamSpec.class));
StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
- Function<TestMessageEnvelope, String> mockKeyExtractor = mock(Function.class);
- Function<TestMessageEnvelope, String> mockMsgExtractor = mock(Function.class);
+ graph.getOutputStream("test-stream-1");
+ graph.setDefaultSerde(mock(Serde.class)); // should throw exception
+ }
- OutputStream<String, String, TestMessageEnvelope> outputStream =
- graph.getOutputStream("test-stream-1", mockKeyExtractor, mockMsgExtractor);
+ @Test(expected = IllegalStateException.class)
+ public void testSetDefaultSerdeAfterGettingIntermediateStream() {
+ ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+ when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mock(StreamSpec.class));
- OutputStreamImpl<String, String, TestMessageEnvelope> outputOpSpec = (OutputStreamImpl) outputStream;
- assertEquals(graph.getOutputStreams().get(mockStreamSpec), outputOpSpec);
- assertEquals(mockKeyExtractor, outputOpSpec.getKeyExtractor());
- assertEquals(mockMsgExtractor, outputOpSpec.getMsgExtractor());
- assertEquals(mockStreamSpec, outputOpSpec.getStreamSpec());
+ StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
+ graph.getIntermediateStream("test-stream-1", null);
+ graph.setDefaultSerde(mock(Serde.class)); // should throw exception
}
@Test(expected = IllegalStateException.class)
@@ -136,12 +354,89 @@ public class TestStreamGraphImpl {
when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mock(StreamSpec.class));
StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
- graph.getOutputStream("test-stream-1", mock(Function.class), mock(Function.class));
- graph.getOutputStream("test-stream-1", mock(Function.class), mock(Function.class)); // should throw exception
+ graph.getOutputStream("test-stream-1");
+ graph.getOutputStream("test-stream-1"); // should throw exception
+ }
+
+ @Test
+ public void testGetIntermediateStreamWithValueSerde() {
+ ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+ Config mockConfig = mock(Config.class);
+ StreamSpec mockStreamSpec = mock(StreamSpec.class);
+ when(mockConfig.get(JobConfig.JOB_NAME())).thenReturn("myJob");
+ when(mockConfig.get(JobConfig.JOB_ID(), "1")).thenReturn("i001");
+ when(mockRunner.getStreamSpec("myJob-i001-test-stream-1")).thenReturn(mockStreamSpec);
+
+ StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig);
+
+ Serde mockValueSerde = mock(Serde.class);
+ IntermediateMessageStreamImpl<TestMessageEnvelope> intermediateStreamImpl =
+ graph.getIntermediateStream("test-stream-1", mockValueSerde);
+
+ assertEquals(graph.getInputOperators().get(mockStreamSpec), intermediateStreamImpl.getOperatorSpec());
+ assertEquals(graph.getOutputStreams().get(mockStreamSpec), intermediateStreamImpl.getOutputStream());
+ assertEquals(mockStreamSpec, intermediateStreamImpl.getStreamSpec());
+ assertTrue(intermediateStreamImpl.getOutputStream().getKeySerde() instanceof NoOpSerde);
+ assertEquals(mockValueSerde, intermediateStreamImpl.getOutputStream().getValueSerde());
+ assertTrue(((InputOperatorSpec) intermediateStreamImpl.getOperatorSpec()).getKeySerde() instanceof NoOpSerde);
+ assertEquals(mockValueSerde, ((InputOperatorSpec) intermediateStreamImpl.getOperatorSpec()).getValueSerde());
+ }
+
+ @Test
+ public void testGetIntermediateStreamWithKeyValueSerde() {
+ ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+ Config mockConfig = mock(Config.class);
+ StreamSpec mockStreamSpec = mock(StreamSpec.class);
+ when(mockConfig.get(JobConfig.JOB_NAME())).thenReturn("myJob");
+ when(mockConfig.get(JobConfig.JOB_ID(), "1")).thenReturn("i001");
+ when(mockRunner.getStreamSpec("myJob-i001-test-stream-1")).thenReturn(mockStreamSpec);
+
+ StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig);
+
+ KVSerde mockKVSerde = mock(KVSerde.class);
+ Serde mockKeySerde = mock(Serde.class);
+ Serde mockValueSerde = mock(Serde.class);
+ doReturn(mockKeySerde).when(mockKVSerde).getKeySerde();
+ doReturn(mockValueSerde).when(mockKVSerde).getValueSerde();
+ IntermediateMessageStreamImpl<TestMessageEnvelope> intermediateStreamImpl =
+ graph.getIntermediateStream("test-stream-1", mockKVSerde);
+
+ assertEquals(graph.getInputOperators().get(mockStreamSpec), intermediateStreamImpl.getOperatorSpec());
+ assertEquals(graph.getOutputStreams().get(mockStreamSpec), intermediateStreamImpl.getOutputStream());
+ assertEquals(mockStreamSpec, intermediateStreamImpl.getStreamSpec());
+ assertEquals(mockKeySerde, intermediateStreamImpl.getOutputStream().getKeySerde());
+ assertEquals(mockValueSerde, intermediateStreamImpl.getOutputStream().getValueSerde());
+ assertEquals(mockKeySerde, ((InputOperatorSpec) intermediateStreamImpl.getOperatorSpec()).getKeySerde());
+ assertEquals(mockValueSerde, ((InputOperatorSpec) intermediateStreamImpl.getOperatorSpec()).getValueSerde());
+ }
+
+ @Test
+ public void testGetIntermediateStreamWithDefaultValueSerde() {
+ ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+ Config mockConfig = mock(Config.class);
+ StreamSpec mockStreamSpec = mock(StreamSpec.class);
+ when(mockConfig.get(JobConfig.JOB_NAME())).thenReturn("myJob");
+ when(mockConfig.get(JobConfig.JOB_ID(), "1")).thenReturn("i001");
+ when(mockRunner.getStreamSpec("myJob-i001-test-stream-1")).thenReturn(mockStreamSpec);
+
+ StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig);
+
+ Serde mockValueSerde = mock(Serde.class);
+ graph.setDefaultSerde(mockValueSerde);
+ IntermediateMessageStreamImpl<TestMessageEnvelope> intermediateStreamImpl =
+ graph.getIntermediateStream("test-stream-1", null);
+
+ assertEquals(graph.getInputOperators().get(mockStreamSpec), intermediateStreamImpl.getOperatorSpec());
+ assertEquals(graph.getOutputStreams().get(mockStreamSpec), intermediateStreamImpl.getOutputStream());
+ assertEquals(mockStreamSpec, intermediateStreamImpl.getStreamSpec());
+ assertTrue(intermediateStreamImpl.getOutputStream().getKeySerde() instanceof NoOpSerde);
+ assertEquals(mockValueSerde, intermediateStreamImpl.getOutputStream().getValueSerde());
+ assertTrue(((InputOperatorSpec) intermediateStreamImpl.getOperatorSpec()).getKeySerde() instanceof NoOpSerde);
+ assertEquals(mockValueSerde, ((InputOperatorSpec) intermediateStreamImpl.getOperatorSpec()).getValueSerde());
}
@Test
- public void testGetIntermediateStream() {
+ public void testGetIntermediateStreamWithDefaultKeyValueSerde() {
ApplicationRunner mockRunner = mock(ApplicationRunner.class);
Config mockConfig = mock(Config.class);
StreamSpec mockStreamSpec = mock(StreamSpec.class);
@@ -150,19 +445,45 @@ public class TestStreamGraphImpl {
when(mockRunner.getStreamSpec("myJob-i001-test-stream-1")).thenReturn(mockStreamSpec);
StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig);
- Function<TestMessageEnvelope, String> mockKeyExtractor = mock(Function.class);
- Function<TestMessageEnvelope, String> mockMsgExtractor = mock(Function.class);
- BiFunction<String, String, TestMessageEnvelope> mockMsgBuilder = mock(BiFunction.class);
- IntermediateMessageStreamImpl<?, ?, TestMessageEnvelope> intermediateStreamImpl =
- graph.getIntermediateStream("test-stream-1", mockKeyExtractor, mockMsgExtractor, mockMsgBuilder);
+ KVSerde mockKVSerde = mock(KVSerde.class);
+ Serde mockKeySerde = mock(Serde.class);
+ Serde mockValueSerde = mock(Serde.class);
+ doReturn(mockKeySerde).when(mockKVSerde).getKeySerde();
+ doReturn(mockValueSerde).when(mockKVSerde).getValueSerde();
+ graph.setDefaultSerde(mockKVSerde);
+ IntermediateMessageStreamImpl<TestMessageEnvelope> intermediateStreamImpl =
+ graph.getIntermediateStream("test-stream-1", null);
+
+ assertEquals(graph.getInputOperators().get(mockStreamSpec), intermediateStreamImpl.getOperatorSpec());
+ assertEquals(graph.getOutputStreams().get(mockStreamSpec), intermediateStreamImpl.getOutputStream());
+ assertEquals(mockStreamSpec, intermediateStreamImpl.getStreamSpec());
+ assertEquals(mockKeySerde, intermediateStreamImpl.getOutputStream().getKeySerde());
+ assertEquals(mockValueSerde, intermediateStreamImpl.getOutputStream().getValueSerde());
+ assertEquals(mockKeySerde, ((InputOperatorSpec) intermediateStreamImpl.getOperatorSpec()).getKeySerde());
+ assertEquals(mockValueSerde, ((InputOperatorSpec) intermediateStreamImpl.getOperatorSpec()).getValueSerde());
+ }
+
+ @Test
+ public void testGetIntermediateStreamWithDefaultDefaultSerde() {
+ ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+ Config mockConfig = mock(Config.class);
+ StreamSpec mockStreamSpec = mock(StreamSpec.class);
+ when(mockConfig.get(JobConfig.JOB_NAME())).thenReturn("myJob");
+ when(mockConfig.get(JobConfig.JOB_ID(), "1")).thenReturn("i001");
+ when(mockRunner.getStreamSpec("myJob-i001-test-stream-1")).thenReturn(mockStreamSpec);
+
+ StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig);
+ IntermediateMessageStreamImpl<TestMessageEnvelope> intermediateStreamImpl =
+ graph.getIntermediateStream("test-stream-1", null);
assertEquals(graph.getInputOperators().get(mockStreamSpec), intermediateStreamImpl.getOperatorSpec());
assertEquals(graph.getOutputStreams().get(mockStreamSpec), intermediateStreamImpl.getOutputStream());
assertEquals(mockStreamSpec, intermediateStreamImpl.getStreamSpec());
- assertEquals(mockKeyExtractor, intermediateStreamImpl.getOutputStream().getKeyExtractor());
- assertEquals(mockMsgExtractor, intermediateStreamImpl.getOutputStream().getMsgExtractor());
- assertEquals(mockMsgBuilder, ((InputOperatorSpec) intermediateStreamImpl.getOperatorSpec()).getMsgBuilder());
+ assertTrue(intermediateStreamImpl.getOutputStream().getKeySerde() instanceof NoOpSerde);
+ assertTrue(intermediateStreamImpl.getOutputStream().getValueSerde() instanceof NoOpSerde);
+ assertTrue(((InputOperatorSpec) intermediateStreamImpl.getOperatorSpec()).getKeySerde() instanceof NoOpSerde);
+ assertTrue(((InputOperatorSpec) intermediateStreamImpl.getOperatorSpec()).getValueSerde() instanceof NoOpSerde);
}
@Test(expected = IllegalStateException.class)
@@ -171,8 +492,8 @@ public class TestStreamGraphImpl {
when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mock(StreamSpec.class));
StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
- graph.getIntermediateStream("test-stream-1", mock(Function.class), mock(Function.class), mock(BiFunction.class));
- graph.getIntermediateStream("test-stream-1", mock(Function.class), mock(Function.class), mock(BiFunction.class));
+ graph.getIntermediateStream("test-stream-1", mock(Serde.class));
+ graph.getIntermediateStream("test-stream-1", mock(Serde.class));
}
@Test
@@ -199,9 +520,9 @@ public class TestStreamGraphImpl {
StreamSpec testStreamSpec3 = new StreamSpec("test-stream-3", "physical-stream-3", "test-system");
when(mockRunner.getStreamSpec("test-stream-3")).thenReturn(testStreamSpec3);
- graph.getInputStream("test-stream-1", (k, v) -> v);
- graph.getInputStream("test-stream-2", (k, v) -> v);
- graph.getInputStream("test-stream-3", (k, v) -> v);
+ graph.getInputStream("test-stream-1");
+ graph.getInputStream("test-stream-2");
+ graph.getInputStream("test-stream-3");
List<InputOperatorSpec> inputSpecs = new ArrayList<>(graph.getInputOperators().values());
Assert.assertEquals(inputSpecs.size(), 3);
http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java b/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java
index ca8a151..ee44cf9 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java
@@ -34,6 +34,8 @@ import org.apache.samza.operators.windows.AccumulationMode;
import org.apache.samza.operators.windows.WindowPane;
import org.apache.samza.operators.windows.Windows;
import org.apache.samza.runtime.ApplicationRunner;
+import org.apache.samza.serializers.IntegerSerde;
+import org.apache.samza.serializers.KVSerde;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.StreamSpec;
@@ -389,8 +391,9 @@ public class TestWindowOperator {
@Override
public void init(StreamGraph graph, Config config) {
- MessageStream<IntegerEnvelope> inStream = graph.getInputStream("integers",
- (k, m) -> new IntegerEnvelope((Integer) k));
+ MessageStream<IntegerEnvelope> inStream =
+ graph.getInputStream("integers", KVSerde.of(new IntegerSerde(), new IntegerSerde()))
+ .map(kv -> new IntegerEnvelope(kv.getKey()));
Function<IntegerEnvelope, Integer> keyFn = m -> (Integer) m.getKey();
inStream
.map(m -> m)
@@ -418,8 +421,9 @@ public class TestWindowOperator {
@Override
public void init(StreamGraph graph, Config config) {
- MessageStream<IntegerEnvelope> inStream = graph.getInputStream("integers",
- (k, m) -> new IntegerEnvelope((Integer) k));
+ MessageStream<IntegerEnvelope> inStream =
+ graph.getInputStream("integers", KVSerde.of(new IntegerSerde(), new IntegerSerde()))
+ .map(kv -> new IntegerEnvelope(kv.getKey()));
Function<IntegerEnvelope, Integer> keyFn = m -> (Integer) m.getKey();
inStream
.map(m -> m)
@@ -444,8 +448,9 @@ public class TestWindowOperator {
@Override
public void init(StreamGraph graph, Config config) {
- MessageStream<IntegerEnvelope> inStream = graph.getInputStream("integers",
- (k, m) -> new IntegerEnvelope((Integer) k));
+ MessageStream<IntegerEnvelope> inStream =
+ graph.getInputStream("integers", KVSerde.of(new IntegerSerde(), new IntegerSerde()))
+ .map(kv -> new IntegerEnvelope(kv.getKey()));
Function<IntegerEnvelope, Integer> keyFn = m -> (Integer) m.getKey();
inStream
http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
index 4505eef..68b4ce0 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
@@ -19,9 +19,9 @@
package org.apache.samza.operators.impl;
-import org.apache.commons.lang3.tuple.Pair;
import org.apache.samza.config.Config;
import org.apache.samza.metrics.MetricsRegistryMap;
+import org.apache.samza.operators.KV;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.OutputStream;
import org.apache.samza.operators.StreamGraphImpl;
@@ -30,6 +30,10 @@ import org.apache.samza.operators.functions.JoinFunction;
import org.apache.samza.operators.functions.MapFunction;
import org.apache.samza.operators.spec.OperatorSpec.OpCode;
import org.apache.samza.runtime.ApplicationRunner;
+import org.apache.samza.serializers.IntegerSerde;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.NoOpSerde;
+import org.apache.samza.serializers.StringSerde;
import org.apache.samza.system.StreamSpec;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.MessageCollector;
@@ -43,8 +47,6 @@ import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import java.util.function.BiFunction;
-import java.util.function.Function;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotSame;
@@ -73,9 +75,8 @@ public class TestOperatorImplGraph {
when(mockRunner.getStreamSpec(eq("output"))).thenReturn(mock(StreamSpec.class));
StreamGraphImpl streamGraph = new StreamGraphImpl(mockRunner, mock(Config.class));
- MessageStream<Object> inputStream = streamGraph.getInputStream("input", mock(BiFunction.class));
- OutputStream<Object, Object, Object> outputStream =
- streamGraph.getOutputStream("output", mock(Function.class), mock(Function.class));
+ MessageStream<Object> inputStream = streamGraph.getInputStream("input");
+ OutputStream<Object> outputStream = streamGraph.getOutputStream("output");
inputStream
.filter(mock(FilterFunction.class))
@@ -104,12 +105,49 @@ public class TestOperatorImplGraph {
}
@Test
+ public void testPartitionByChain() {
+ ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+ when(mockRunner.getStreamSpec(eq("input"))).thenReturn(new StreamSpec("input", "input-stream", "input-system"));
+ when(mockRunner.getStreamSpec(eq("output"))).thenReturn(new StreamSpec("output", "output-stream", "output-system"));
+ when(mockRunner.getStreamSpec(eq("null-null-partition_by-1")))
+ .thenReturn(new StreamSpec("intermediate", "intermediate-stream", "intermediate-system"));
+ StreamGraphImpl streamGraph = new StreamGraphImpl(mockRunner, mock(Config.class));
+ MessageStream<Object> inputStream = streamGraph.getInputStream("input");
+ OutputStream<KV<Integer, String>> outputStream = streamGraph
+ .getOutputStream("output", KVSerde.of(mock(IntegerSerde.class), mock(StringSerde.class)));
+
+ inputStream
+ .partitionBy(Object::hashCode, Object::toString, KVSerde.of(mock(IntegerSerde.class), mock(StringSerde.class)))
+ .sendTo(outputStream);
+
+ TaskContext mockTaskContext = mock(TaskContext.class);
+ when(mockTaskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
+ OperatorImplGraph opImplGraph =
+ new OperatorImplGraph(streamGraph, mock(Config.class), mockTaskContext, mock(Clock.class));
+
+ InputOperatorImpl inputOpImpl = opImplGraph.getInputOperator(new SystemStream("input-system", "input-stream"));
+ assertEquals(1, inputOpImpl.registeredOperators.size());
+
+ OperatorImpl partitionByOpImpl = (PartitionByOperatorImpl) inputOpImpl.registeredOperators.iterator().next();
+ assertEquals(0, partitionByOpImpl.registeredOperators.size()); // is terminal but paired with an input operator
+ assertEquals(OpCode.PARTITION_BY, partitionByOpImpl.getOperatorSpec().getOpCode());
+
+ InputOperatorImpl repartitionedInputOpImpl =
+ opImplGraph.getInputOperator(new SystemStream("intermediate-system", "intermediate-stream"));
+ assertEquals(1, repartitionedInputOpImpl.registeredOperators.size());
+
+ OperatorImpl sendToOpImpl = (OutputOperatorImpl) repartitionedInputOpImpl.registeredOperators.iterator().next();
+ assertEquals(0, sendToOpImpl.registeredOperators.size());
+ assertEquals(OpCode.SEND_TO, sendToOpImpl.getOperatorSpec().getOpCode());
+ }
+
+ @Test
public void testBroadcastChain() {
ApplicationRunner mockRunner = mock(ApplicationRunner.class);
when(mockRunner.getStreamSpec(eq("input"))).thenReturn(new StreamSpec("input", "input-stream", "input-system"));
StreamGraphImpl streamGraph = new StreamGraphImpl(mockRunner, mock(Config.class));
- MessageStream<Object> inputStream = streamGraph.getInputStream("input", mock(BiFunction.class));
+ MessageStream<Object> inputStream = streamGraph.getInputStream("input");
inputStream.filter(mock(FilterFunction.class));
inputStream.map(mock(MapFunction.class));
@@ -132,7 +170,7 @@ public class TestOperatorImplGraph {
when(mockRunner.getStreamSpec(eq("input"))).thenReturn(new StreamSpec("input", "input-stream", "input-system"));
StreamGraphImpl streamGraph = new StreamGraphImpl(mockRunner, mock(Config.class));
- MessageStream<Object> inputStream = streamGraph.getInputStream("input", mock(BiFunction.class));
+ MessageStream<Object> inputStream = streamGraph.getInputStream("input");
MessageStream<Object> stream1 = inputStream.filter(mock(FilterFunction.class));
MessageStream<Object> stream2 = inputStream.map(mock(MapFunction.class));
MessageStream<Object> mergedStream = stream1.merge(Collections.singleton(stream2));
@@ -156,8 +194,8 @@ public class TestOperatorImplGraph {
StreamGraphImpl streamGraph = new StreamGraphImpl(mockRunner, mock(Config.class));
JoinFunction mockJoinFunction = mock(JoinFunction.class);
- MessageStream<Object> inputStream1 = streamGraph.getInputStream("input1", (k, v) -> v);
- MessageStream<Object> inputStream2 = streamGraph.getInputStream("input2", (k, v) -> v);
+ MessageStream<Object> inputStream1 = streamGraph.getInputStream("input1", new NoOpSerde<>());
+ MessageStream<Object> inputStream2 = streamGraph.getInputStream("input2", new NoOpSerde<>());
inputStream1.join(inputStream2, mockJoinFunction, Duration.ofHours(1));
TaskContext mockTaskContext = mock(TaskContext.class);
@@ -182,13 +220,13 @@ public class TestOperatorImplGraph {
// verify that left partial join operator calls getFirstKey
Object mockLeftMessage = mock(Object.class);
when(mockJoinFunction.getFirstKey(eq(mockLeftMessage))).thenReturn(joinKey);
- inputOpImpl1.onMessage(Pair.of("", mockLeftMessage), mock(MessageCollector.class), mock(TaskCoordinator.class));
+ inputOpImpl1.onMessage(KV.of("", mockLeftMessage), mock(MessageCollector.class), mock(TaskCoordinator.class));
verify(mockJoinFunction, times(1)).getFirstKey(mockLeftMessage);
// verify that right partial join operator calls getSecondKey
Object mockRightMessage = mock(Object.class);
when(mockJoinFunction.getSecondKey(eq(mockRightMessage))).thenReturn(joinKey);
- inputOpImpl2.onMessage(Pair.of("", mockRightMessage), mock(MessageCollector.class), mock(TaskCoordinator.class));
+ inputOpImpl2.onMessage(KV.of("", mockRightMessage), mock(MessageCollector.class), mock(TaskCoordinator.class));
verify(mockJoinFunction, times(1)).getSecondKey(mockRightMessage);
// verify that the join function apply is called with the correct messages on match
@@ -205,8 +243,8 @@ public class TestOperatorImplGraph {
when(mockContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
StreamGraphImpl streamGraph = new StreamGraphImpl(mockRunner, mockConfig);
- MessageStream<Object> inputStream1 = streamGraph.getInputStream("input1", (k, v) -> v);
- MessageStream<Object> inputStream2 = streamGraph.getInputStream("input2", (k, v) -> v);
+ MessageStream<Object> inputStream1 = streamGraph.getInputStream("input1");
+ MessageStream<Object> inputStream2 = streamGraph.getInputStream("input2");
List<String> initializedOperators = new ArrayList<>();
List<String> closedOperators = new ArrayList<>();
http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java
index e183d87..a91c1af 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java
@@ -41,7 +41,7 @@ public class TestStreamOperatorImpl {
@Test
@SuppressWarnings("unchecked")
- public void testSimpleOperator() {
+ public void testStreamOperator() {
StreamOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope> mockOp = mock(StreamOperatorSpec.class);
FlatMapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> txfmFn = mock(FlatMapFunction.class);
when(mockOp.getTransformFn()).thenReturn(txfmFn);
@@ -61,7 +61,7 @@ public class TestStreamOperatorImpl {
}
@Test
- public void testSimpleOperatorClose() {
+ public void testStreamOperatorClose() {
StreamOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope> mockOp = mock(StreamOperatorSpec.class);
FlatMapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> txfmFn = mock(FlatMapFunction.class);
when(mockOp.getTransformFn()).thenReturn(txfmFn);
http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/test/scala/org/apache/samza/serializers/TestByteBufferSerde.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/serializers/TestByteBufferSerde.scala b/samza-core/src/test/scala/org/apache/samza/serializers/TestByteBufferSerde.scala
deleted file mode 100644
index eddfb0a..0000000
--- a/samza-core/src/test/scala/org/apache/samza/serializers/TestByteBufferSerde.scala
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.serializers
-
-import java.util.Arrays
-import org.junit.Assert._
-import org.junit.Test
-import java.nio.ByteBuffer
-
-class TestByteBufferSerde {
- @Test
- def testSerde {
- val serde = new ByteBufferSerde
- assertNull(serde.toBytes(null))
- assertNull(serde.fromBytes(null))
-
- val bytes = "A lazy way of creating a byte array".getBytes()
- val byteBuffer = ByteBuffer.wrap(bytes)
- byteBuffer.mark()
- assertArrayEquals(serde.toBytes(byteBuffer), bytes)
- byteBuffer.reset()
- assertEquals(serde.fromBytes(bytes), byteBuffer)
- }
-
- @Test
- def testSerializationPreservesInput {
- val serde = new ByteBufferSerde
- val bytes = "A lazy way of creating a byte array".getBytes()
- val byteBuffer = ByteBuffer.wrap(bytes)
- byteBuffer.get() // advance position by 1
- serde.toBytes(byteBuffer)
-
- assertEquals(byteBuffer.capacity(), byteBuffer.limit())
- assertEquals(1, byteBuffer.position())
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/test/scala/org/apache/samza/serializers/TestByteSerde.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/serializers/TestByteSerde.scala b/samza-core/src/test/scala/org/apache/samza/serializers/TestByteSerde.scala
deleted file mode 100644
index f605762..0000000
--- a/samza-core/src/test/scala/org/apache/samza/serializers/TestByteSerde.scala
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.serializers
-
-import java.util.Arrays
-
-import org.junit.Assert._
-import org.junit.Test
-
-class TestByteSerde {
- @Test
- def testByteSerde {
- val serde = new ByteSerde
- assertNull(serde.toBytes(null))
- assertNull(serde.fromBytes(null))
-
- val testBytes = "A lazy way of creating a byte array".getBytes()
- assertArrayEquals(serde.toBytes(testBytes), testBytes)
- assertArrayEquals(serde.fromBytes(testBytes), testBytes)
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/test/scala/org/apache/samza/serializers/TestDoubleSerde.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/serializers/TestDoubleSerde.scala b/samza-core/src/test/scala/org/apache/samza/serializers/TestDoubleSerde.scala
deleted file mode 100644
index 60241b5..0000000
--- a/samza-core/src/test/scala/org/apache/samza/serializers/TestDoubleSerde.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.serializers
-
-import java.util.Arrays
-
-import org.junit.Assert._
-import org.junit.Test
-
-class TestDoubleSerde {
- @Test
- def testDoubleSerde {
- val serde = new DoubleSerde
- assertEquals(null, serde.toBytes(null))
- assertEquals(null, serde.fromBytes(null))
-
- val fooBar = 9.156013e-002
- val fooBarBytes = serde.toBytes(fooBar)
- fooBarBytes.foreach(System.err.println)
- assertArrayEquals(Array[Byte](63, -73, 112, 124, 19, -9, -82, -93), fooBarBytes)
- assertEquals(fooBar, serde.fromBytes(fooBarBytes))
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/test/scala/org/apache/samza/serializers/TestIntegerSerde.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/serializers/TestIntegerSerde.scala b/samza-core/src/test/scala/org/apache/samza/serializers/TestIntegerSerde.scala
deleted file mode 100644
index a3e7e1f..0000000
--- a/samza-core/src/test/scala/org/apache/samza/serializers/TestIntegerSerde.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.serializers
-
-import org.junit.Assert._
-import org.junit.Test
-
-class TestIntegerSerde {
- @Test
- def testIntegerSerde {
- val serde = new IntegerSerde
- assertEquals(null, serde.toBytes(null))
- assertEquals(null, serde.fromBytes(null))
-
- val fooBar = 37
- val fooBarBytes = serde.toBytes(fooBar)
- assertArrayEquals(Array[Byte](0, 0, 0, 37), fooBarBytes)
- assertEquals(fooBar, serde.fromBytes(fooBarBytes))
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/test/scala/org/apache/samza/serializers/TestLongSerde.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/serializers/TestLongSerde.scala b/samza-core/src/test/scala/org/apache/samza/serializers/TestLongSerde.scala
deleted file mode 100644
index 77a7498..0000000
--- a/samza-core/src/test/scala/org/apache/samza/serializers/TestLongSerde.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.serializers
-
-import java.util.Arrays
-
-import org.junit.Assert._
-import org.junit.Test
-
-class TestLongSerde {
- @Test
- def testLongSerde {
- val serde = new LongSerde
- assertEquals(null, serde.toBytes(null))
- assertEquals(null, serde.fromBytes(null))
-
- val fooBar = 1234123412341234L
- val fooBarBytes = serde.toBytes(fooBar)
- fooBarBytes.foreach(System.err.println)
- assertArrayEquals(Array[Byte](0, 4, 98, 109, -65, -102, 1, -14), fooBarBytes)
- assertEquals(fooBar, serde.fromBytes(fooBarBytes))
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/test/scala/org/apache/samza/serializers/TestSerializableSerde.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/serializers/TestSerializableSerde.scala b/samza-core/src/test/scala/org/apache/samza/serializers/TestSerializableSerde.scala
deleted file mode 100644
index 8e899d0..0000000
--- a/samza-core/src/test/scala/org/apache/samza/serializers/TestSerializableSerde.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.serializers
-
-import org.junit.Assert._
-import org.junit.Test
-
-class TestSerializableSerde {
- @Test
- def testSerializableSerde {
- val serde = new SerializableSerde[String]
- assertNull(serde.toBytes(null))
- assertNull(serde.fromBytes(null))
-
- val obj = "String is serializable"
-
- // Serialized string is prefix + string itself
- val prefix = Array(0xAC, 0xED, 0x00, 0x05, 0x74, 0x00, 0x16).map(_.toByte)
- val expected = (prefix ++ obj.getBytes("UTF-8"))
-
- val bytes = serde.toBytes(obj)
-
- assertArrayEquals(expected, bytes)
-
- val objRoundTrip:String = serde.fromBytes(bytes)
- assertEquals(obj, objRoundTrip)
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/test/scala/org/apache/samza/serializers/TestStringSerde.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/serializers/TestStringSerde.scala b/samza-core/src/test/scala/org/apache/samza/serializers/TestStringSerde.scala
deleted file mode 100644
index a1e8e88..0000000
--- a/samza-core/src/test/scala/org/apache/samza/serializers/TestStringSerde.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.serializers
-
-import org.junit.Assert._
-import org.junit.Test
-
-class TestStringSerde {
- @Test
- def testStringSerde {
- val serde = new StringSerde("UTF-8")
- assertEquals(null, serde.toBytes(null))
- assertEquals(null, serde.fromBytes(null))
-
- val fooBar = "foo bar"
- val fooBarBytes = serde.toBytes(fooBar)
- assertArrayEquals(fooBar.getBytes("UTF-8"), fooBarBytes)
- assertEquals(fooBar, serde.fromBytes(fooBarBytes))
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/test/scala/org/apache/samza/serializers/TestUUIDSerde.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/serializers/TestUUIDSerde.scala b/samza-core/src/test/scala/org/apache/samza/serializers/TestUUIDSerde.scala
deleted file mode 100644
index 04ddcdb..0000000
--- a/samza-core/src/test/scala/org/apache/samza/serializers/TestUUIDSerde.scala
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.serializers
-
-import java.nio.BufferUnderflowException
-import java.util.UUID
-
-import org.junit.Assert._
-import org.junit.Test
-
-class TestUUIDSerde {
- private val serde = new UUIDSerde
-
- @Test
- def testUUIDSerde {
- val uuid = new UUID(13, 42)
- val bytes = serde.toBytes(uuid)
- assertArrayEquals(Array[Byte](0, 0, 0, 0, 0, 0, 0, 13, 0, 0, 0, 0, 0, 0, 0, 42), bytes)
- assertEquals(uuid, serde.fromBytes(bytes))
- }
-
- @Test
- def testToBytesWhenNull {
- assertEquals(null, serde.toBytes(null))
- }
-
- @Test
- def testFromBytesWhenNull {
- assertEquals(null, serde.fromBytes(null))
- }
-
- @Test(expected = classOf[BufferUnderflowException])
- def testFromBytesWhenInvalid {
- assertEquals(null, serde.fromBytes(Array[Byte](0)))
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java
----------------------------------------------------------------------
diff --git a/samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java b/samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java
index 0f0d792..5824489 100644
--- a/samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java
+++ b/samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java
@@ -77,7 +77,7 @@ public class Log4jSystemConfig extends JavaSystemConfig {
* supplied serde name.
*/
public String getSerdeClass(String name) {
- return get(String.format(SerializerConfig.SERDE(), name), null);
+ return get(String.format(SerializerConfig.SERDE_FACTORY_CLASS(), name), null);
}
public String getStreamSerdeName(String systemName, String streamName) {
http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
----------------------------------------------------------------------
diff --git a/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
index e442599..8436835 100644
--- a/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
+++ b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
@@ -246,7 +246,7 @@ public class StreamAppender extends AppenderSkeleton {
SerdeFactory<LoggingEvent> serdeFactory = Util.<SerdeFactory<LoggingEvent>>getObj(serdeClass);
serde = serdeFactory.getSerde(systemName, config);
} else {
- String serdeKey = String.format(SerializerConfig.SERDE(), serdeName);
+ String serdeKey = String.format(SerializerConfig.SERDE_FACTORY_CLASS(), serdeName);
throw new SamzaException("Can not find serializers class for key '" + serdeName + "'. Please specify " +
serdeKey + " property");
}
http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-test/src/main/resources/log4j.xml
----------------------------------------------------------------------
diff --git a/samza-test/src/main/resources/log4j.xml b/samza-test/src/main/resources/log4j.xml
index 7b4fb82..ab74ebd 100644
--- a/samza-test/src/main/resources/log4j.xml
+++ b/samza-test/src/main/resources/log4j.xml
@@ -12,43 +12,39 @@
<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
- <appender name="RollingAppender" class="org.apache.log4j.RollingFileAppender">
- <param name="File" value="${samza.log.dir}/${samza.container.name}.log" />
- <param name="MaxFileSize" value="256MB" />
- <param name="MaxBackupIndex" value="20" />
- <layout class="org.apache.log4j.PatternLayout">
- <param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %c{1} [%p] %m%n" />
- </layout>
- </appender>
-
- <appender name="StartupAppender" class="org.apache.log4j.RollingFileAppender">
- <param name="File" value="${samza.log.dir}/${samza.container.name}-startup.log" />
- <param name="MaxFileSize" value="256MB" />
- <param name="MaxBackupIndex" value="1" />
- <layout class="org.apache.log4j.PatternLayout">
- <param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %c{1} [%p] %m%n" />
- </layout>
- </appender>
-
<appender name="console" class="org.apache.log4j.ConsoleAppender">
<param name="Target" value="System.out" />
<layout class="org.apache.log4j.PatternLayout">
- <param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %c{1} [%p] %m%n" />
+ <param name="ConversionPattern" value="[%t] %c{1} [%p] %m%n" />
</layout>
</appender>
<root>
<priority value="info" />
- <appender-ref ref="console" />
- <appender-ref ref="RollingAppender" />
+ <appender-ref ref="console" />
</root>
<logger name="STARTUP_LOGGER" additivity="false">
<level value="info" />
- <appender-ref ref="StartupAppender"/>
+ <appender-ref ref="console"/>
</logger>
<logger name="org.apache.hadoop">
- <level value="off"/>
+ <level value="ERROR"/>
+ </logger>
+ <logger name="org.I0Itec.zkclient">
+ <level value="ERROR"/>
+ </logger>
+ <logger name="org.apache.zookeeper">
+ <level value="ERROR"/>
+ </logger>
+ <logger name="org.apache.samza.system.kafka">
+ <level value="ERROR"/>
+ </logger>
+ <logger name="org.apache.kafka">
+ <level value="ERROR"/>
+ </logger>
+ <logger name="kafka">
+ <level value="ERROR"/>
</logger>
-</log4j:configuration>
+</log4j:configuration>
\ No newline at end of file