You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2018/05/25 16:36:50 UTC
[07/10] samza git commit: SAMZA-1659: Serializable OperatorSpec
http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java b/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
index dac4e94..602b595 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
@@ -21,9 +21,8 @@ package org.apache.samza.operators;
import com.google.common.collect.ImmutableSet;
import org.apache.samza.Partition;
import org.apache.samza.SamzaException;
-import org.apache.samza.application.StreamApplication;
import org.apache.samza.config.Config;
-import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.MapConfig;
import org.apache.samza.container.TaskContextImpl;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.operators.functions.JoinFunction;
@@ -44,18 +43,24 @@ import org.apache.samza.task.TaskCoordinator;
import org.apache.samza.testUtils.TestClock;
import org.apache.samza.util.Clock;
import org.apache.samza.util.SystemClock;
+import org.junit.Before;
import org.junit.Test;
+import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Set;
-import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.anyString;
+import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class TestJoinOperator {
@@ -64,10 +69,22 @@ public class TestJoinOperator {
private final TaskCoordinator taskCoordinator = mock(TaskCoordinator.class);
private final Set<Integer> numbers = ImmutableSet.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
+ private Config config;
+
+ @Before
+ public void setUp() {
+ Map<String, String> mapConfig = new HashMap<>();
+ mapConfig.put("app.runner.class", "org.apache.samza.runtime.LocalApplicationRunner");
+ mapConfig.put("job.default.system", "insystem");
+ mapConfig.put("job.name", "jobName");
+ mapConfig.put("job.id", "jobId");
+ config = new MapConfig(mapConfig);
+ }
+
@Test
public void join() throws Exception {
- TestJoinStreamApplication app = new TestJoinStreamApplication(new TestJoinFunction());
- StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), app);
+ StreamGraphSpec graphSpec = this.getTestJoinStreamGraph(new TestJoinFunction());
+ StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), graphSpec);
List<Integer> output = new ArrayList<>();
MessageCollector messageCollector = envelope -> output.add((Integer) envelope.getMessage());
@@ -82,43 +99,42 @@ public class TestJoinOperator {
@Test(expected = SamzaException.class)
public void joinWithSelfThrowsException() throws Exception {
- StreamApplication app = new StreamApplication() {
- @Override
- public void init(StreamGraph graph, Config config) {
- IntegerSerde integerSerde = new IntegerSerde();
- KVSerde<Integer, Integer> kvSerde = KVSerde.of(integerSerde, integerSerde);
- MessageStream<KV<Integer, Integer>> inStream = graph.getInputStream("instream", kvSerde);
-
- inStream.join(inStream, new TestJoinFunction(), integerSerde, kvSerde, kvSerde, JOIN_TTL, "join");
- }
- };
-
- createStreamOperatorTask(new SystemClock(), app); // should throw an exception
+ config.put("streams.instream.system", "insystem");
+
+ StreamGraphSpec graphSpec = new StreamGraphSpec(mock(ApplicationRunner.class), config);
+ IntegerSerde integerSerde = new IntegerSerde();
+ KVSerde<Integer, Integer> kvSerde = KVSerde.of(integerSerde, integerSerde);
+ MessageStream<KV<Integer, Integer>> inStream = graphSpec.getInputStream("instream", kvSerde);
+
+ inStream.join(inStream, new TestJoinFunction(), integerSerde, kvSerde, kvSerde, JOIN_TTL, "join");
+
+ createStreamOperatorTask(new SystemClock(), graphSpec); // should throw an exception
}
@Test
public void joinFnInitAndClose() throws Exception {
TestJoinFunction joinFn = new TestJoinFunction();
- TestJoinStreamApplication app = new TestJoinStreamApplication(joinFn);
- StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), app);
- assertEquals(1, joinFn.getNumInitCalls());
+ StreamGraphSpec graphSpec = this.getTestJoinStreamGraph(joinFn);
+ StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), graphSpec);
+
MessageCollector messageCollector = mock(MessageCollector.class);
// push messages to first stream
numbers.forEach(n -> sot.process(new FirstStreamIME(n, n), messageCollector, taskCoordinator));
// close should not be called till now
- assertEquals(0, joinFn.getNumCloseCalls());
sot.close();
- // close should be called from sot.close()
- assertEquals(1, joinFn.getNumCloseCalls());
+ verify(messageCollector, times(0)).send(any(OutgoingMessageEnvelope.class));
+ // Make sure the joinFn has been copied instead of directly referred by the task instance
+ assertEquals(0, joinFn.getNumInitCalls());
+ assertEquals(0, joinFn.getNumCloseCalls());
}
@Test
public void joinReverse() throws Exception {
- TestJoinStreamApplication app = new TestJoinStreamApplication(new TestJoinFunction());
- StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), app);
+ StreamGraphSpec graphSpec = this.getTestJoinStreamGraph(new TestJoinFunction());
+ StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), graphSpec);
List<Integer> output = new ArrayList<>();
MessageCollector messageCollector = envelope -> output.add((Integer) envelope.getMessage());
@@ -133,8 +149,8 @@ public class TestJoinOperator {
@Test
public void joinNoMatch() throws Exception {
- TestJoinStreamApplication app = new TestJoinStreamApplication(new TestJoinFunction());
- StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), app);
+ StreamGraphSpec graphSpec = this.getTestJoinStreamGraph(new TestJoinFunction());
+ StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), graphSpec);
List<Integer> output = new ArrayList<>();
MessageCollector messageCollector = envelope -> output.add((Integer) envelope.getMessage());
@@ -148,8 +164,8 @@ public class TestJoinOperator {
@Test
public void joinNoMatchReverse() throws Exception {
- TestJoinStreamApplication app = new TestJoinStreamApplication(new TestJoinFunction());
- StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), app);
+ StreamGraphSpec graphSpec = this.getTestJoinStreamGraph(new TestJoinFunction());
+ StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), graphSpec);
List<Integer> output = new ArrayList<>();
MessageCollector messageCollector = envelope -> output.add((Integer) envelope.getMessage());
@@ -163,8 +179,8 @@ public class TestJoinOperator {
@Test
public void joinRetainsLatestMessageForKey() throws Exception {
- TestJoinStreamApplication app = new TestJoinStreamApplication(new TestJoinFunction());
- StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), app);
+ StreamGraphSpec graphSpec = this.getTestJoinStreamGraph(new TestJoinFunction());
+ StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), graphSpec);
List<Integer> output = new ArrayList<>();
MessageCollector messageCollector = envelope -> output.add((Integer) envelope.getMessage());
@@ -181,8 +197,8 @@ public class TestJoinOperator {
@Test
public void joinRetainsLatestMessageForKeyReverse() throws Exception {
- TestJoinStreamApplication app = new TestJoinStreamApplication(new TestJoinFunction());
- StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), app);
+ StreamGraphSpec graphSpec = this.getTestJoinStreamGraph(new TestJoinFunction());
+ StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), graphSpec);
List<Integer> output = new ArrayList<>();
MessageCollector messageCollector = envelope -> output.add((Integer) envelope.getMessage());
@@ -199,8 +215,8 @@ public class TestJoinOperator {
@Test
public void joinRetainsMatchedMessages() throws Exception {
- TestJoinStreamApplication app = new TestJoinStreamApplication(new TestJoinFunction());
- StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), app);
+ StreamGraphSpec graphSpec = this.getTestJoinStreamGraph(new TestJoinFunction());
+ StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), graphSpec);
List<Integer> output = new ArrayList<>();
MessageCollector messageCollector = envelope -> output.add((Integer) envelope.getMessage());
@@ -222,8 +238,8 @@ public class TestJoinOperator {
@Test
public void joinRetainsMatchedMessagesReverse() throws Exception {
- TestJoinStreamApplication app = new TestJoinStreamApplication(new TestJoinFunction());
- StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), app);
+ StreamGraphSpec graphSpec = this.getTestJoinStreamGraph(new TestJoinFunction());
+ StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), graphSpec);
List<Integer> output = new ArrayList<>();
MessageCollector messageCollector = envelope -> output.add((Integer) envelope.getMessage());
@@ -246,8 +262,8 @@ public class TestJoinOperator {
@Test
public void joinRemovesExpiredMessages() throws Exception {
TestClock testClock = new TestClock();
- TestJoinStreamApplication app = new TestJoinStreamApplication(new TestJoinFunction());
- StreamOperatorTask sot = createStreamOperatorTask(testClock, app);
+ StreamGraphSpec graphSpec = this.getTestJoinStreamGraph(new TestJoinFunction());
+ StreamOperatorTask sot = createStreamOperatorTask(testClock, graphSpec);
List<Integer> output = new ArrayList<>();
MessageCollector messageCollector = envelope -> output.add((Integer) envelope.getMessage());
@@ -266,8 +282,8 @@ public class TestJoinOperator {
@Test
public void joinRemovesExpiredMessagesReverse() throws Exception {
TestClock testClock = new TestClock();
- TestJoinStreamApplication app = new TestJoinStreamApplication(new TestJoinFunction());
- StreamOperatorTask sot = createStreamOperatorTask(testClock, app);
+ StreamGraphSpec graphSpec = this.getTestJoinStreamGraph(new TestJoinFunction());
+ StreamOperatorTask sot = createStreamOperatorTask(testClock, graphSpec);
List<Integer> output = new ArrayList<>();
MessageCollector messageCollector = envelope -> output.add((Integer) envelope.getMessage());
@@ -283,15 +299,12 @@ public class TestJoinOperator {
assertTrue(output.isEmpty());
}
- private StreamOperatorTask createStreamOperatorTask(Clock clock, StreamApplication app) throws Exception {
- ApplicationRunner runner = mock(ApplicationRunner.class);
- when(runner.getStreamSpec("instream")).thenReturn(new StreamSpec("instream", "instream", "insystem"));
- when(runner.getStreamSpec("instream2")).thenReturn(new StreamSpec("instream2", "instream2", "insystem2"));
+ private StreamOperatorTask createStreamOperatorTask(Clock clock, StreamGraphSpec graphSpec) throws Exception {
TaskContextImpl taskContext = mock(TaskContextImpl.class);
when(taskContext.getSystemStreamPartitions()).thenReturn(ImmutableSet
.of(new SystemStreamPartition("insystem", "instream", new Partition(0)),
- new SystemStreamPartition("insystem2", "instream2", new Partition(0))));
+ new SystemStreamPartition("insystem", "instream2", new Partition(0))));
when(taskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
// need to return different stores for left and right side
IntegerSerde integerSerde = new IntegerSerde();
@@ -301,35 +314,30 @@ public class TestJoinOperator {
when(taskContext.getStore(eq("jobName-jobId-join-j1-R")))
.thenReturn(new TestInMemoryStore(integerSerde, timestampedValueSerde));
- Config config = mock(Config.class);
- when(config.get(JobConfig.JOB_NAME())).thenReturn("jobName");
- when(config.get(eq(JobConfig.JOB_ID()), anyString())).thenReturn("jobId");
-
- StreamOperatorTask sot = new StreamOperatorTask(app, runner, clock);
+ StreamOperatorTask sot = new StreamOperatorTask(graphSpec.getOperatorSpecGraph(), graphSpec.getContextManager(), clock);
sot.init(config, taskContext);
return sot;
}
- private static class TestJoinStreamApplication implements StreamApplication {
-
- private final TestJoinFunction joinFn;
-
- TestJoinStreamApplication(TestJoinFunction joinFn) {
- this.joinFn = joinFn;
- }
+ private StreamGraphSpec getTestJoinStreamGraph(TestJoinFunction joinFn) throws IOException {
+ ApplicationRunner runner = mock(ApplicationRunner.class);
+ when(runner.getStreamSpec("instream")).thenReturn(new StreamSpec("instream", "instream", "insystem"));
+ when(runner.getStreamSpec("instream2")).thenReturn(new StreamSpec("instream2", "instream2", "insystem"));
- @Override
- public void init(StreamGraph graph, Config config) {
- IntegerSerde integerSerde = new IntegerSerde();
- KVSerde<Integer, Integer> kvSerde = KVSerde.of(integerSerde, integerSerde);
- MessageStream<KV<Integer, Integer>> inStream = graph.getInputStream("instream", kvSerde);
- MessageStream<KV<Integer, Integer>> inStream2 = graph.getInputStream("instream2", kvSerde);
-
- SystemStream outputSystemStream = new SystemStream("outputSystem", "outputStream");
- inStream
- .join(inStream2, joinFn, integerSerde, kvSerde, kvSerde, JOIN_TTL, "j1")
- .sink((m, mc, tc) -> mc.send(new OutgoingMessageEnvelope(outputSystemStream, m)));
- }
+ StreamGraphSpec graphSpec = new StreamGraphSpec(runner, config);
+ IntegerSerde integerSerde = new IntegerSerde();
+ KVSerde<Integer, Integer> kvSerde = KVSerde.of(integerSerde, integerSerde);
+ MessageStream<KV<Integer, Integer>> inStream = graphSpec.getInputStream("instream", kvSerde);
+ MessageStream<KV<Integer, Integer>> inStream2 = graphSpec.getInputStream("instream2", kvSerde);
+
+ inStream
+ .join(inStream2, joinFn, integerSerde, kvSerde, kvSerde, JOIN_TTL, "j1")
+ .sink((message, messageCollector, taskCoordinator) -> {
+ SystemStream outputSystemStream = new SystemStream("outputSystem", "outputStream");
+ messageCollector.send(new OutgoingMessageEnvelope(outputSystemStream, message));
+ });
+
+ return graphSpec;
}
private static class TestJoinFunction
@@ -380,7 +388,7 @@ public class TestJoinOperator {
private static class SecondStreamIME extends IncomingMessageEnvelope {
SecondStreamIME(Integer key, Integer value) {
- super(new SystemStreamPartition("insystem2", "instream2", new Partition(0)), "1", key, value);
+ super(new SystemStreamPartition("insystem", "instream2", new Partition(0)), "1", key, value);
}
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java b/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
index 96e234e..fff85e8 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
@@ -18,11 +18,11 @@
*/
package org.apache.samza.operators;
+import com.google.common.collect.ImmutableList;
+import java.io.IOException;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
-import java.util.function.Function;
-import java.util.function.Supplier;
import org.apache.samza.operators.data.TestMessageEnvelope;
import org.apache.samza.operators.data.TestOutputMessageEnvelope;
@@ -32,6 +32,7 @@ import org.apache.samza.operators.functions.FoldLeftFunction;
import org.apache.samza.operators.functions.JoinFunction;
import org.apache.samza.operators.functions.MapFunction;
import org.apache.samza.operators.functions.SinkFunction;
+import org.apache.samza.operators.functions.SupplierFunction;
import org.apache.samza.operators.functions.StreamTableJoinFunction;
import org.apache.samza.operators.spec.JoinOperatorSpec;
import org.apache.samza.operators.spec.OperatorSpec;
@@ -54,8 +55,6 @@ import org.apache.samza.table.TableSpec;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
-import com.google.common.collect.ImmutableList;
-
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
@@ -71,7 +70,7 @@ public class TestMessageStreamImpl {
@Test
public void testMap() {
- StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+ StreamGraphSpec mockGraph = mock(StreamGraphSpec.class);
OperatorSpec mockOpSpec = mock(OperatorSpec.class);
MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph, mockOpSpec);
@@ -96,7 +95,7 @@ public class TestMessageStreamImpl {
@Test
public void testFlatMap() {
- StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+ StreamGraphSpec mockGraph = mock(StreamGraphSpec.class);
OperatorSpec mockOpSpec = mock(OperatorSpec.class);
MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph, mockOpSpec);
@@ -113,7 +112,7 @@ public class TestMessageStreamImpl {
@Test
public void testFlatMapWithRelaxedTypes() {
- StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+ StreamGraphSpec mockGraph = mock(StreamGraphSpec.class);
OperatorSpec mockOpSpec = mock(OperatorSpec.class);
MessageStreamImpl<TestInputMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph, mockOpSpec);
@@ -133,7 +132,7 @@ public class TestMessageStreamImpl {
@Test
public void testFilter() {
- StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+ StreamGraphSpec mockGraph = mock(StreamGraphSpec.class);
OperatorSpec mockOpSpec = mock(OperatorSpec.class);
MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph, mockOpSpec);
@@ -158,7 +157,7 @@ public class TestMessageStreamImpl {
@Test
public void testSink() {
- StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+ StreamGraphSpec mockGraph = mock(StreamGraphSpec.class);
OperatorSpec mockOpSpec = mock(OperatorSpec.class);
MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph, mockOpSpec);
@@ -175,7 +174,7 @@ public class TestMessageStreamImpl {
@Test
public void testSendTo() {
- StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+ StreamGraphSpec mockGraph = mock(StreamGraphSpec.class);
OperatorSpec mockOpSpec = mock(OperatorSpec.class);
MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph, mockOpSpec);
OutputStreamImpl<TestMessageEnvelope> mockOutputStreamImpl = mock(OutputStreamImpl.class);
@@ -200,8 +199,8 @@ public class TestMessageStreamImpl {
}
@Test
- public void testRepartition() {
- StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+ public void testPartitionBy() throws IOException {
+ StreamGraphSpec mockGraph = mock(StreamGraphSpec.class);
OperatorSpec mockOpSpec = mock(OperatorSpec.class);
String mockOpName = "mockName";
when(mockGraph.getNextOpId(anyObject(), anyObject())).thenReturn(mockOpName);
@@ -215,8 +214,8 @@ public class TestMessageStreamImpl {
when(mockIntermediateStream.isKeyed()).thenReturn(true);
MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph, mockOpSpec);
- Function mockKeyFunction = mock(Function.class);
- Function mockValueFunction = mock(Function.class);
+ MapFunction mockKeyFunction = mock(MapFunction.class);
+ MapFunction mockValueFunction = mock(MapFunction.class);
inputStream.partitionBy(mockKeyFunction, mockValueFunction, mockKVSerde, "p1");
ArgumentCaptor<OperatorSpec> registeredOpCaptor = ArgumentCaptor.forClass(OperatorSpec.class);
@@ -232,7 +231,7 @@ public class TestMessageStreamImpl {
@Test
public void testRepartitionWithoutSerde() {
- StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+ StreamGraphSpec mockGraph = mock(StreamGraphSpec.class);
OperatorSpec mockOpSpec = mock(OperatorSpec.class);
String mockOpName = "mockName";
when(mockGraph.getNextOpId(anyObject(), anyObject())).thenReturn(mockOpName);
@@ -245,8 +244,8 @@ public class TestMessageStreamImpl {
when(mockIntermediateStream.isKeyed()).thenReturn(true);
MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph, mockOpSpec);
- Function mockKeyFunction = mock(Function.class);
- Function mockValueFunction = mock(Function.class);
+ MapFunction mockKeyFunction = mock(MapFunction.class);
+ MapFunction mockValueFunction = mock(MapFunction.class);
inputStream.partitionBy(mockKeyFunction, mockValueFunction, "p1");
ArgumentCaptor<OperatorSpec> registeredOpCaptor = ArgumentCaptor.forClass(OperatorSpec.class);
@@ -262,18 +261,17 @@ public class TestMessageStreamImpl {
@Test
public void testWindowWithRelaxedTypes() throws Exception {
- StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+ StreamGraphSpec mockGraph = mock(StreamGraphSpec.class);
OperatorSpec mockOpSpec = mock(OperatorSpec.class);
MessageStream<TestInputMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph, mockOpSpec);
- Function<TestMessageEnvelope, String> keyExtractor = m -> m.getKey();
+ MapFunction<TestMessageEnvelope, String> keyExtractor = m -> m.getKey();
FoldLeftFunction<TestMessageEnvelope, Integer> aggregator = (m, c) -> c + 1;
- Supplier<Integer> initialValue = () -> 0;
+ SupplierFunction<Integer> initialValue = () -> 0;
// should compile since TestMessageEnvelope (input for functions) is base class of TestInputMessageEnvelope (M)
- Window<TestInputMessageEnvelope, String, Integer> window =
- Windows.keyedTumblingWindow(keyExtractor, Duration.ofHours(1), initialValue, aggregator,
- null, mock(Serde.class));
+ Window<TestInputMessageEnvelope, String, Integer> window = Windows
+ .keyedTumblingWindow(keyExtractor, Duration.ofHours(1), initialValue, aggregator, null, mock(Serde.class));
MessageStream<WindowPane<String, Integer>> windowedStream = inputStream.window(window, "w1");
ArgumentCaptor<OperatorSpec> registeredOpCaptor = ArgumentCaptor.forClass(OperatorSpec.class);
@@ -287,7 +285,7 @@ public class TestMessageStreamImpl {
@Test
public void testJoin() {
- StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+ StreamGraphSpec mockGraph = mock(StreamGraphSpec.class);
OperatorSpec leftInputOpSpec = mock(OperatorSpec.class);
MessageStreamImpl<TestMessageEnvelope> source1 = new MessageStreamImpl<>(mockGraph, leftInputOpSpec);
OperatorSpec rightInputOpSpec = mock(OperatorSpec.class);
@@ -319,7 +317,7 @@ public class TestMessageStreamImpl {
@Test
public void testSendToTable() {
- StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+ StreamGraphSpec mockGraph = mock(StreamGraphSpec.class);
OperatorSpec inputOpSpec = mock(OperatorSpec.class);
MessageStreamImpl<TestMessageEnvelope> source = new MessageStreamImpl<>(mockGraph, inputOpSpec);
@@ -336,13 +334,12 @@ public class TestMessageStreamImpl {
SendToTableOperatorSpec sendToTableOperatorSpec = (SendToTableOperatorSpec) registeredOpSpec;
assertEquals(OpCode.SEND_TO, sendToTableOperatorSpec.getOpCode());
- assertEquals(inputOpSpec, sendToTableOperatorSpec.getInputOpSpec());
assertEquals(tableSpec, sendToTableOperatorSpec.getTableSpec());
}
@Test
public void testStreamTableJoin() {
- StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+ StreamGraphSpec mockGraph = mock(StreamGraphSpec.class);
OperatorSpec leftInputOpSpec = mock(OperatorSpec.class);
MessageStreamImpl<KV<String, TestMessageEnvelope>> source1 = new MessageStreamImpl<>(mockGraph, leftInputOpSpec);
OperatorSpec rightInputOpSpec = mock(OperatorSpec.class);
@@ -370,7 +367,7 @@ public class TestMessageStreamImpl {
@Test
public void testMerge() {
- StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+ StreamGraphSpec mockGraph = mock(StreamGraphSpec.class);
OperatorSpec mockOpSpec1 = mock(OperatorSpec.class);
MessageStream<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph, mockOpSpec1);
@@ -410,7 +407,7 @@ public class TestMessageStreamImpl {
@Test
public void testMergeWithRelaxedTypes() {
- StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+ StreamGraphSpec mockGraph = mock(StreamGraphSpec.class);
MessageStream<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph, mock(OperatorSpec.class));
// other streams have the same message type T as input stream message type M
http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/test/java/org/apache/samza/operators/TestOperatorSpecGraph.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestOperatorSpecGraph.java b/samza-core/src/test/java/org/apache/samza/operators/TestOperatorSpecGraph.java
new file mode 100644
index 0000000..2be88ca
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/operators/TestOperatorSpecGraph.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for THE
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators;
+
+import java.io.IOException;
+import java.io.NotSerializableException;
+import java.io.ObjectInputStream;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Set;
+import org.apache.samza.SamzaException;
+import org.apache.samza.operators.functions.TimerFunction;
+import org.apache.samza.operators.functions.WatermarkFunction;
+import org.apache.samza.operators.spec.InputOperatorSpec;
+import org.apache.samza.operators.spec.OperatorSpec;
+import org.apache.samza.operators.spec.OperatorSpecTestUtils;
+import org.apache.samza.operators.spec.OperatorSpecs;
+import org.apache.samza.operators.spec.OutputOperatorSpec;
+import org.apache.samza.operators.spec.OutputStreamImpl;
+import org.apache.samza.operators.spec.SinkOperatorSpec;
+import org.apache.samza.operators.spec.StreamOperatorSpec;
+import org.apache.samza.serializers.NoOpSerde;
+import org.apache.samza.system.StreamSpec;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+
+/**
+ * Unit tests for {@link OperatorSpecGraph}
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(OperatorSpec.class)
+public class TestOperatorSpecGraph {
+
+ private StreamGraphSpec mockGraph;
+ private Map<StreamSpec, InputOperatorSpec> inputOpSpecMap;
+ private Map<StreamSpec, OutputStreamImpl> outputStrmMap;
+ private Set<OperatorSpec> allOpSpecs;
+
+ @Before
+ public void setUp() {
+ this.mockGraph = mock(StreamGraphSpec.class);
+
+ /**
+ * Setup two linear transformation pipelines:
+ * 1) input1 --> filter --> sendTo
+ * 2) input2 --> map --> sink
+ */
+ StreamSpec testInputSpec = new StreamSpec("test-input-1", "test-input-1", "kafka");
+ InputOperatorSpec testInput = new InputOperatorSpec(testInputSpec, new NoOpSerde(), new NoOpSerde(), true, "test-input-1");
+ StreamOperatorSpec filterOp = OperatorSpecs.createFilterOperatorSpec(m -> true, "test-filter-2");
+ StreamSpec testOutputSpec = new StreamSpec("test-output-1", "test-output-1", "kafka");
+ OutputStreamImpl outputStream1 = new OutputStreamImpl(testOutputSpec, null, null, true);
+ OutputOperatorSpec outputSpec = OperatorSpecs.createSendToOperatorSpec(outputStream1, "test-output-3");
+ testInput.registerNextOperatorSpec(filterOp);
+ filterOp.registerNextOperatorSpec(outputSpec);
+ StreamSpec testInputSpec2 = new StreamSpec("test-input-2", "test-input-2", "kafka");
+ InputOperatorSpec testInput2 = new InputOperatorSpec(testInputSpec2, new NoOpSerde(), new NoOpSerde(), true, "test-input-4");
+ StreamOperatorSpec testMap = OperatorSpecs.createMapOperatorSpec(m -> m, "test-map-5");
+ SinkOperatorSpec testSink = OperatorSpecs.createSinkOperatorSpec((m, mc, tc) -> { }, "test-sink-6");
+ testInput2.registerNextOperatorSpec(testMap);
+ testMap.registerNextOperatorSpec(testSink);
+
+ this.inputOpSpecMap = new LinkedHashMap<>();
+ inputOpSpecMap.put(testInputSpec, testInput);
+ inputOpSpecMap.put(testInputSpec2, testInput2);
+ this.outputStrmMap = new LinkedHashMap<>();
+ outputStrmMap.put(testOutputSpec, outputStream1);
+ when(mockGraph.getInputOperators()).thenReturn(Collections.unmodifiableMap(inputOpSpecMap));
+ when(mockGraph.getOutputStreams()).thenReturn(Collections.unmodifiableMap(outputStrmMap));
+ this.allOpSpecs = new HashSet<OperatorSpec>() { {
+ this.add(testInput);
+ this.add(filterOp);
+ this.add(outputSpec);
+ this.add(testInput2);
+ this.add(testMap);
+ this.add(testSink);
+ } };
+ }
+
+ @After
+ public void tearDown() {
+ this.mockGraph = null;
+ this.inputOpSpecMap = null;
+ this.outputStrmMap = null;
+ this.allOpSpecs = null;
+ }
+
+ @Test
+ public void testConstructor() {
+ OperatorSpecGraph specGraph = new OperatorSpecGraph(mockGraph);
+ assertEquals(specGraph.getInputOperators(), inputOpSpecMap);
+ assertEquals(specGraph.getOutputStreams(), outputStrmMap);
+ assertTrue(specGraph.getTables().isEmpty());
+ assertTrue(!specGraph.hasWindowOrJoins());
+ assertEquals(specGraph.getAllOperatorSpecs(), this.allOpSpecs);
+ }
+
+ @Test
+ public void testClone() {
+ OperatorSpecGraph operatorSpecGraph = new OperatorSpecGraph(mockGraph);
+ OperatorSpecGraph clonedSpecGraph = operatorSpecGraph.clone();
+ OperatorSpecTestUtils.assertClonedGraph(operatorSpecGraph, clonedSpecGraph);
+ }
+
+ @Test(expected = NotSerializableException.class)
+ public void testCloneWithSerializationError() throws Throwable {
+ OperatorSpec mockFailedOpSpec = PowerMockito.mock(OperatorSpec.class);
+ when(mockFailedOpSpec.getOpId()).thenReturn("test-failed-op-4");
+ allOpSpecs.add(mockFailedOpSpec);
+ inputOpSpecMap.values().stream().findFirst().get().registerNextOperatorSpec(mockFailedOpSpec);
+
+ //failed with serialization error
+ try {
+ new OperatorSpecGraph(mockGraph);
+ fail("Should have failed with serialization error");
+ } catch (SamzaException se) {
+ throw se.getCause();
+ }
+ }
+
+ @Test(expected = IOException.class)
+ public void testCloneWithDeserializationError() throws Throwable {
+ TestDeserializeOperatorSpec testOp = new TestDeserializeOperatorSpec(OperatorSpec.OpCode.MAP, "test-failed-op-4");
+ this.allOpSpecs.add(testOp);
+ inputOpSpecMap.values().stream().findFirst().get().registerNextOperatorSpec(testOp);
+
+ OperatorSpecGraph operatorSpecGraph = new OperatorSpecGraph(mockGraph);
+ //failed with serialization error
+ try {
+ operatorSpecGraph.clone();
+ fail("Should have failed with serialization error");
+ } catch (SamzaException se) {
+ throw se.getCause();
+ }
+ }
+
+ private static class TestDeserializeOperatorSpec extends OperatorSpec {
+
+ public TestDeserializeOperatorSpec(OpCode opCode, String opId) {
+ super(opCode, opId);
+ }
+
+ private void readObject(ObjectInputStream ois) throws IOException, ClassNotFoundException {
+ throw new IOException("Raise IOException to cause deserialization failure");
+ }
+
+ @Override
+ public WatermarkFunction getWatermarkFn() {
+ return null;
+ }
+
+ @Override
+ public TimerFunction getTimerFn() {
+ return null;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java b/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java
deleted file mode 100644
index 3bb44b5..0000000
--- a/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java
+++ /dev/null
@@ -1,601 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for THE
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-
-import org.apache.samza.SamzaException;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.JobConfig;
-import org.apache.samza.operators.data.TestMessageEnvelope;
-import org.apache.samza.operators.spec.InputOperatorSpec;
-import org.apache.samza.operators.spec.OperatorSpec.OpCode;
-import org.apache.samza.operators.spec.OutputStreamImpl;
-import org.apache.samza.operators.stream.IntermediateMessageStreamImpl;
-import org.apache.samza.runtime.ApplicationRunner;
-import org.apache.samza.serializers.KVSerde;
-import org.apache.samza.serializers.NoOpSerde;
-import org.apache.samza.serializers.Serde;
-import org.apache.samza.system.StreamSpec;
-import org.apache.samza.table.TableSpec;
-import org.junit.Test;
-
-import com.google.common.collect.ImmutableList;
-import junit.framework.Assert;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-public class TestStreamGraphImpl {
-
- @Test
- public void testGetInputStreamWithValueSerde() {
- ApplicationRunner mockRunner = mock(ApplicationRunner.class);
- StreamSpec mockStreamSpec = mock(StreamSpec.class);
- when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
- StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
-
- Serde mockValueSerde = mock(Serde.class);
- MessageStream<TestMessageEnvelope> inputStream = graph.getInputStream("test-stream-1", mockValueSerde);
-
- InputOperatorSpec<String, TestMessageEnvelope> inputOpSpec =
- (InputOperatorSpec) ((MessageStreamImpl<TestMessageEnvelope>) inputStream).getOperatorSpec();
- assertEquals(OpCode.INPUT, inputOpSpec.getOpCode());
- assertEquals(graph.getInputOperators().get(mockStreamSpec), inputOpSpec);
- assertEquals(mockStreamSpec, inputOpSpec.getStreamSpec());
- assertTrue(inputOpSpec.getKeySerde() instanceof NoOpSerde);
- assertEquals(mockValueSerde, inputOpSpec.getValueSerde());
- }
-
- @Test
- public void testGetInputStreamWithKeyValueSerde() {
- ApplicationRunner mockRunner = mock(ApplicationRunner.class);
- StreamSpec mockStreamSpec = mock(StreamSpec.class);
- when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
- StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
-
- KVSerde mockKVSerde = mock(KVSerde.class);
- Serde mockKeySerde = mock(Serde.class);
- Serde mockValueSerde = mock(Serde.class);
- doReturn(mockKeySerde).when(mockKVSerde).getKeySerde();
- doReturn(mockValueSerde).when(mockKVSerde).getValueSerde();
- MessageStream<TestMessageEnvelope> inputStream = graph.getInputStream("test-stream-1", mockKVSerde);
-
- InputOperatorSpec<String, TestMessageEnvelope> inputOpSpec =
- (InputOperatorSpec) ((MessageStreamImpl<TestMessageEnvelope>) inputStream).getOperatorSpec();
- assertEquals(OpCode.INPUT, inputOpSpec.getOpCode());
- assertEquals(graph.getInputOperators().get(mockStreamSpec), inputOpSpec);
- assertEquals(mockStreamSpec, inputOpSpec.getStreamSpec());
- assertEquals(mockKeySerde, inputOpSpec.getKeySerde());
- assertEquals(mockValueSerde, inputOpSpec.getValueSerde());
- }
-
- @Test(expected = NullPointerException.class)
- public void testGetInputStreamWithNullSerde() {
- ApplicationRunner mockRunner = mock(ApplicationRunner.class);
- StreamSpec mockStreamSpec = mock(StreamSpec.class);
- when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
- StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
-
- graph.getInputStream("test-stream-1", null);
- }
-
- @Test
- public void testGetInputStreamWithDefaultValueSerde() {
- ApplicationRunner mockRunner = mock(ApplicationRunner.class);
- StreamSpec mockStreamSpec = mock(StreamSpec.class);
- when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
- StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
-
- Serde mockValueSerde = mock(Serde.class);
- graph.setDefaultSerde(mockValueSerde);
- MessageStream<TestMessageEnvelope> inputStream = graph.getInputStream("test-stream-1");
-
- InputOperatorSpec<String, TestMessageEnvelope> inputOpSpec =
- (InputOperatorSpec) ((MessageStreamImpl<TestMessageEnvelope>) inputStream).getOperatorSpec();
- assertEquals(OpCode.INPUT, inputOpSpec.getOpCode());
- assertEquals(graph.getInputOperators().get(mockStreamSpec), inputOpSpec);
- assertEquals(mockStreamSpec, inputOpSpec.getStreamSpec());
- assertTrue(inputOpSpec.getKeySerde() instanceof NoOpSerde);
- assertEquals(mockValueSerde, inputOpSpec.getValueSerde());
- }
-
- @Test
- public void testGetInputStreamWithDefaultKeyValueSerde() {
- ApplicationRunner mockRunner = mock(ApplicationRunner.class);
- StreamSpec mockStreamSpec = mock(StreamSpec.class);
- when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
- StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
-
- KVSerde mockKVSerde = mock(KVSerde.class);
- Serde mockKeySerde = mock(Serde.class);
- Serde mockValueSerde = mock(Serde.class);
- doReturn(mockKeySerde).when(mockKVSerde).getKeySerde();
- doReturn(mockValueSerde).when(mockKVSerde).getValueSerde();
- graph.setDefaultSerde(mockKVSerde);
- MessageStream<TestMessageEnvelope> inputStream = graph.getInputStream("test-stream-1");
-
- InputOperatorSpec<String, TestMessageEnvelope> inputOpSpec =
- (InputOperatorSpec) ((MessageStreamImpl<TestMessageEnvelope>) inputStream).getOperatorSpec();
- assertEquals(OpCode.INPUT, inputOpSpec.getOpCode());
- assertEquals(graph.getInputOperators().get(mockStreamSpec), inputOpSpec);
- assertEquals(mockStreamSpec, inputOpSpec.getStreamSpec());
- assertEquals(mockKeySerde, inputOpSpec.getKeySerde());
- assertEquals(mockValueSerde, inputOpSpec.getValueSerde());
- }
-
- @Test
- public void testGetInputStreamWithDefaultDefaultSerde() {
- // default default serde == user hasn't provided a default serde
- ApplicationRunner mockRunner = mock(ApplicationRunner.class);
- StreamSpec mockStreamSpec = mock(StreamSpec.class);
- when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
- StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
-
- MessageStream<TestMessageEnvelope> inputStream = graph.getInputStream("test-stream-1");
-
- InputOperatorSpec<String, TestMessageEnvelope> inputOpSpec =
- (InputOperatorSpec) ((MessageStreamImpl<TestMessageEnvelope>) inputStream).getOperatorSpec();
- assertEquals(OpCode.INPUT, inputOpSpec.getOpCode());
- assertEquals(graph.getInputOperators().get(mockStreamSpec), inputOpSpec);
- assertEquals(mockStreamSpec, inputOpSpec.getStreamSpec());
- assertTrue(inputOpSpec.getKeySerde() instanceof NoOpSerde);
- assertTrue(inputOpSpec.getValueSerde() instanceof NoOpSerde);
- }
-
- @Test
- public void testGetInputStreamWithRelaxedTypes() {
- ApplicationRunner mockRunner = mock(ApplicationRunner.class);
- StreamSpec mockStreamSpec = mock(StreamSpec.class);
- when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
- StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
-
- MessageStream<TestMessageEnvelope> inputStream = graph.getInputStream("test-stream-1");
-
- InputOperatorSpec<String, TestMessageEnvelope> inputOpSpec =
- (InputOperatorSpec) ((MessageStreamImpl<TestMessageEnvelope>) inputStream).getOperatorSpec();
- assertEquals(OpCode.INPUT, inputOpSpec.getOpCode());
- assertEquals(graph.getInputOperators().get(mockStreamSpec), inputOpSpec);
- assertEquals(mockStreamSpec, inputOpSpec.getStreamSpec());
- }
-
- @Test
- public void testMultipleGetInputStreams() {
- ApplicationRunner mockRunner = mock(ApplicationRunner.class);
- StreamSpec mockStreamSpec1 = mock(StreamSpec.class);
- StreamSpec mockStreamSpec2 = mock(StreamSpec.class);
- when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec1);
- when(mockRunner.getStreamSpec("test-stream-2")).thenReturn(mockStreamSpec2);
-
- StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
- MessageStream<Object> inputStream1 = graph.getInputStream("test-stream-1");
- MessageStream<Object> inputStream2 = graph.getInputStream("test-stream-2");
-
- InputOperatorSpec<String, TestMessageEnvelope> inputOpSpec1 =
- (InputOperatorSpec) ((MessageStreamImpl<Object>) inputStream1).getOperatorSpec();
- InputOperatorSpec<String, TestMessageEnvelope> inputOpSpec2 =
- (InputOperatorSpec) ((MessageStreamImpl<Object>) inputStream2).getOperatorSpec();
-
- assertEquals(graph.getInputOperators().size(), 2);
- assertEquals(graph.getInputOperators().get(mockStreamSpec1), inputOpSpec1);
- assertEquals(graph.getInputOperators().get(mockStreamSpec2), inputOpSpec2);
- }
-
- @Test(expected = IllegalStateException.class)
- public void testGetSameInputStreamTwice() {
- ApplicationRunner mockRunner = mock(ApplicationRunner.class);
- when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mock(StreamSpec.class));
-
- StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
- graph.getInputStream("test-stream-1");
- // should throw exception
- graph.getInputStream("test-stream-1");
- }
-
- @Test
- public void testGetOutputStreamWithValueSerde() {
- ApplicationRunner mockRunner = mock(ApplicationRunner.class);
- StreamSpec mockStreamSpec = mock(StreamSpec.class);
- when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
-
- StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
-
- Serde mockValueSerde = mock(Serde.class);
- OutputStream<TestMessageEnvelope> outputStream =
- graph.getOutputStream("test-stream-1", mockValueSerde);
-
- OutputStreamImpl<TestMessageEnvelope> outputStreamImpl = (OutputStreamImpl) outputStream;
- assertEquals(graph.getOutputStreams().get(mockStreamSpec), outputStreamImpl);
- assertEquals(mockStreamSpec, outputStreamImpl.getStreamSpec());
- assertTrue(outputStreamImpl.getKeySerde() instanceof NoOpSerde);
- assertEquals(mockValueSerde, outputStreamImpl.getValueSerde());
- }
-
- @Test
- public void testGetOutputStreamWithKeyValueSerde() {
- ApplicationRunner mockRunner = mock(ApplicationRunner.class);
- StreamSpec mockStreamSpec = mock(StreamSpec.class);
- when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
-
- StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
- KVSerde mockKVSerde = mock(KVSerde.class);
- Serde mockKeySerde = mock(Serde.class);
- Serde mockValueSerde = mock(Serde.class);
- doReturn(mockKeySerde).when(mockKVSerde).getKeySerde();
- doReturn(mockValueSerde).when(mockKVSerde).getValueSerde();
- graph.setDefaultSerde(mockKVSerde);
- OutputStream<TestMessageEnvelope> outputStream = graph.getOutputStream("test-stream-1", mockKVSerde);
-
- OutputStreamImpl<TestMessageEnvelope> outputStreamImpl = (OutputStreamImpl) outputStream;
- assertEquals(graph.getOutputStreams().get(mockStreamSpec), outputStreamImpl);
- assertEquals(mockStreamSpec, outputStreamImpl.getStreamSpec());
- assertEquals(mockKeySerde, outputStreamImpl.getKeySerde());
- assertEquals(mockValueSerde, outputStreamImpl.getValueSerde());
- }
-
- @Test(expected = NullPointerException.class)
- public void testGetOutputStreamWithNullSerde() {
- ApplicationRunner mockRunner = mock(ApplicationRunner.class);
- StreamSpec mockStreamSpec = mock(StreamSpec.class);
- when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
-
- StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
-
- graph.getOutputStream("test-stream-1", null);
- }
-
- @Test
- public void testGetOutputStreamWithDefaultValueSerde() {
- ApplicationRunner mockRunner = mock(ApplicationRunner.class);
- StreamSpec mockStreamSpec = mock(StreamSpec.class);
- when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
-
- Serde mockValueSerde = mock(Serde.class);
- StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
- graph.setDefaultSerde(mockValueSerde);
- OutputStream<TestMessageEnvelope> outputStream = graph.getOutputStream("test-stream-1");
-
- OutputStreamImpl<TestMessageEnvelope> outputStreamImpl = (OutputStreamImpl) outputStream;
- assertEquals(graph.getOutputStreams().get(mockStreamSpec), outputStreamImpl);
- assertEquals(mockStreamSpec, outputStreamImpl.getStreamSpec());
- assertTrue(outputStreamImpl.getKeySerde() instanceof NoOpSerde);
- assertEquals(mockValueSerde, outputStreamImpl.getValueSerde());
- }
-
- @Test
- public void testGetOutputStreamWithDefaultKeyValueSerde() {
- ApplicationRunner mockRunner = mock(ApplicationRunner.class);
- StreamSpec mockStreamSpec = mock(StreamSpec.class);
- when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
-
- StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
- KVSerde mockKVSerde = mock(KVSerde.class);
- Serde mockKeySerde = mock(Serde.class);
- Serde mockValueSerde = mock(Serde.class);
- doReturn(mockKeySerde).when(mockKVSerde).getKeySerde();
- doReturn(mockValueSerde).when(mockKVSerde).getValueSerde();
- graph.setDefaultSerde(mockKVSerde);
-
- OutputStream<TestMessageEnvelope> outputStream = graph.getOutputStream("test-stream-1");
-
- OutputStreamImpl<TestMessageEnvelope> outputStreamImpl = (OutputStreamImpl) outputStream;
- assertEquals(graph.getOutputStreams().get(mockStreamSpec), outputStreamImpl);
- assertEquals(mockStreamSpec, outputStreamImpl.getStreamSpec());
- assertEquals(mockKeySerde, outputStreamImpl.getKeySerde());
- assertEquals(mockValueSerde, outputStreamImpl.getValueSerde());
- }
-
- @Test
- public void testGetOutputStreamWithDefaultDefaultSerde() {
- ApplicationRunner mockRunner = mock(ApplicationRunner.class);
- StreamSpec mockStreamSpec = mock(StreamSpec.class);
- when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
-
- StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
-
- OutputStream<TestMessageEnvelope> outputStream = graph.getOutputStream("test-stream-1");
-
- OutputStreamImpl<TestMessageEnvelope> outputStreamImpl = (OutputStreamImpl) outputStream;
- assertEquals(graph.getOutputStreams().get(mockStreamSpec), outputStreamImpl);
- assertEquals(mockStreamSpec, outputStreamImpl.getStreamSpec());
- assertTrue(outputStreamImpl.getKeySerde() instanceof NoOpSerde);
- assertTrue(outputStreamImpl.getValueSerde() instanceof NoOpSerde);
- }
-
- @Test(expected = IllegalStateException.class)
- public void testSetDefaultSerdeAfterGettingStreams() {
- ApplicationRunner mockRunner = mock(ApplicationRunner.class);
- when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mock(StreamSpec.class));
-
- StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
- graph.getInputStream("test-stream-1");
- graph.setDefaultSerde(mock(Serde.class)); // should throw exception
- }
-
- @Test(expected = IllegalStateException.class)
- public void testSetDefaultSerdeAfterGettingOutputStream() {
- ApplicationRunner mockRunner = mock(ApplicationRunner.class);
- when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mock(StreamSpec.class));
-
- StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
- graph.getOutputStream("test-stream-1");
- graph.setDefaultSerde(mock(Serde.class)); // should throw exception
- }
-
- @Test(expected = IllegalStateException.class)
- public void testSetDefaultSerdeAfterGettingIntermediateStream() {
- ApplicationRunner mockRunner = mock(ApplicationRunner.class);
- when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mock(StreamSpec.class));
-
- StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
- graph.getIntermediateStream("test-stream-1", null);
- graph.setDefaultSerde(mock(Serde.class)); // should throw exception
- }
-
- @Test(expected = IllegalStateException.class)
- public void testGetSameOutputStreamTwice() {
- ApplicationRunner mockRunner = mock(ApplicationRunner.class);
- when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mock(StreamSpec.class));
-
- StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
- graph.getOutputStream("test-stream-1");
- graph.getOutputStream("test-stream-1"); // should throw exception
- }
-
- @Test
- public void testGetIntermediateStreamWithValueSerde() {
- ApplicationRunner mockRunner = mock(ApplicationRunner.class);
- Config mockConfig = mock(Config.class);
- StreamSpec mockStreamSpec = mock(StreamSpec.class);
- String mockStreamName = "mockStreamName";
- when(mockRunner.getStreamSpec(mockStreamName)).thenReturn(mockStreamSpec);
-
- StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig);
-
- Serde mockValueSerde = mock(Serde.class);
- IntermediateMessageStreamImpl<TestMessageEnvelope> intermediateStreamImpl =
- graph.getIntermediateStream(mockStreamName, mockValueSerde);
-
- assertEquals(graph.getInputOperators().get(mockStreamSpec), intermediateStreamImpl.getOperatorSpec());
- assertEquals(graph.getOutputStreams().get(mockStreamSpec), intermediateStreamImpl.getOutputStream());
- assertEquals(mockStreamSpec, intermediateStreamImpl.getStreamSpec());
- assertTrue(intermediateStreamImpl.getOutputStream().getKeySerde() instanceof NoOpSerde);
- assertEquals(mockValueSerde, intermediateStreamImpl.getOutputStream().getValueSerde());
- assertTrue(((InputOperatorSpec) intermediateStreamImpl.getOperatorSpec()).getKeySerde() instanceof NoOpSerde);
- assertEquals(mockValueSerde, ((InputOperatorSpec) intermediateStreamImpl.getOperatorSpec()).getValueSerde());
- }
-
- @Test
- public void testGetIntermediateStreamWithKeyValueSerde() {
- ApplicationRunner mockRunner = mock(ApplicationRunner.class);
- Config mockConfig = mock(Config.class);
- StreamSpec mockStreamSpec = mock(StreamSpec.class);
- String mockStreamName = "mockStreamName";
- when(mockRunner.getStreamSpec(mockStreamName)).thenReturn(mockStreamSpec);
-
- StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig);
-
- KVSerde mockKVSerde = mock(KVSerde.class);
- Serde mockKeySerde = mock(Serde.class);
- Serde mockValueSerde = mock(Serde.class);
- doReturn(mockKeySerde).when(mockKVSerde).getKeySerde();
- doReturn(mockValueSerde).when(mockKVSerde).getValueSerde();
- IntermediateMessageStreamImpl<TestMessageEnvelope> intermediateStreamImpl =
- graph.getIntermediateStream(mockStreamName, mockKVSerde);
-
- assertEquals(graph.getInputOperators().get(mockStreamSpec), intermediateStreamImpl.getOperatorSpec());
- assertEquals(graph.getOutputStreams().get(mockStreamSpec), intermediateStreamImpl.getOutputStream());
- assertEquals(mockStreamSpec, intermediateStreamImpl.getStreamSpec());
- assertEquals(mockKeySerde, intermediateStreamImpl.getOutputStream().getKeySerde());
- assertEquals(mockValueSerde, intermediateStreamImpl.getOutputStream().getValueSerde());
- assertEquals(mockKeySerde, ((InputOperatorSpec) intermediateStreamImpl.getOperatorSpec()).getKeySerde());
- assertEquals(mockValueSerde, ((InputOperatorSpec) intermediateStreamImpl.getOperatorSpec()).getValueSerde());
- }
-
- @Test
- public void testGetIntermediateStreamWithDefaultValueSerde() {
- ApplicationRunner mockRunner = mock(ApplicationRunner.class);
- Config mockConfig = mock(Config.class);
- StreamSpec mockStreamSpec = mock(StreamSpec.class);
- String mockStreamName = "mockStreamName";
- when(mockRunner.getStreamSpec(mockStreamName)).thenReturn(mockStreamSpec);
-
- StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig);
-
- Serde mockValueSerde = mock(Serde.class);
- graph.setDefaultSerde(mockValueSerde);
- IntermediateMessageStreamImpl<TestMessageEnvelope> intermediateStreamImpl =
- graph.getIntermediateStream(mockStreamName, null);
-
- assertEquals(graph.getInputOperators().get(mockStreamSpec), intermediateStreamImpl.getOperatorSpec());
- assertEquals(graph.getOutputStreams().get(mockStreamSpec), intermediateStreamImpl.getOutputStream());
- assertEquals(mockStreamSpec, intermediateStreamImpl.getStreamSpec());
- assertTrue(intermediateStreamImpl.getOutputStream().getKeySerde() instanceof NoOpSerde);
- assertEquals(mockValueSerde, intermediateStreamImpl.getOutputStream().getValueSerde());
- assertTrue(((InputOperatorSpec) intermediateStreamImpl.getOperatorSpec()).getKeySerde() instanceof NoOpSerde);
- assertEquals(mockValueSerde, ((InputOperatorSpec) intermediateStreamImpl.getOperatorSpec()).getValueSerde());
- }
-
- @Test
- public void testGetIntermediateStreamWithDefaultKeyValueSerde() {
- ApplicationRunner mockRunner = mock(ApplicationRunner.class);
- Config mockConfig = mock(Config.class);
- StreamSpec mockStreamSpec = mock(StreamSpec.class);
- String mockStreamName = "mockStreamName";
- when(mockRunner.getStreamSpec(mockStreamName)).thenReturn(mockStreamSpec);
-
- StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig);
-
- KVSerde mockKVSerde = mock(KVSerde.class);
- Serde mockKeySerde = mock(Serde.class);
- Serde mockValueSerde = mock(Serde.class);
- doReturn(mockKeySerde).when(mockKVSerde).getKeySerde();
- doReturn(mockValueSerde).when(mockKVSerde).getValueSerde();
- graph.setDefaultSerde(mockKVSerde);
- IntermediateMessageStreamImpl<TestMessageEnvelope> intermediateStreamImpl =
- graph.getIntermediateStream(mockStreamName, null);
-
- assertEquals(graph.getInputOperators().get(mockStreamSpec), intermediateStreamImpl.getOperatorSpec());
- assertEquals(graph.getOutputStreams().get(mockStreamSpec), intermediateStreamImpl.getOutputStream());
- assertEquals(mockStreamSpec, intermediateStreamImpl.getStreamSpec());
- assertEquals(mockKeySerde, intermediateStreamImpl.getOutputStream().getKeySerde());
- assertEquals(mockValueSerde, intermediateStreamImpl.getOutputStream().getValueSerde());
- assertEquals(mockKeySerde, ((InputOperatorSpec) intermediateStreamImpl.getOperatorSpec()).getKeySerde());
- assertEquals(mockValueSerde, ((InputOperatorSpec) intermediateStreamImpl.getOperatorSpec()).getValueSerde());
- }
-
- @Test
- public void testGetIntermediateStreamWithDefaultDefaultSerde() {
- ApplicationRunner mockRunner = mock(ApplicationRunner.class);
- Config mockConfig = mock(Config.class);
- StreamSpec mockStreamSpec = mock(StreamSpec.class);
- String mockStreamName = "mockStreamName";
- when(mockRunner.getStreamSpec(mockStreamName)).thenReturn(mockStreamSpec);
-
- StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig);
- IntermediateMessageStreamImpl<TestMessageEnvelope> intermediateStreamImpl =
- graph.getIntermediateStream(mockStreamName, null);
-
- assertEquals(graph.getInputOperators().get(mockStreamSpec), intermediateStreamImpl.getOperatorSpec());
- assertEquals(graph.getOutputStreams().get(mockStreamSpec), intermediateStreamImpl.getOutputStream());
- assertEquals(mockStreamSpec, intermediateStreamImpl.getStreamSpec());
- assertTrue(intermediateStreamImpl.getOutputStream().getKeySerde() instanceof NoOpSerde);
- assertTrue(intermediateStreamImpl.getOutputStream().getValueSerde() instanceof NoOpSerde);
- assertTrue(((InputOperatorSpec) intermediateStreamImpl.getOperatorSpec()).getKeySerde() instanceof NoOpSerde);
- assertTrue(((InputOperatorSpec) intermediateStreamImpl.getOperatorSpec()).getValueSerde() instanceof NoOpSerde);
- }
-
- @Test(expected = IllegalStateException.class)
- public void testGetSameIntermediateStreamTwice() {
- ApplicationRunner mockRunner = mock(ApplicationRunner.class);
- when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mock(StreamSpec.class));
-
- StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
- graph.getIntermediateStream("test-stream-1", mock(Serde.class));
- graph.getIntermediateStream("test-stream-1", mock(Serde.class));
- }
-
- @Test
- public void testGetNextOpIdIncrementsId() {
- ApplicationRunner mockRunner = mock(ApplicationRunner.class);
- Config mockConfig = mock(Config.class);
- when(mockConfig.get(eq(JobConfig.JOB_NAME()))).thenReturn("jobName");
- when(mockConfig.get(eq(JobConfig.JOB_ID()), anyString())).thenReturn("1234");
-
- StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig);
- assertEquals("jobName-1234-merge-0", graph.getNextOpId(OpCode.MERGE, null));
- assertEquals("jobName-1234-join-customName", graph.getNextOpId(OpCode.JOIN, "customName"));
- assertEquals("jobName-1234-map-2", graph.getNextOpId(OpCode.MAP, null));
- }
-
- @Test(expected = SamzaException.class)
- public void testGetNextOpIdRejectsDuplicates() {
- ApplicationRunner mockRunner = mock(ApplicationRunner.class);
- Config mockConfig = mock(Config.class);
- when(mockConfig.get(eq(JobConfig.JOB_NAME()))).thenReturn("jobName");
- when(mockConfig.get(eq(JobConfig.JOB_ID()), anyString())).thenReturn("1234");
-
- StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig);
- assertEquals("jobName-1234-join-customName", graph.getNextOpId(OpCode.JOIN, "customName"));
- graph.getNextOpId(OpCode.JOIN, "customName"); // should throw
- }
-
- @Test
- public void testUserDefinedIdValidation() {
- ApplicationRunner mockRunner = mock(ApplicationRunner.class);
- Config mockConfig = mock(Config.class);
- when(mockConfig.get(eq(JobConfig.JOB_NAME()))).thenReturn("jobName");
- when(mockConfig.get(eq(JobConfig.JOB_ID()), anyString())).thenReturn("1234");
-
- StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig);
-
- // null and empty userDefinedIDs should fall back to autogenerated IDs.
- try {
- graph.getNextOpId(OpCode.FILTER, null);
- graph.getNextOpId(OpCode.FILTER, "");
- graph.getNextOpId(OpCode.FILTER, " ");
- graph.getNextOpId(OpCode.FILTER, "\t");
- } catch (SamzaException e) {
- Assert.fail("Received an error with a null or empty operator ID instead of defaulting to auto-generated ID.");
- }
-
- List<String> validOpIds = ImmutableList.of("op.id", "op_id", "op-id", "1000", "op_1", "OP_ID");
- for (String validOpId: validOpIds) {
- try {
- graph.getNextOpId(OpCode.FILTER, validOpId);
- } catch (Exception e) {
- Assert.fail("Received an exception with a valid operator ID: " + validOpId);
- }
- }
-
- List<String> invalidOpIds = ImmutableList.of("op id", "op#id");
- for (String invalidOpId: invalidOpIds) {
- try {
- graph.getNextOpId(OpCode.FILTER, invalidOpId);
- Assert.fail("Did not receive an exception with an invalid operator ID: " + invalidOpId);
- } catch (SamzaException e) { }
- }
- }
-
- @Test
- public void testGetInputStreamPreservesInsertionOrder() {
- ApplicationRunner mockRunner = mock(ApplicationRunner.class);
- Config mockConfig = mock(Config.class);
-
- StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig);
-
- StreamSpec testStreamSpec1 = new StreamSpec("test-stream-1", "physical-stream-1", "test-system");
- when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(testStreamSpec1);
-
- StreamSpec testStreamSpec2 = new StreamSpec("test-stream-2", "physical-stream-2", "test-system");
- when(mockRunner.getStreamSpec("test-stream-2")).thenReturn(testStreamSpec2);
-
- StreamSpec testStreamSpec3 = new StreamSpec("test-stream-3", "physical-stream-3", "test-system");
- when(mockRunner.getStreamSpec("test-stream-3")).thenReturn(testStreamSpec3);
-
- graph.getInputStream("test-stream-1");
- graph.getInputStream("test-stream-2");
- graph.getInputStream("test-stream-3");
-
- List<InputOperatorSpec> inputSpecs = new ArrayList<>(graph.getInputOperators().values());
- Assert.assertEquals(inputSpecs.size(), 3);
- Assert.assertEquals(inputSpecs.get(0).getStreamSpec(), testStreamSpec1);
- Assert.assertEquals(inputSpecs.get(1).getStreamSpec(), testStreamSpec2);
- Assert.assertEquals(inputSpecs.get(2).getStreamSpec(), testStreamSpec3);
- }
-
- @Test
- public void testGetTable() {
- ApplicationRunner mockRunner = mock(ApplicationRunner.class);
- Config mockConfig = mock(Config.class);
- StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig);
-
- BaseTableDescriptor mockTableDescriptor = mock(BaseTableDescriptor.class);
- when(mockTableDescriptor.getTableSpec()).thenReturn(
- new TableSpec("t1", KVSerde.of(new NoOpSerde(), new NoOpSerde()), "", new HashMap<>()));
- Assert.assertNotNull(graph.getTable(mockTableDescriptor));
- }
-}