You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2018/09/07 06:36:11 UTC
[5/9] samza git commit: SAMZA-1789: unify ApplicationDescriptor and
ApplicationRunner for high- and low-level APIs in YARN and standalone
environment
http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java b/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
index 0759aba..6fa9ed1 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
@@ -22,6 +22,7 @@ import com.google.common.collect.ImmutableSet;
import org.apache.samza.Partition;
import org.apache.samza.SamzaException;
+import org.apache.samza.application.StreamApplicationDescriptorImpl;
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
import org.apache.samza.container.TaskContextImpl;
@@ -72,8 +73,8 @@ public class TestJoinOperator {
@Test
public void join() throws Exception {
- StreamGraphSpec graphSpec = this.getTestJoinStreamGraph(new TestJoinFunction());
- StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), graphSpec);
+ StreamApplicationDescriptorImpl streamAppDesc = this.getTestJoinStreamGraph(new TestJoinFunction());
+ StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), streamAppDesc);
List<Integer> output = new ArrayList<>();
MessageCollector messageCollector = envelope -> output.add((Integer) envelope.getMessage());
@@ -93,25 +94,26 @@ public class TestJoinOperator {
mapConfig.put("job.id", "jobId");
StreamTestUtils.addStreamConfigs(mapConfig, "inStream", "insystem", "instream");
Config config = new MapConfig(mapConfig);
- StreamGraphSpec graphSpec = new StreamGraphSpec(config);
- IntegerSerde integerSerde = new IntegerSerde();
- KVSerde<Integer, Integer> kvSerde = KVSerde.of(integerSerde, integerSerde);
- GenericSystemDescriptor sd = new GenericSystemDescriptor("insystem", "mockFactoryClassName");
- GenericInputDescriptor<KV<Integer, Integer>> inputDescriptor = sd.getInputDescriptor("inStream", kvSerde);
+ StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> {
+ IntegerSerde integerSerde = new IntegerSerde();
+ KVSerde<Integer, Integer> kvSerde = KVSerde.of(integerSerde, integerSerde);
+ GenericSystemDescriptor sd = new GenericSystemDescriptor("insystem", "mockFactoryClassName");
+ GenericInputDescriptor<KV<Integer, Integer>> inputDescriptor = sd.getInputDescriptor("inStream", kvSerde);
- MessageStream<KV<Integer, Integer>> inStream = graphSpec.getInputStream(inputDescriptor);
+ MessageStream<KV<Integer, Integer>> inStream = appDesc.getInputStream(inputDescriptor);
- inStream.join(inStream, new TestJoinFunction(), integerSerde, kvSerde, kvSerde, JOIN_TTL, "join");
+ inStream.join(inStream, new TestJoinFunction(), integerSerde, kvSerde, kvSerde, JOIN_TTL, "join");
+ }, config);
- createStreamOperatorTask(new SystemClock(), graphSpec); // should throw an exception
+ createStreamOperatorTask(new SystemClock(), streamAppDesc); // should throw an exception
}
@Test
public void joinFnInitAndClose() throws Exception {
TestJoinFunction joinFn = new TestJoinFunction();
- StreamGraphSpec graphSpec = this.getTestJoinStreamGraph(joinFn);
- StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), graphSpec);
+ StreamApplicationDescriptorImpl streamAppDesc = this.getTestJoinStreamGraph(joinFn);
+ StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), streamAppDesc);
MessageCollector messageCollector = mock(MessageCollector.class);
@@ -129,8 +131,8 @@ public class TestJoinOperator {
@Test
public void joinReverse() throws Exception {
- StreamGraphSpec graphSpec = this.getTestJoinStreamGraph(new TestJoinFunction());
- StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), graphSpec);
+ StreamApplicationDescriptorImpl streamAppDesc = this.getTestJoinStreamGraph(new TestJoinFunction());
+ StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), streamAppDesc);
List<Integer> output = new ArrayList<>();
MessageCollector messageCollector = envelope -> output.add((Integer) envelope.getMessage());
@@ -145,8 +147,8 @@ public class TestJoinOperator {
@Test
public void joinNoMatch() throws Exception {
- StreamGraphSpec graphSpec = this.getTestJoinStreamGraph(new TestJoinFunction());
- StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), graphSpec);
+ StreamApplicationDescriptorImpl streamAppDesc = this.getTestJoinStreamGraph(new TestJoinFunction());
+ StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), streamAppDesc);
List<Integer> output = new ArrayList<>();
MessageCollector messageCollector = envelope -> output.add((Integer) envelope.getMessage());
@@ -160,8 +162,8 @@ public class TestJoinOperator {
@Test
public void joinNoMatchReverse() throws Exception {
- StreamGraphSpec graphSpec = this.getTestJoinStreamGraph(new TestJoinFunction());
- StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), graphSpec);
+ StreamApplicationDescriptorImpl streamAppDesc = this.getTestJoinStreamGraph(new TestJoinFunction());
+ StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), streamAppDesc);
List<Integer> output = new ArrayList<>();
MessageCollector messageCollector = envelope -> output.add((Integer) envelope.getMessage());
@@ -175,8 +177,8 @@ public class TestJoinOperator {
@Test
public void joinRetainsLatestMessageForKey() throws Exception {
- StreamGraphSpec graphSpec = this.getTestJoinStreamGraph(new TestJoinFunction());
- StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), graphSpec);
+ StreamApplicationDescriptorImpl streamAppDesc = this.getTestJoinStreamGraph(new TestJoinFunction());
+ StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), streamAppDesc);
List<Integer> output = new ArrayList<>();
MessageCollector messageCollector = envelope -> output.add((Integer) envelope.getMessage());
@@ -193,8 +195,8 @@ public class TestJoinOperator {
@Test
public void joinRetainsLatestMessageForKeyReverse() throws Exception {
- StreamGraphSpec graphSpec = this.getTestJoinStreamGraph(new TestJoinFunction());
- StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), graphSpec);
+ StreamApplicationDescriptorImpl streamAppDesc = this.getTestJoinStreamGraph(new TestJoinFunction());
+ StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), streamAppDesc);
List<Integer> output = new ArrayList<>();
MessageCollector messageCollector = envelope -> output.add((Integer) envelope.getMessage());
@@ -211,8 +213,8 @@ public class TestJoinOperator {
@Test
public void joinRetainsMatchedMessages() throws Exception {
- StreamGraphSpec graphSpec = this.getTestJoinStreamGraph(new TestJoinFunction());
- StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), graphSpec);
+ StreamApplicationDescriptorImpl streamAppDesc = this.getTestJoinStreamGraph(new TestJoinFunction());
+ StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), streamAppDesc);
List<Integer> output = new ArrayList<>();
MessageCollector messageCollector = envelope -> output.add((Integer) envelope.getMessage());
@@ -234,8 +236,8 @@ public class TestJoinOperator {
@Test
public void joinRetainsMatchedMessagesReverse() throws Exception {
- StreamGraphSpec graphSpec = this.getTestJoinStreamGraph(new TestJoinFunction());
- StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), graphSpec);
+ StreamApplicationDescriptorImpl streamAppDesc = this.getTestJoinStreamGraph(new TestJoinFunction());
+ StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), streamAppDesc);
List<Integer> output = new ArrayList<>();
MessageCollector messageCollector = envelope -> output.add((Integer) envelope.getMessage());
@@ -258,8 +260,8 @@ public class TestJoinOperator {
@Test
public void joinRemovesExpiredMessages() throws Exception {
TestClock testClock = new TestClock();
- StreamGraphSpec graphSpec = this.getTestJoinStreamGraph(new TestJoinFunction());
- StreamOperatorTask sot = createStreamOperatorTask(testClock, graphSpec);
+ StreamApplicationDescriptorImpl streamAppDesc = this.getTestJoinStreamGraph(new TestJoinFunction());
+ StreamOperatorTask sot = createStreamOperatorTask(testClock, streamAppDesc);
List<Integer> output = new ArrayList<>();
MessageCollector messageCollector = envelope -> output.add((Integer) envelope.getMessage());
@@ -278,8 +280,8 @@ public class TestJoinOperator {
@Test
public void joinRemovesExpiredMessagesReverse() throws Exception {
TestClock testClock = new TestClock();
- StreamGraphSpec graphSpec = this.getTestJoinStreamGraph(new TestJoinFunction());
- StreamOperatorTask sot = createStreamOperatorTask(testClock, graphSpec);
+ StreamApplicationDescriptorImpl streamAppDesc = this.getTestJoinStreamGraph(new TestJoinFunction());
+ StreamOperatorTask sot = createStreamOperatorTask(testClock, streamAppDesc);
List<Integer> output = new ArrayList<>();
MessageCollector messageCollector = envelope -> output.add((Integer) envelope.getMessage());
@@ -295,7 +297,8 @@ public class TestJoinOperator {
assertTrue(output.isEmpty());
}
- private StreamOperatorTask createStreamOperatorTask(Clock clock, StreamGraphSpec graphSpec) throws Exception {
+ private StreamOperatorTask createStreamOperatorTask(Clock clock, StreamApplicationDescriptorImpl graphSpec)
+ throws Exception {
Map<String, String> mapConfig = new HashMap<>();
mapConfig.put("job.name", "jobName");
mapConfig.put("job.id", "jobId");
@@ -320,31 +323,31 @@ public class TestJoinOperator {
return sot;
}
- private StreamGraphSpec getTestJoinStreamGraph(TestJoinFunction joinFn) throws IOException {
+ private StreamApplicationDescriptorImpl getTestJoinStreamGraph(TestJoinFunction joinFn) throws IOException {
Map<String, String> mapConfig = new HashMap<>();
mapConfig.put("job.name", "jobName");
mapConfig.put("job.id", "jobId");
StreamTestUtils.addStreamConfigs(mapConfig, "inStream", "insystem", "instream");
StreamTestUtils.addStreamConfigs(mapConfig, "inStream2", "insystem", "instream2");
Config config = new MapConfig(mapConfig);
- StreamGraphSpec graphSpec = new StreamGraphSpec(config);
- IntegerSerde integerSerde = new IntegerSerde();
- KVSerde<Integer, Integer> kvSerde = KVSerde.of(integerSerde, integerSerde);
- GenericSystemDescriptor sd = new GenericSystemDescriptor("insystem", "mockFactoryClassName");
- GenericInputDescriptor<KV<Integer, Integer>> inputDescriptor1 = sd.getInputDescriptor("inStream", kvSerde);
- GenericInputDescriptor<KV<Integer, Integer>> inputDescriptor2 = sd.getInputDescriptor("inStream2", kvSerde);
-
- MessageStream<KV<Integer, Integer>> inStream = graphSpec.getInputStream(inputDescriptor1);
- MessageStream<KV<Integer, Integer>> inStream2 = graphSpec.getInputStream(inputDescriptor2);
-
- inStream
- .join(inStream2, joinFn, integerSerde, kvSerde, kvSerde, JOIN_TTL, "j1")
- .sink((message, messageCollector, taskCoordinator) -> {
- SystemStream outputSystemStream = new SystemStream("outputSystem", "outputStream");
- messageCollector.send(new OutgoingMessageEnvelope(outputSystemStream, message));
- });
-
- return graphSpec;
+
+ return new StreamApplicationDescriptorImpl(appDesc -> {
+ IntegerSerde integerSerde = new IntegerSerde();
+ KVSerde<Integer, Integer> kvSerde = KVSerde.of(integerSerde, integerSerde);
+ GenericSystemDescriptor sd = new GenericSystemDescriptor("insystem", "mockFactoryClassName");
+ GenericInputDescriptor<KV<Integer, Integer>> inputDescriptor1 = sd.getInputDescriptor("inStream", kvSerde);
+ GenericInputDescriptor<KV<Integer, Integer>> inputDescriptor2 = sd.getInputDescriptor("inStream2", kvSerde);
+
+ MessageStream<KV<Integer, Integer>> inStream = appDesc.getInputStream(inputDescriptor1);
+ MessageStream<KV<Integer, Integer>> inStream2 = appDesc.getInputStream(inputDescriptor2);
+
+ inStream
+ .join(inStream2, joinFn, integerSerde, kvSerde, kvSerde, JOIN_TTL, "j1")
+ .sink((message, messageCollector, taskCoordinator) -> {
+ SystemStream outputSystemStream = new SystemStream("outputSystem", "outputStream");
+ messageCollector.send(new OutgoingMessageEnvelope(outputSystemStream, message));
+ });
+ }, config);
}
private static class TestJoinFunction
http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java b/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
index 001ffda..566079b 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
@@ -24,6 +24,7 @@ import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
+import org.apache.samza.application.StreamApplicationDescriptorImpl;
import org.apache.samza.operators.data.TestMessageEnvelope;
import org.apache.samza.operators.data.TestOutputMessageEnvelope;
import org.apache.samza.operators.functions.FilterFunction;
@@ -70,7 +71,7 @@ public class TestMessageStreamImpl {
@Test
public void testMap() {
- StreamGraphSpec mockGraph = mock(StreamGraphSpec.class);
+ StreamApplicationDescriptorImpl mockGraph = mock(StreamApplicationDescriptorImpl.class);
OperatorSpec mockOpSpec = mock(OperatorSpec.class);
MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph, mockOpSpec);
@@ -95,7 +96,7 @@ public class TestMessageStreamImpl {
@Test
public void testFlatMap() {
- StreamGraphSpec mockGraph = mock(StreamGraphSpec.class);
+ StreamApplicationDescriptorImpl mockGraph = mock(StreamApplicationDescriptorImpl.class);
OperatorSpec mockOpSpec = mock(OperatorSpec.class);
MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph, mockOpSpec);
@@ -112,7 +113,7 @@ public class TestMessageStreamImpl {
@Test
public void testFlatMapWithRelaxedTypes() {
- StreamGraphSpec mockGraph = mock(StreamGraphSpec.class);
+ StreamApplicationDescriptorImpl mockGraph = mock(StreamApplicationDescriptorImpl.class);
OperatorSpec mockOpSpec = mock(OperatorSpec.class);
MessageStreamImpl<TestInputMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph, mockOpSpec);
@@ -132,7 +133,7 @@ public class TestMessageStreamImpl {
@Test
public void testFilter() {
- StreamGraphSpec mockGraph = mock(StreamGraphSpec.class);
+ StreamApplicationDescriptorImpl mockGraph = mock(StreamApplicationDescriptorImpl.class);
OperatorSpec mockOpSpec = mock(OperatorSpec.class);
MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph, mockOpSpec);
@@ -157,7 +158,7 @@ public class TestMessageStreamImpl {
@Test
public void testSink() {
- StreamGraphSpec mockGraph = mock(StreamGraphSpec.class);
+ StreamApplicationDescriptorImpl mockGraph = mock(StreamApplicationDescriptorImpl.class);
OperatorSpec mockOpSpec = mock(OperatorSpec.class);
MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph, mockOpSpec);
@@ -174,7 +175,7 @@ public class TestMessageStreamImpl {
@Test
public void testSendTo() {
- StreamGraphSpec mockGraph = mock(StreamGraphSpec.class);
+ StreamApplicationDescriptorImpl mockGraph = mock(StreamApplicationDescriptorImpl.class);
OperatorSpec mockOpSpec = mock(OperatorSpec.class);
MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph, mockOpSpec);
OutputStreamImpl<TestMessageEnvelope> mockOutputStreamImpl = mock(OutputStreamImpl.class);
@@ -200,7 +201,7 @@ public class TestMessageStreamImpl {
@Test
public void testPartitionBy() throws IOException {
- StreamGraphSpec mockGraph = mock(StreamGraphSpec.class);
+ StreamApplicationDescriptorImpl mockGraph = mock(StreamApplicationDescriptorImpl.class);
OperatorSpec mockOpSpec = mock(OperatorSpec.class);
String mockOpName = "mockName";
when(mockGraph.getNextOpId(anyObject(), anyObject())).thenReturn(mockOpName);
@@ -231,7 +232,7 @@ public class TestMessageStreamImpl {
@Test
public void testRepartitionWithoutSerde() {
- StreamGraphSpec mockGraph = mock(StreamGraphSpec.class);
+ StreamApplicationDescriptorImpl mockGraph = mock(StreamApplicationDescriptorImpl.class);
OperatorSpec mockOpSpec = mock(OperatorSpec.class);
String mockOpName = "mockName";
when(mockGraph.getNextOpId(anyObject(), anyObject())).thenReturn(mockOpName);
@@ -261,7 +262,7 @@ public class TestMessageStreamImpl {
@Test
public void testWindowWithRelaxedTypes() throws Exception {
- StreamGraphSpec mockGraph = mock(StreamGraphSpec.class);
+ StreamApplicationDescriptorImpl mockGraph = mock(StreamApplicationDescriptorImpl.class);
OperatorSpec mockOpSpec = mock(OperatorSpec.class);
MessageStream<TestInputMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph, mockOpSpec);
@@ -285,7 +286,7 @@ public class TestMessageStreamImpl {
@Test
public void testJoin() {
- StreamGraphSpec mockGraph = mock(StreamGraphSpec.class);
+ StreamApplicationDescriptorImpl mockGraph = mock(StreamApplicationDescriptorImpl.class);
OperatorSpec leftInputOpSpec = mock(OperatorSpec.class);
MessageStreamImpl<TestMessageEnvelope> source1 = new MessageStreamImpl<>(mockGraph, leftInputOpSpec);
OperatorSpec rightInputOpSpec = mock(OperatorSpec.class);
@@ -317,7 +318,7 @@ public class TestMessageStreamImpl {
@Test
public void testSendToTable() {
- StreamGraphSpec mockGraph = mock(StreamGraphSpec.class);
+ StreamApplicationDescriptorImpl mockGraph = mock(StreamApplicationDescriptorImpl.class);
OperatorSpec inputOpSpec = mock(OperatorSpec.class);
MessageStreamImpl<TestMessageEnvelope> source = new MessageStreamImpl<>(mockGraph, inputOpSpec);
@@ -339,7 +340,7 @@ public class TestMessageStreamImpl {
@Test
public void testStreamTableJoin() {
- StreamGraphSpec mockGraph = mock(StreamGraphSpec.class);
+ StreamApplicationDescriptorImpl mockGraph = mock(StreamApplicationDescriptorImpl.class);
OperatorSpec leftInputOpSpec = mock(OperatorSpec.class);
MessageStreamImpl<KV<String, TestMessageEnvelope>> source1 = new MessageStreamImpl<>(mockGraph, leftInputOpSpec);
OperatorSpec rightInputOpSpec = mock(OperatorSpec.class);
@@ -367,7 +368,7 @@ public class TestMessageStreamImpl {
@Test
public void testMerge() {
- StreamGraphSpec mockGraph = mock(StreamGraphSpec.class);
+ StreamApplicationDescriptorImpl mockGraph = mock(StreamApplicationDescriptorImpl.class);
OperatorSpec mockOpSpec1 = mock(OperatorSpec.class);
MessageStream<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph, mockOpSpec1);
@@ -407,7 +408,7 @@ public class TestMessageStreamImpl {
@Test
public void testMergeWithRelaxedTypes() {
- StreamGraphSpec mockGraph = mock(StreamGraphSpec.class);
+ StreamApplicationDescriptorImpl mockGraph = mock(StreamApplicationDescriptorImpl.class);
MessageStream<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph, mock(OperatorSpec.class));
// other streams have the same message type T as input stream message type M
http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/test/java/org/apache/samza/operators/TestOperatorSpecGraph.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestOperatorSpecGraph.java b/samza-core/src/test/java/org/apache/samza/operators/TestOperatorSpecGraph.java
index 6469326..a5b15b8 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/TestOperatorSpecGraph.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/TestOperatorSpecGraph.java
@@ -27,6 +27,7 @@ import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import org.apache.samza.SamzaException;
+import org.apache.samza.application.StreamApplicationDescriptorImpl;
import org.apache.samza.operators.functions.TimerFunction;
import org.apache.samza.operators.functions.WatermarkFunction;
import org.apache.samza.operators.spec.InputOperatorSpec;
@@ -57,14 +58,14 @@ import static org.mockito.Mockito.*;
@PrepareForTest(OperatorSpec.class)
public class TestOperatorSpecGraph {
- private StreamGraphSpec mockGraph;
+ private StreamApplicationDescriptorImpl mockAppDesc;
private Map<String, InputOperatorSpec> inputOpSpecMap;
private Map<String, OutputStreamImpl> outputStrmMap;
private Set<OperatorSpec> allOpSpecs;
@Before
public void setUp() {
- this.mockGraph = mock(StreamGraphSpec.class);
+ this.mockAppDesc = mock(StreamApplicationDescriptorImpl.class);
/**
* Setup two linear transformation pipelines:
@@ -91,8 +92,8 @@ public class TestOperatorSpecGraph {
inputOpSpecMap.put(streamId2, testInput2);
this.outputStrmMap = new LinkedHashMap<>();
outputStrmMap.put(outputStreamId, outputStream1);
- when(mockGraph.getInputOperators()).thenReturn(Collections.unmodifiableMap(inputOpSpecMap));
- when(mockGraph.getOutputStreams()).thenReturn(Collections.unmodifiableMap(outputStrmMap));
+ when(mockAppDesc.getInputOperators()).thenReturn(Collections.unmodifiableMap(inputOpSpecMap));
+ when(mockAppDesc.getOutputStreams()).thenReturn(Collections.unmodifiableMap(outputStrmMap));
this.allOpSpecs = new HashSet<OperatorSpec>() { {
this.add(testInput);
this.add(filterOp);
@@ -105,7 +106,7 @@ public class TestOperatorSpecGraph {
@After
public void tearDown() {
- this.mockGraph = null;
+ this.mockAppDesc = null;
this.inputOpSpecMap = null;
this.outputStrmMap = null;
this.allOpSpecs = null;
@@ -113,7 +114,7 @@ public class TestOperatorSpecGraph {
@Test
public void testConstructor() {
- OperatorSpecGraph specGraph = new OperatorSpecGraph(mockGraph);
+ OperatorSpecGraph specGraph = new OperatorSpecGraph(mockAppDesc);
assertEquals(specGraph.getInputOperators(), inputOpSpecMap);
assertEquals(specGraph.getOutputStreams(), outputStrmMap);
assertTrue(specGraph.getTables().isEmpty());
@@ -123,7 +124,7 @@ public class TestOperatorSpecGraph {
@Test
public void testClone() {
- OperatorSpecGraph operatorSpecGraph = new OperatorSpecGraph(mockGraph);
+ OperatorSpecGraph operatorSpecGraph = new OperatorSpecGraph(mockAppDesc);
OperatorSpecGraph clonedSpecGraph = operatorSpecGraph.clone();
OperatorSpecTestUtils.assertClonedGraph(operatorSpecGraph, clonedSpecGraph);
}
@@ -137,7 +138,7 @@ public class TestOperatorSpecGraph {
//failed with serialization error
try {
- new OperatorSpecGraph(mockGraph);
+ new OperatorSpecGraph(mockAppDesc);
fail("Should have failed with serialization error");
} catch (SamzaException se) {
throw se.getCause();
@@ -150,7 +151,7 @@ public class TestOperatorSpecGraph {
this.allOpSpecs.add(testOp);
inputOpSpecMap.values().stream().findFirst().get().registerNextOperatorSpec(testOp);
- OperatorSpecGraph operatorSpecGraph = new OperatorSpecGraph(mockGraph);
+ OperatorSpecGraph operatorSpecGraph = new OperatorSpecGraph(mockAppDesc);
//failed with serialization error
try {
operatorSpecGraph.clone();
http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphSpec.java b/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphSpec.java
deleted file mode 100644
index 9629efa..0000000
--- a/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphSpec.java
+++ /dev/null
@@ -1,506 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators;
-
-import com.google.common.collect.ImmutableList;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.samza.SamzaException;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.JobConfig;
-import org.apache.samza.operators.data.TestMessageEnvelope;
-import org.apache.samza.operators.descriptors.GenericInputDescriptor;
-import org.apache.samza.operators.descriptors.GenericOutputDescriptor;
-import org.apache.samza.operators.descriptors.GenericSystemDescriptor;
-import org.apache.samza.operators.descriptors.base.stream.InputDescriptor;
-import org.apache.samza.operators.descriptors.base.system.ExpandingInputDescriptorProvider;
-import org.apache.samza.operators.descriptors.base.system.SystemDescriptor;
-import org.apache.samza.operators.descriptors.base.system.TransformingInputDescriptorProvider;
-import org.apache.samza.operators.functions.InputTransformer;
-import org.apache.samza.operators.functions.StreamExpander;
-import org.apache.samza.operators.spec.InputOperatorSpec;
-import org.apache.samza.operators.spec.OperatorSpec;
-import org.apache.samza.operators.spec.OperatorSpec.OpCode;
-import org.apache.samza.operators.spec.OutputStreamImpl;
-import org.apache.samza.operators.stream.IntermediateMessageStreamImpl;
-import org.apache.samza.serializers.IntegerSerde;
-import org.apache.samza.serializers.KVSerde;
-import org.apache.samza.serializers.NoOpSerde;
-import org.apache.samza.serializers.Serde;
-import org.apache.samza.table.TableSpec;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-@SuppressWarnings("unchecked")
-public class TestStreamGraphSpec {
-
- @Test
- public void testGetInputStreamWithValueSerde() {
- StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class));
-
- String streamId = "test-stream-1";
- Serde mockValueSerde = mock(Serde.class);
- GenericSystemDescriptor sd = new GenericSystemDescriptor("mockSystem", "mockSystemFactoryClass");
- GenericInputDescriptor isd = sd.getInputDescriptor(streamId, mockValueSerde);
- MessageStream<TestMessageEnvelope> inputStream = graphSpec.getInputStream(isd);
-
- InputOperatorSpec inputOpSpec = (InputOperatorSpec) ((MessageStreamImpl) inputStream).getOperatorSpec();
- assertEquals(OpCode.INPUT, inputOpSpec.getOpCode());
- assertEquals(graphSpec.getInputOperators().get(streamId), inputOpSpec);
- assertEquals(streamId, inputOpSpec.getStreamId());
- assertEquals(isd, graphSpec.getInputDescriptors().get(streamId));
- assertTrue(inputOpSpec.getKeySerde() instanceof NoOpSerde);
- assertEquals(mockValueSerde, inputOpSpec.getValueSerde());
- }
-
- @Test
- public void testGetInputStreamWithKeyValueSerde() {
- StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class));
-
- String streamId = "test-stream-1";
- KVSerde mockKVSerde = mock(KVSerde.class);
- Serde mockKeySerde = mock(Serde.class);
- Serde mockValueSerde = mock(Serde.class);
- doReturn(mockKeySerde).when(mockKVSerde).getKeySerde();
- doReturn(mockValueSerde).when(mockKVSerde).getValueSerde();
- GenericSystemDescriptor sd = new GenericSystemDescriptor("mockSystem", "mockSystemFactoryClass");
- GenericInputDescriptor isd = sd.getInputDescriptor(streamId, mockKVSerde);
- MessageStream<TestMessageEnvelope> inputStream = graphSpec.getInputStream(isd);
-
- InputOperatorSpec inputOpSpec = (InputOperatorSpec) ((MessageStreamImpl) inputStream).getOperatorSpec();
- assertEquals(OpCode.INPUT, inputOpSpec.getOpCode());
- assertEquals(graphSpec.getInputOperators().get(streamId), inputOpSpec);
- assertEquals(streamId, inputOpSpec.getStreamId());
- assertEquals(isd, graphSpec.getInputDescriptors().get(streamId));
- assertEquals(mockKeySerde, inputOpSpec.getKeySerde());
- assertEquals(mockValueSerde, inputOpSpec.getValueSerde());
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testGetInputStreamWithNullSerde() {
- StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class));
- GenericSystemDescriptor sd = new GenericSystemDescriptor("mockSystem", "mockSystemFactoryClass");
- GenericInputDescriptor isd = sd.getInputDescriptor("mockStreamId", null);
- graphSpec.getInputStream(isd);
- }
-
- @Test
- public void testGetInputStreamWithTransformFunction() {
- String streamId = "test-stream-1";
- StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class));
-
- Serde mockValueSerde = mock(Serde.class);
- InputTransformer transformer = ime -> ime;
- MockTransformingSystemDescriptor sd = new MockTransformingSystemDescriptor("mockSystem", transformer);
- MockInputDescriptor isd = sd.getInputDescriptor(streamId, mockValueSerde);
- MessageStream inputStream = graphSpec.getInputStream(isd);
-
- InputOperatorSpec inputOpSpec = (InputOperatorSpec) ((MessageStreamImpl) inputStream).getOperatorSpec();
- assertEquals(OpCode.INPUT, inputOpSpec.getOpCode());
- assertEquals(graphSpec.getInputOperators().get(streamId), inputOpSpec);
- assertEquals(streamId, inputOpSpec.getStreamId());
- assertEquals(isd, graphSpec.getInputDescriptors().get(streamId));
- assertEquals(transformer, inputOpSpec.getTransformer());
- }
-
- @Test
- public void testGetInputStreamWithExpandingSystem() {
- String streamId = "test-stream-1";
- String expandedStreamId = "expanded-stream";
- StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class));
- AtomicInteger expandCallCount = new AtomicInteger();
- StreamExpander expander = (sg, isd) -> {
- expandCallCount.incrementAndGet();
- InputDescriptor expandedISD =
- new GenericSystemDescriptor("expanded-system", "mockFactoryClass")
- .getInputDescriptor(expandedStreamId, new IntegerSerde());
-
- return sg.getInputStream(expandedISD);
- };
- MockExpandingSystemDescriptor sd = new MockExpandingSystemDescriptor("mock-system", expander);
- MockInputDescriptor isd = sd.getInputDescriptor(streamId, new IntegerSerde());
- MessageStream inputStream = graphSpec.getInputStream(isd);
- InputOperatorSpec inputOpSpec = (InputOperatorSpec) ((MessageStreamImpl) inputStream).getOperatorSpec();
- assertEquals(1, expandCallCount.get());
- assertEquals(OpCode.INPUT, inputOpSpec.getOpCode());
- assertEquals(inputOpSpec, graphSpec.getInputOperators().get(expandedStreamId));
- assertFalse(graphSpec.getInputOperators().containsKey(streamId));
- assertFalse(graphSpec.getInputDescriptors().containsKey(streamId));
- assertTrue(graphSpec.getInputDescriptors().containsKey(expandedStreamId));
- assertEquals(expandedStreamId, inputOpSpec.getStreamId());
- }
-
- @Test
- public void testMultipleGetInputStreams() {
- String streamId1 = "test-stream-1";
- String streamId2 = "test-stream-2";
-
- StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class));
- GenericSystemDescriptor sd = new GenericSystemDescriptor("mockSystem", "mockSystemFactoryClass");
- GenericInputDescriptor isd1 = sd.getInputDescriptor(streamId1, mock(Serde.class));
- GenericInputDescriptor isd2 = sd.getInputDescriptor(streamId2, mock(Serde.class));
- MessageStream<Object> inputStream1 = graphSpec.getInputStream(isd1);
- MessageStream<Object> inputStream2 = graphSpec.getInputStream(isd2);
-
- InputOperatorSpec inputOpSpec1 =
- (InputOperatorSpec) ((MessageStreamImpl<Object>) inputStream1).getOperatorSpec();
- InputOperatorSpec inputOpSpec2 =
- (InputOperatorSpec) ((MessageStreamImpl<Object>) inputStream2).getOperatorSpec();
-
- assertEquals(graphSpec.getInputOperators().size(), 2);
- assertEquals(graphSpec.getInputOperators().get(streamId1), inputOpSpec1);
- assertEquals(graphSpec.getInputOperators().get(streamId2), inputOpSpec2);
- }
-
- @Test(expected = IllegalStateException.class)
- public void testGetSameInputStreamTwice() {
- String streamId = "test-stream-1";
- StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class));
- GenericSystemDescriptor sd = new GenericSystemDescriptor("mockSystem", "mockSystemFactoryClass");
- GenericInputDescriptor isd1 = sd.getInputDescriptor(streamId, mock(Serde.class));
- GenericInputDescriptor isd2 = sd.getInputDescriptor(streamId, mock(Serde.class));
- graphSpec.getInputStream(isd1);
- // should throw exception
- graphSpec.getInputStream(isd2);
- }
-
- @Test
- public void testMultipleSystemDescriptorForSameSystemName() {
- String streamId = "test-stream-1";
- StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class));
- GenericSystemDescriptor sd1 = new GenericSystemDescriptor("mockSystem", "mockSystemFactoryClass");
- GenericSystemDescriptor sd2 = new GenericSystemDescriptor("mockSystem", "mockSystemFactoryClass");
- GenericInputDescriptor isd1 = sd1.getInputDescriptor(streamId, mock(Serde.class));
- GenericInputDescriptor isd2 = sd2.getInputDescriptor(streamId, mock(Serde.class));
- GenericOutputDescriptor osd1 = sd2.getOutputDescriptor(streamId, mock(Serde.class));
-
- graphSpec.getInputStream(isd1);
- boolean passed = false;
- try {
- graphSpec.getInputStream(isd2);
- passed = true;
- } catch (IllegalStateException e) { }
-
- try {
- graphSpec.getOutputStream(osd1);
- passed = true;
- } catch (IllegalStateException e) { }
-
- try {
- graphSpec.setDefaultSystem(sd2);
- passed = true;
- } catch (IllegalStateException e) { }
-
- assertFalse(passed);
- }
-
- @Test
- public void testGetOutputStreamWithValueSerde() {
- String streamId = "test-stream-1";
- StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class));
-
- Serde mockValueSerde = mock(Serde.class);
- GenericSystemDescriptor sd = new GenericSystemDescriptor("mockSystem", "mockSystemFactoryClass");
- GenericOutputDescriptor osd = sd.getOutputDescriptor(streamId, mockValueSerde);
- OutputStream<TestMessageEnvelope> outputStream = graphSpec.getOutputStream(osd);
-
- OutputStreamImpl<TestMessageEnvelope> outputStreamImpl = (OutputStreamImpl) outputStream;
- assertEquals(graphSpec.getOutputStreams().get(streamId), outputStreamImpl);
- assertEquals(streamId, outputStreamImpl.getStreamId());
- assertEquals(osd, graphSpec.getOutputDescriptors().get(streamId));
- assertTrue(outputStreamImpl.getKeySerde() instanceof NoOpSerde);
- assertEquals(mockValueSerde, outputStreamImpl.getValueSerde());
- }
-
- @Test
- public void testGetOutputStreamWithKeyValueSerde() {
- String streamId = "test-stream-1";
- StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class));
- KVSerde mockKVSerde = mock(KVSerde.class);
- Serde mockKeySerde = mock(Serde.class);
- Serde mockValueSerde = mock(Serde.class);
- doReturn(mockKeySerde).when(mockKVSerde).getKeySerde();
- doReturn(mockValueSerde).when(mockKVSerde).getValueSerde();
- GenericSystemDescriptor sd = new GenericSystemDescriptor("mockSystem", "mockSystemFactoryClass");
- GenericOutputDescriptor osd = sd.getOutputDescriptor(streamId, mockKVSerde);
- OutputStream<TestMessageEnvelope> outputStream = graphSpec.getOutputStream(osd);
-
- OutputStreamImpl<TestMessageEnvelope> outputStreamImpl = (OutputStreamImpl) outputStream;
- assertEquals(graphSpec.getOutputStreams().get(streamId), outputStreamImpl);
- assertEquals(streamId, outputStreamImpl.getStreamId());
- assertEquals(osd, graphSpec.getOutputDescriptors().get(streamId));
- assertEquals(mockKeySerde, outputStreamImpl.getKeySerde());
- assertEquals(mockValueSerde, outputStreamImpl.getValueSerde());
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testGetOutputStreamWithNullSerde() {
- String streamId = "test-stream-1";
- StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class));
- GenericSystemDescriptor sd = new GenericSystemDescriptor("mockSystem", "mockSystemFactoryClass");
- GenericOutputDescriptor osd = sd.getOutputDescriptor(streamId, null);
- graphSpec.getOutputStream(osd);
- }
-
- @Test(expected = IllegalStateException.class)
- public void testSetDefaultSystemDescriptorAfterGettingInputStream() {
- StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class));
- GenericInputDescriptor id = new GenericSystemDescriptor("system", "factory.class.name")
- .getInputDescriptor("input-stream", mock(Serde.class));
- graphSpec.getInputStream(id);
- graphSpec.setDefaultSystem(new GenericSystemDescriptor("mockSystem", "mockFactory")); // should throw exception
- }
-
- @Test(expected = IllegalStateException.class)
- public void testSetDefaultSystemDescriptorAfterGettingOutputStream() {
- StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class));
- GenericOutputDescriptor od = new GenericSystemDescriptor("system", "factory.class.name")
- .getOutputDescriptor("output-stream", mock(Serde.class));
- graphSpec.getOutputStream(od);
- graphSpec.setDefaultSystem(new GenericSystemDescriptor("mockSystem", "mockFactory")); // should throw exception
- }
-
- @Test(expected = IllegalStateException.class)
- public void testSetDefaultSerdeAfterGettingIntermediateStream() {
- String streamId = "test-stream-1";
- StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class));
- graphSpec.getIntermediateStream(streamId, mock(Serde.class), false);
- graphSpec.setDefaultSystem(new GenericSystemDescriptor("mockSystem", "mockFactory")); // should throw exception
- }
-
- @Test(expected = IllegalStateException.class)
- public void testGetSameOutputStreamTwice() {
- String streamId = "test-stream-1";
- StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class));
- GenericSystemDescriptor sd = new GenericSystemDescriptor("mockSystem", "mockSystemFactoryClass");
- GenericOutputDescriptor osd1 = sd.getOutputDescriptor(streamId, mock(Serde.class));
- GenericOutputDescriptor osd2 = sd.getOutputDescriptor(streamId, mock(Serde.class));
- graphSpec.getOutputStream(osd1);
- graphSpec.getOutputStream(osd2); // should throw exception
- }
-
- @Test
- public void testGetIntermediateStreamWithValueSerde() {
- String streamId = "stream-1";
- StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class));
-
- Serde mockValueSerde = mock(Serde.class);
- IntermediateMessageStreamImpl<TestMessageEnvelope> intermediateStreamImpl =
- graphSpec.getIntermediateStream(streamId, mockValueSerde, false);
-
- assertEquals(graphSpec.getInputOperators().get(streamId), intermediateStreamImpl.getOperatorSpec());
- assertEquals(graphSpec.getOutputStreams().get(streamId), intermediateStreamImpl.getOutputStream());
- assertEquals(streamId, intermediateStreamImpl.getStreamId());
- assertTrue(intermediateStreamImpl.getOutputStream().getKeySerde() instanceof NoOpSerde);
- assertEquals(mockValueSerde, intermediateStreamImpl.getOutputStream().getValueSerde());
- assertTrue(((InputOperatorSpec) (OperatorSpec) intermediateStreamImpl.getOperatorSpec()).getKeySerde() instanceof NoOpSerde);
- assertEquals(mockValueSerde, ((InputOperatorSpec) (OperatorSpec) intermediateStreamImpl.getOperatorSpec()).getValueSerde());
- }
-
- @Test
- public void testGetIntermediateStreamWithKeyValueSerde() {
- String streamId = "streamId";
- StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class));
-
- KVSerde mockKVSerde = mock(KVSerde.class);
- Serde mockKeySerde = mock(Serde.class);
- Serde mockValueSerde = mock(Serde.class);
- doReturn(mockKeySerde).when(mockKVSerde).getKeySerde();
- doReturn(mockValueSerde).when(mockKVSerde).getValueSerde();
- IntermediateMessageStreamImpl<TestMessageEnvelope> intermediateStreamImpl =
- graphSpec.getIntermediateStream(streamId, mockKVSerde, false);
-
- assertEquals(graphSpec.getInputOperators().get(streamId), intermediateStreamImpl.getOperatorSpec());
- assertEquals(graphSpec.getOutputStreams().get(streamId), intermediateStreamImpl.getOutputStream());
- assertEquals(streamId, intermediateStreamImpl.getStreamId());
- assertEquals(mockKeySerde, intermediateStreamImpl.getOutputStream().getKeySerde());
- assertEquals(mockValueSerde, intermediateStreamImpl.getOutputStream().getValueSerde());
- assertEquals(mockKeySerde, ((InputOperatorSpec) (OperatorSpec) intermediateStreamImpl.getOperatorSpec()).getKeySerde());
- assertEquals(mockValueSerde, ((InputOperatorSpec) (OperatorSpec) intermediateStreamImpl.getOperatorSpec()).getValueSerde());
- }
-
- @Test
- public void testGetIntermediateStreamWithNoSerde() {
- Config mockConfig = mock(Config.class);
- String streamId = "streamId";
-
- StreamGraphSpec graphSpec = new StreamGraphSpec(mockConfig);
- IntermediateMessageStreamImpl<TestMessageEnvelope> intermediateStreamImpl =
- graphSpec.getIntermediateStream(streamId, null, false);
-
- assertEquals(graphSpec.getInputOperators().get(streamId), intermediateStreamImpl.getOperatorSpec());
- assertEquals(graphSpec.getOutputStreams().get(streamId), intermediateStreamImpl.getOutputStream());
- assertEquals(streamId, intermediateStreamImpl.getStreamId());
- assertNull(intermediateStreamImpl.getOutputStream().getKeySerde());
- assertNull(intermediateStreamImpl.getOutputStream().getValueSerde());
- assertNull(((InputOperatorSpec) (OperatorSpec) intermediateStreamImpl.getOperatorSpec()).getKeySerde());
- assertNull(((InputOperatorSpec) (OperatorSpec) intermediateStreamImpl.getOperatorSpec()).getValueSerde());
- }
-
- @Test(expected = IllegalStateException.class)
- public void testGetSameIntermediateStreamTwice() {
- StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class));
- graphSpec.getIntermediateStream("test-stream-1", mock(Serde.class), false);
- graphSpec.getIntermediateStream("test-stream-1", mock(Serde.class), false);
- }
-
- @Test
- public void testGetNextOpIdIncrementsId() {
- Config mockConfig = mock(Config.class);
- when(mockConfig.get(eq(JobConfig.JOB_NAME()))).thenReturn("jobName");
- when(mockConfig.get(eq(JobConfig.JOB_ID()), anyString())).thenReturn("1234");
-
- StreamGraphSpec graphSpec = new StreamGraphSpec(mockConfig);
- assertEquals("jobName-1234-merge-0", graphSpec.getNextOpId(OpCode.MERGE, null));
- assertEquals("jobName-1234-join-customName", graphSpec.getNextOpId(OpCode.JOIN, "customName"));
- assertEquals("jobName-1234-map-2", graphSpec.getNextOpId(OpCode.MAP, null));
- }
-
- @Test(expected = SamzaException.class)
- public void testGetNextOpIdRejectsDuplicates() {
- Config mockConfig = mock(Config.class);
- when(mockConfig.get(eq(JobConfig.JOB_NAME()))).thenReturn("jobName");
- when(mockConfig.get(eq(JobConfig.JOB_ID()), anyString())).thenReturn("1234");
-
- StreamGraphSpec graphSpec = new StreamGraphSpec(mockConfig);
- assertEquals("jobName-1234-join-customName", graphSpec.getNextOpId(OpCode.JOIN, "customName"));
- graphSpec.getNextOpId(OpCode.JOIN, "customName"); // should throw
- }
-
- @Test
- public void testIdValidation() {
- Config mockConfig = mock(Config.class);
- when(mockConfig.get(eq(JobConfig.JOB_NAME()))).thenReturn("jobName");
- when(mockConfig.get(eq(JobConfig.JOB_ID()), anyString())).thenReturn("1234");
-
- StreamGraphSpec graphSpec = new StreamGraphSpec(mockConfig);
-
- // null and empty userDefinedIDs should fall back to autogenerated IDs.
- try {
- graphSpec.getNextOpId(OpCode.FILTER, null);
- graphSpec.getNextOpId(OpCode.FILTER, "");
- graphSpec.getNextOpId(OpCode.FILTER, " ");
- graphSpec.getNextOpId(OpCode.FILTER, "\t");
- } catch (SamzaException e) {
- fail("Received an error with a null or empty operator ID instead of defaulting to auto-generated ID.");
- }
-
- List<String> validOpIds = ImmutableList.of("op_id", "op-id", "1000", "op_1", "OP_ID");
- for (String validOpId: validOpIds) {
- try {
- graphSpec.getNextOpId(OpCode.FILTER, validOpId);
- } catch (Exception e) {
- fail("Received an exception with a valid operator ID: " + validOpId);
- }
- }
-
- List<String> invalidOpIds = ImmutableList.of("op id", "op#id");
- for (String invalidOpId: invalidOpIds) {
- try {
- graphSpec.getNextOpId(OpCode.FILTER, invalidOpId);
- fail("Did not receive an exception with an invalid operator ID: " + invalidOpId);
- } catch (SamzaException e) { }
- }
- }
-
- @Test
- public void testGetInputStreamPreservesInsertionOrder() {
- Config mockConfig = mock(Config.class);
-
- StreamGraphSpec graphSpec = new StreamGraphSpec(mockConfig);
-
- String testStreamId1 = "test-stream-1";
- String testStreamId2 = "test-stream-2";
- String testStreamId3 = "test-stream-3";
- GenericSystemDescriptor sd = new GenericSystemDescriptor("mockSystem", "mockSystemFactoryClass");
- graphSpec.getInputStream(sd.getInputDescriptor(testStreamId1, mock(Serde.class)));
- graphSpec.getInputStream(sd.getInputDescriptor(testStreamId2, mock(Serde.class)));
- graphSpec.getInputStream(sd.getInputDescriptor(testStreamId3, mock(Serde.class)));
-
- List<InputOperatorSpec> inputSpecs = new ArrayList<>(graphSpec.getInputOperators().values());
- assertEquals(inputSpecs.size(), 3);
- assertEquals(inputSpecs.get(0).getStreamId(), testStreamId1);
- assertEquals(inputSpecs.get(1).getStreamId(), testStreamId2);
- assertEquals(inputSpecs.get(2).getStreamId(), testStreamId3);
- }
-
- @Test
- public void testGetTable() {
- Config mockConfig = mock(Config.class);
- StreamGraphSpec graphSpec = new StreamGraphSpec(mockConfig);
-
- BaseTableDescriptor mockTableDescriptor = mock(BaseTableDescriptor.class);
- when(mockTableDescriptor.getTableId()).thenReturn("t1");
- when(mockTableDescriptor.getTableSpec()).thenReturn(
- new TableSpec("t1", KVSerde.of(new NoOpSerde(), new NoOpSerde()), "", new HashMap<>()));
- assertNotNull(graphSpec.getTable(mockTableDescriptor));
- }
-
- @Test(expected = IllegalStateException.class)
- public void testGetTableWithBadId() {
- Config mockConfig = mock(Config.class);
- StreamGraphSpec graphSpec = new StreamGraphSpec(mockConfig);
-
- BaseTableDescriptor mockTableDescriptor = mock(BaseTableDescriptor.class);
- when(mockTableDescriptor.getTableId()).thenReturn("my.table");
- graphSpec.getTable(mockTableDescriptor);
- }
-
- class MockExpandingSystemDescriptor extends SystemDescriptor<MockExpandingSystemDescriptor> implements ExpandingInputDescriptorProvider<Integer> {
- public MockExpandingSystemDescriptor(String systemName, StreamExpander expander) {
- super(systemName, "factory.class", null, expander);
- }
-
- @Override
- public MockInputDescriptor<Integer> getInputDescriptor(String streamId, Serde serde) {
- return new MockInputDescriptor<>(streamId, this, serde);
- }
- }
-
- class MockTransformingSystemDescriptor extends SystemDescriptor<MockTransformingSystemDescriptor> implements TransformingInputDescriptorProvider<Integer> {
- public MockTransformingSystemDescriptor(String systemName, InputTransformer transformer) {
- super(systemName, "factory.class", transformer, null);
- }
-
- @Override
- public MockInputDescriptor<Integer> getInputDescriptor(String streamId, Serde serde) {
- return new MockInputDescriptor<>(streamId, this, serde);
- }
- }
-
- public class MockInputDescriptor<StreamMessageType> extends InputDescriptor<StreamMessageType, MockInputDescriptor<StreamMessageType>> {
- MockInputDescriptor(String streamId, SystemDescriptor systemDescriptor, Serde serde) {
- super(streamId, serde, systemDescriptor, null);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
index 604c72d..6f8a8bc 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
@@ -21,7 +21,6 @@ package org.apache.samza.operators.impl;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
-
import java.io.Serializable;
import java.time.Duration;
import java.util.ArrayList;
@@ -35,6 +34,7 @@ import java.util.Set;
import java.util.function.BiFunction;
import java.util.function.Function;
import org.apache.samza.Partition;
+import org.apache.samza.application.StreamApplicationDescriptorImpl;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.MapConfig;
@@ -49,7 +49,6 @@ import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.operators.KV;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.OutputStream;
-import org.apache.samza.operators.StreamGraphSpec;
import org.apache.samza.operators.descriptors.GenericInputDescriptor;
import org.apache.samza.operators.descriptors.GenericOutputDescriptor;
import org.apache.samza.operators.descriptors.GenericSystemDescriptor;
@@ -58,7 +57,6 @@ import org.apache.samza.operators.functions.FilterFunction;
import org.apache.samza.operators.functions.InitableFunction;
import org.apache.samza.operators.functions.JoinFunction;
import org.apache.samza.operators.functions.MapFunction;
-import org.apache.samza.util.TimestampedValue;
import org.apache.samza.operators.spec.OperatorSpec.OpCode;
import org.apache.samza.serializers.IntegerSerde;
import org.apache.samza.serializers.KVSerde;
@@ -74,6 +72,7 @@ import org.apache.samza.task.TaskCoordinator;
import org.apache.samza.testUtils.StreamTestUtils;
import org.apache.samza.util.Clock;
import org.apache.samza.util.SystemClock;
+import org.apache.samza.util.TimestampedValue;
import org.junit.After;
import org.junit.Test;
@@ -220,7 +219,7 @@ public class TestOperatorImplGraph {
@Test
public void testEmptyChain() {
- StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class));
+ StreamApplicationDescriptorImpl graphSpec = new StreamApplicationDescriptorImpl(appDesc -> { }, mock(Config.class));
OperatorImplGraph opGraph =
new OperatorImplGraph(graphSpec.getOperatorSpecGraph(), mock(Config.class), mock(TaskContextImpl.class), mock(Clock.class));
assertEquals(0, opGraph.getAllInputOperators().size());
@@ -244,17 +243,18 @@ public class TestOperatorImplGraph {
StreamTestUtils.addStreamConfigs(configs, outputStreamId, outputSystem, outputPhysicalName);
Config config = new MapConfig(configs);
- StreamGraphSpec graphSpec = new StreamGraphSpec(config);
- GenericSystemDescriptor sd = new GenericSystemDescriptor(inputSystem, "mockFactoryClass");
- GenericInputDescriptor inputDescriptor = sd.getInputDescriptor(inputStreamId, mock(Serde.class));
- GenericOutputDescriptor outputDescriptor = sd.getOutputDescriptor(outputStreamId, mock(Serde.class));
- MessageStream<Object> inputStream = graphSpec.getInputStream(inputDescriptor);
- OutputStream<Object> outputStream = graphSpec.getOutputStream(outputDescriptor);
+ StreamApplicationDescriptorImpl graphSpec = new StreamApplicationDescriptorImpl(appDesc -> {
+ GenericSystemDescriptor sd = new GenericSystemDescriptor(inputSystem, "mockFactoryClass");
+ GenericInputDescriptor inputDescriptor = sd.getInputDescriptor(inputStreamId, mock(Serde.class));
+ GenericOutputDescriptor outputDescriptor = sd.getOutputDescriptor(outputStreamId, mock(Serde.class));
+ MessageStream<Object> inputStream = appDesc.getInputStream(inputDescriptor);
+ OutputStream<Object> outputStream = appDesc.getOutputStream(outputDescriptor);
- inputStream
- .filter(mock(FilterFunction.class))
- .map(mock(MapFunction.class))
- .sendTo(outputStream);
+ inputStream
+ .filter(mock(FilterFunction.class))
+ .map(mock(MapFunction.class))
+ .sendTo(outputStream);
+ }, config);
TaskContextImpl mockTaskContext = mock(TaskContextImpl.class);
when(mockTaskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
@@ -297,19 +297,20 @@ public class TestOperatorImplGraph {
StreamTestUtils.addStreamConfigs(configs, outputStreamId, outputSystem, outputPhysicalName);
Config config = new MapConfig(configs);
- StreamGraphSpec graphSpec = new StreamGraphSpec(config);
- GenericSystemDescriptor isd = new GenericSystemDescriptor(inputSystem, "mockFactoryClass");
- GenericSystemDescriptor osd = new GenericSystemDescriptor(outputSystem, "mockFactoryClass");
- GenericInputDescriptor inputDescriptor = isd.getInputDescriptor(inputStreamId, mock(Serde.class));
- GenericOutputDescriptor outputDescriptor = osd.getOutputDescriptor(outputStreamId,
- KVSerde.of(mock(IntegerSerde.class), mock(StringSerde.class)));
- MessageStream<Object> inputStream = graphSpec.getInputStream(inputDescriptor);
- OutputStream<KV<Integer, String>> outputStream = graphSpec.getOutputStream(outputDescriptor);
-
- inputStream
- .partitionBy(Object::hashCode, Object::toString,
- KVSerde.of(mock(IntegerSerde.class), mock(StringSerde.class)), "p1")
- .sendTo(outputStream);
+ StreamApplicationDescriptorImpl graphSpec = new StreamApplicationDescriptorImpl(appDesc -> {
+ GenericSystemDescriptor isd = new GenericSystemDescriptor(inputSystem, "mockFactoryClass");
+ GenericSystemDescriptor osd = new GenericSystemDescriptor(outputSystem, "mockFactoryClass");
+ GenericInputDescriptor inputDescriptor = isd.getInputDescriptor(inputStreamId, mock(Serde.class));
+ GenericOutputDescriptor outputDescriptor = osd.getOutputDescriptor(outputStreamId,
+ KVSerde.of(mock(IntegerSerde.class), mock(StringSerde.class)));
+ MessageStream<Object> inputStream = appDesc.getInputStream(inputDescriptor);
+ OutputStream<KV<Integer, String>> outputStream = appDesc.getOutputStream(outputDescriptor);
+
+ inputStream
+ .partitionBy(Object::hashCode, Object::toString,
+ KVSerde.of(mock(IntegerSerde.class), mock(StringSerde.class)), "p1")
+ .sendTo(outputStream);
+ }, config);
TaskContextImpl mockTaskContext = mock(TaskContextImpl.class);
when(mockTaskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
@@ -351,13 +352,13 @@ public class TestOperatorImplGraph {
HashMap<String, String> configMap = new HashMap<>();
StreamTestUtils.addStreamConfigs(configMap, inputStreamId, inputSystem, inputPhysicalName);
Config config = new MapConfig(configMap);
- StreamGraphSpec graphSpec = new StreamGraphSpec(config);
-
- GenericSystemDescriptor sd = new GenericSystemDescriptor(inputSystem, "mockFactoryClass");
- GenericInputDescriptor inputDescriptor = sd.getInputDescriptor(inputStreamId, mock(Serde.class));
- MessageStream<Object> inputStream = graphSpec.getInputStream(inputDescriptor);
- inputStream.filter(mock(FilterFunction.class));
- inputStream.map(mock(MapFunction.class));
+ StreamApplicationDescriptorImpl graphSpec = new StreamApplicationDescriptorImpl(appDesc -> {
+ GenericSystemDescriptor sd = new GenericSystemDescriptor(inputSystem, "mockFactoryClass");
+ GenericInputDescriptor inputDescriptor = sd.getInputDescriptor(inputStreamId, mock(Serde.class));
+ MessageStream<Object> inputStream = appDesc.getInputStream(inputDescriptor);
+ inputStream.filter(mock(FilterFunction.class));
+ inputStream.map(mock(MapFunction.class));
+ }, config);
TaskContextImpl mockTaskContext = mock(TaskContextImpl.class);
when(mockTaskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
@@ -380,31 +381,28 @@ public class TestOperatorImplGraph {
HashMap<String, String> configs = new HashMap<>();
StreamTestUtils.addStreamConfigs(configs, inputStreamId, inputSystem, inputPhysicalName);
Config config = new MapConfig(configs);
- StreamGraphSpec graphSpec = new StreamGraphSpec(config);
-
- GenericSystemDescriptor sd = new GenericSystemDescriptor(inputSystem, "mockFactoryClass");
- GenericInputDescriptor inputDescriptor = sd.getInputDescriptor(inputStreamId, mock(Serde.class));
- MessageStream<Object> inputStream = graphSpec.getInputStream(inputDescriptor);
- MessageStream<Object> stream1 = inputStream.filter(mock(FilterFunction.class));
- MessageStream<Object> stream2 = inputStream.map(mock(MapFunction.class));
- MessageStream<Object> mergedStream = stream1.merge(Collections.singleton(stream2));
-
+ StreamApplicationDescriptorImpl graphSpec = new StreamApplicationDescriptorImpl(appDesc -> {
+ GenericSystemDescriptor sd = new GenericSystemDescriptor(inputSystem, "mockFactoryClass");
+ GenericInputDescriptor inputDescriptor = sd.getInputDescriptor(inputStreamId, mock(Serde.class));
+ MessageStream<Object> inputStream = appDesc.getInputStream(inputDescriptor);
+ MessageStream<Object> stream1 = inputStream.filter(mock(FilterFunction.class));
+ MessageStream<Object> stream2 = inputStream.map(mock(MapFunction.class));
+ stream1.merge(Collections.singleton(stream2))
+ .map(new TestMapFunction<Object, Object>("test-map-1", (Function & Serializable) m -> m));
+ }, mock(Config.class));
TaskContextImpl mockTaskContext = mock(TaskContextImpl.class);
TaskName mockTaskName = mock(TaskName.class);
when(mockTaskContext.getTaskName()).thenReturn(mockTaskName);
when(mockTaskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
- MapFunction testMapFunction = new TestMapFunction<Object, Object>("test-map-1", (Function & Serializable) m -> m);
- mergedStream.map(testMapFunction);
-
OperatorImplGraph opImplGraph =
new OperatorImplGraph(graphSpec.getOperatorSpecGraph(), mock(Config.class), mockTaskContext, mock(Clock.class));
Set<OperatorImpl> opSet = opImplGraph.getAllInputOperators().stream().collect(HashSet::new,
(s, op) -> addOperatorRecursively(s, op), HashSet::addAll);
Object[] mergeOps = opSet.stream().filter(op -> op.getOperatorSpec().getOpCode() == OpCode.MERGE).toArray();
- assertEquals(mergeOps.length, 1);
- assertEquals(((OperatorImpl) mergeOps[0]).registeredOperators.size(), 1);
+ assertEquals(1, mergeOps.length);
+ assertEquals(1, ((OperatorImpl) mergeOps[0]).registeredOperators.size());
OperatorImpl mapOp = (OperatorImpl) ((OperatorImpl) mergeOps[0]).registeredOperators.iterator().next();
assertEquals(mapOp.getOperatorSpec().getOpCode(), OpCode.MAP);
@@ -425,21 +423,22 @@ public class TestOperatorImplGraph {
StreamTestUtils.addStreamConfigs(configs, inputStreamId1, inputSystem, inputPhysicalName1);
StreamTestUtils.addStreamConfigs(configs, inputStreamId2, inputSystem, inputPhysicalName2);
Config config = new MapConfig(configs);
- StreamGraphSpec graphSpec = new StreamGraphSpec(config);
Integer joinKey = new Integer(1);
Function<Object, Integer> keyFn = (Function & Serializable) m -> joinKey;
JoinFunction testJoinFunction = new TestJoinFunction("jobName-jobId-join-j1",
(BiFunction & Serializable) (m1, m2) -> KV.of(m1, m2), keyFn, keyFn);
- GenericSystemDescriptor sd = new GenericSystemDescriptor(inputSystem, "mockFactoryClass");
- GenericInputDescriptor inputDescriptor1 = sd.getInputDescriptor(inputStreamId1, mock(Serde.class));
- GenericInputDescriptor inputDescriptor2 = sd.getInputDescriptor(inputStreamId2, mock(Serde.class));
- MessageStream<Object> inputStream1 = graphSpec.getInputStream(inputDescriptor1);
- MessageStream<Object> inputStream2 = graphSpec.getInputStream(inputDescriptor2);
+ StreamApplicationDescriptorImpl graphSpec = new StreamApplicationDescriptorImpl(appDesc -> {
+ GenericSystemDescriptor sd = new GenericSystemDescriptor(inputSystem, "mockFactoryClass");
+ GenericInputDescriptor inputDescriptor1 = sd.getInputDescriptor(inputStreamId1, mock(Serde.class));
+ GenericInputDescriptor inputDescriptor2 = sd.getInputDescriptor(inputStreamId2, mock(Serde.class));
+ MessageStream<Object> inputStream1 = appDesc.getInputStream(inputDescriptor1);
+ MessageStream<Object> inputStream2 = appDesc.getInputStream(inputDescriptor2);
- inputStream1.join(inputStream2, testJoinFunction,
- mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofHours(1), "j1");
+ inputStream1.join(inputStream2, testJoinFunction,
+ mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofHours(1), "j1");
+ }, config);
TaskName mockTaskName = mock(TaskName.class);
TaskContextImpl mockTaskContext = mock(TaskContextImpl.class);
@@ -496,20 +495,20 @@ public class TestOperatorImplGraph {
TaskContextImpl mockContext = mock(TaskContextImpl.class);
when(mockContext.getTaskName()).thenReturn(mockTaskName);
when(mockContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
- StreamGraphSpec graphSpec = new StreamGraphSpec(mockConfig);
-
- GenericSystemDescriptor sd = new GenericSystemDescriptor(inputSystem, "mockFactoryClass");
- GenericInputDescriptor inputDescriptor1 = sd.getInputDescriptor(inputStreamId1, mock(Serde.class));
- GenericInputDescriptor inputDescriptor2 = sd.getInputDescriptor(inputStreamId2, mock(Serde.class));
- MessageStream<Object> inputStream1 = graphSpec.getInputStream(inputDescriptor1);
- MessageStream<Object> inputStream2 = graphSpec.getInputStream(inputDescriptor2);
+ StreamApplicationDescriptorImpl graphSpec = new StreamApplicationDescriptorImpl(appDesc -> {
+ GenericSystemDescriptor sd = new GenericSystemDescriptor(inputSystem, "mockFactoryClass");
+ GenericInputDescriptor inputDescriptor1 = sd.getInputDescriptor(inputStreamId1, mock(Serde.class));
+ GenericInputDescriptor inputDescriptor2 = sd.getInputDescriptor(inputStreamId2, mock(Serde.class));
+ MessageStream<Object> inputStream1 = appDesc.getInputStream(inputDescriptor1);
+ MessageStream<Object> inputStream2 = appDesc.getInputStream(inputDescriptor2);
- Function mapFn = (Function & Serializable) m -> m;
- inputStream1.map(new TestMapFunction<Object, Object>("1", mapFn))
- .map(new TestMapFunction<Object, Object>("2", mapFn));
+ Function mapFn = (Function & Serializable) m -> m;
+ inputStream1.map(new TestMapFunction<Object, Object>("1", mapFn))
+ .map(new TestMapFunction<Object, Object>("2", mapFn));
- inputStream2.map(new TestMapFunction<Object, Object>("3", mapFn))
- .map(new TestMapFunction<Object, Object>("4", mapFn));
+ inputStream2.map(new TestMapFunction<Object, Object>("3", mapFn))
+ .map(new TestMapFunction<Object, Object>("4", mapFn));
+ }, mockConfig);
OperatorImplGraph opImplGraph = new OperatorImplGraph(graphSpec.getOperatorSpecGraph(), mockConfig, mockContext, SystemClock.instance());
@@ -592,33 +591,34 @@ public class TestOperatorImplGraph {
StreamTestUtils.addStreamConfigs(configs, outputStreamId2, outputSystem, outputStreamId2);
Config config = new MapConfig(configs);
- StreamGraphSpec graphSpec = new StreamGraphSpec(config);
- GenericSystemDescriptor isd = new GenericSystemDescriptor(inputSystem, "mockFactoryClass");
- GenericInputDescriptor inputDescriptor1 = isd.getInputDescriptor(inputStreamId1, mock(Serde.class));
- GenericInputDescriptor inputDescriptor2 = isd.getInputDescriptor(inputStreamId2, mock(Serde.class));
- GenericInputDescriptor inputDescriptor3 = isd.getInputDescriptor(inputStreamId3, mock(Serde.class));
- GenericSystemDescriptor osd = new GenericSystemDescriptor(outputSystem, "mockFactoryClass");
- GenericOutputDescriptor outputDescriptor1 = osd.getOutputDescriptor(outputStreamId1, mock(Serde.class));
- GenericOutputDescriptor outputDescriptor2 = osd.getOutputDescriptor(outputStreamId2, mock(Serde.class));
- MessageStream messageStream1 = graphSpec.getInputStream(inputDescriptor1).map(m -> m);
- MessageStream messageStream2 = graphSpec.getInputStream(inputDescriptor2).filter(m -> true);
- MessageStream messageStream3 =
- graphSpec.getInputStream(inputDescriptor3)
- .filter(m -> true)
- .partitionBy(m -> "m", m -> m, "p1")
- .map(m -> m);
- OutputStream<Object> outputStream1 = graphSpec.getOutputStream(outputDescriptor1);
- OutputStream<Object> outputStream2 = graphSpec.getOutputStream(outputDescriptor2);
-
- messageStream1
- .join(messageStream2, mock(JoinFunction.class),
- mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofHours(2), "j1")
- .partitionBy(m -> "m", m -> m, "p2")
- .sendTo(outputStream1);
- messageStream3
- .join(messageStream2, mock(JoinFunction.class),
- mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofHours(1), "j2")
- .sendTo(outputStream2);
+ StreamApplicationDescriptorImpl graphSpec = new StreamApplicationDescriptorImpl(appDesc -> {
+ GenericSystemDescriptor isd = new GenericSystemDescriptor(inputSystem, "mockFactoryClass");
+ GenericInputDescriptor inputDescriptor1 = isd.getInputDescriptor(inputStreamId1, mock(Serde.class));
+ GenericInputDescriptor inputDescriptor2 = isd.getInputDescriptor(inputStreamId2, mock(Serde.class));
+ GenericInputDescriptor inputDescriptor3 = isd.getInputDescriptor(inputStreamId3, mock(Serde.class));
+ GenericSystemDescriptor osd = new GenericSystemDescriptor(outputSystem, "mockFactoryClass");
+ GenericOutputDescriptor outputDescriptor1 = osd.getOutputDescriptor(outputStreamId1, mock(Serde.class));
+ GenericOutputDescriptor outputDescriptor2 = osd.getOutputDescriptor(outputStreamId2, mock(Serde.class));
+ MessageStream messageStream1 = appDesc.getInputStream(inputDescriptor1).map(m -> m);
+ MessageStream messageStream2 = appDesc.getInputStream(inputDescriptor2).filter(m -> true);
+ MessageStream messageStream3 =
+ appDesc.getInputStream(inputDescriptor3)
+ .filter(m -> true)
+ .partitionBy(m -> "m", m -> m, "p1")
+ .map(m -> m);
+ OutputStream<Object> outputStream1 = appDesc.getOutputStream(outputDescriptor1);
+ OutputStream<Object> outputStream2 = appDesc.getOutputStream(outputDescriptor2);
+
+ messageStream1
+ .join(messageStream2, mock(JoinFunction.class),
+ mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofHours(2), "j1")
+ .partitionBy(m -> "m", m -> m, "p2")
+ .sendTo(outputStream1);
+ messageStream3
+ .join(messageStream2, mock(JoinFunction.class),
+ mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofHours(1), "j2")
+ .sendTo(outputStream2);
+ }, config);
Multimap<SystemStream, SystemStream> outputToInput =
OperatorImplGraph.getIntermediateToInputStreamsMap(graphSpec.getOperatorSpecGraph(), new StreamConfig(config));
http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/test/java/org/apache/samza/operators/impl/TestWindowOperator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestWindowOperator.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestWindowOperator.java
index 6c34fcd..7d468c9 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestWindowOperator.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestWindowOperator.java
@@ -19,25 +19,33 @@
package org.apache.samza.operators.impl;
-
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import org.apache.samza.Partition;
+import org.apache.samza.application.StreamApplicationDescriptorImpl;
+import org.apache.samza.application.StreamApplication;
import org.apache.samza.config.Config;
-import org.apache.samza.config.MapConfig;
import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.MapConfig;
import org.apache.samza.container.TaskContextImpl;
import org.apache.samza.container.TaskName;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.operators.KV;
import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.StreamGraphSpec;
+import org.apache.samza.operators.OperatorSpecGraph;
import org.apache.samza.operators.descriptors.GenericInputDescriptor;
import org.apache.samza.operators.descriptors.GenericSystemDescriptor;
import org.apache.samza.operators.functions.MapFunction;
import org.apache.samza.operators.impl.store.TestInMemoryStore;
import org.apache.samza.operators.impl.store.TimeSeriesKeySerde;
-import org.apache.samza.operators.OperatorSpecGraph;
import org.apache.samza.operators.triggers.FiringType;
import org.apache.samza.operators.triggers.Trigger;
import org.apache.samza.operators.triggers.Triggers;
@@ -59,15 +67,6 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-import java.io.IOException;
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Collections;
-
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
@@ -546,72 +545,80 @@ public class TestWindowOperator {
verify(taskCoordinator, times(1)).shutdown(TaskCoordinator.RequestScope.CURRENT_TASK);
}
- private StreamGraphSpec getKeyedTumblingWindowStreamGraph(AccumulationMode mode,
+ private StreamApplicationDescriptorImpl getKeyedTumblingWindowStreamGraph(AccumulationMode mode,
Duration duration, Trigger<KV<Integer, Integer>> earlyTrigger) throws IOException {
- StreamGraphSpec graph = new StreamGraphSpec(config);
- KVSerde<Integer, Integer> kvSerde = KVSerde.of(new IntegerSerde(), new IntegerSerde());
- GenericSystemDescriptor sd = new GenericSystemDescriptor("kafka", "mockFactoryClass");
- GenericInputDescriptor<KV<Integer, Integer>> inputDescriptor = sd.getInputDescriptor("integers", kvSerde);
- graph.getInputStream(inputDescriptor)
- .window(Windows.keyedTumblingWindow(KV::getKey, duration, new IntegerSerde(), kvSerde)
- .setEarlyTrigger(earlyTrigger).setAccumulationMode(mode), "w1")
- .sink((message, messageCollector, taskCoordinator) -> {
- SystemStream outputSystemStream = new SystemStream("outputSystem", "outputStream");
- messageCollector.send(new OutgoingMessageEnvelope(outputSystemStream, message));
- });
-
- return graph;
+
+ StreamApplication userApp = appDesc -> {
+ KVSerde<Integer, Integer> kvSerde = KVSerde.of(new IntegerSerde(), new IntegerSerde());
+ GenericSystemDescriptor sd = new GenericSystemDescriptor("kafka", "mockFactoryClass");
+ GenericInputDescriptor<KV<Integer, Integer>> inputDescriptor = sd.getInputDescriptor("integers", kvSerde);
+ appDesc.getInputStream(inputDescriptor)
+ .window(Windows.keyedTumblingWindow(KV::getKey, duration, new IntegerSerde(), kvSerde)
+ .setEarlyTrigger(earlyTrigger).setAccumulationMode(mode), "w1")
+ .sink((message, messageCollector, taskCoordinator) -> {
+ SystemStream outputSystemStream = new SystemStream("outputSystem", "outputStream");
+ messageCollector.send(new OutgoingMessageEnvelope(outputSystemStream, message));
+ });
+ };
+
+ return new StreamApplicationDescriptorImpl(userApp, config);
}
- private StreamGraphSpec getTumblingWindowStreamGraph(AccumulationMode mode,
+ private StreamApplicationDescriptorImpl getTumblingWindowStreamGraph(AccumulationMode mode,
Duration duration, Trigger<KV<Integer, Integer>> earlyTrigger) throws IOException {
- StreamGraphSpec graph = new StreamGraphSpec(config);
- KVSerde<Integer, Integer> kvSerde = KVSerde.of(new IntegerSerde(), new IntegerSerde());
- GenericSystemDescriptor sd = new GenericSystemDescriptor("kafka", "mockFactoryClass");
- GenericInputDescriptor<KV<Integer, Integer>> inputDescriptor = sd.getInputDescriptor("integers", kvSerde);
- graph.getInputStream(inputDescriptor)
- .window(Windows.tumblingWindow(duration, kvSerde).setEarlyTrigger(earlyTrigger)
- .setAccumulationMode(mode), "w1")
- .sink((message, messageCollector, taskCoordinator) -> {
- SystemStream outputSystemStream = new SystemStream("outputSystem", "outputStream");
- messageCollector.send(new OutgoingMessageEnvelope(outputSystemStream, message));
- });
- return graph;
+ StreamApplication userApp = appDesc -> {
+ KVSerde<Integer, Integer> kvSerde = KVSerde.of(new IntegerSerde(), new IntegerSerde());
+ GenericSystemDescriptor sd = new GenericSystemDescriptor("kafka", "mockFactoryClass");
+ GenericInputDescriptor<KV<Integer, Integer>> inputDescriptor = sd.getInputDescriptor("integers", kvSerde);
+ appDesc.getInputStream(inputDescriptor)
+ .window(Windows.tumblingWindow(duration, kvSerde).setEarlyTrigger(earlyTrigger)
+ .setAccumulationMode(mode), "w1")
+ .sink((message, messageCollector, taskCoordinator) -> {
+ SystemStream outputSystemStream = new SystemStream("outputSystem", "outputStream");
+ messageCollector.send(new OutgoingMessageEnvelope(outputSystemStream, message));
+ });
+ };
+
+ return new StreamApplicationDescriptorImpl(userApp, config);
}
- private StreamGraphSpec getKeyedSessionWindowStreamGraph(AccumulationMode mode, Duration duration) throws IOException {
- StreamGraphSpec graph = new StreamGraphSpec(config);
- KVSerde<Integer, Integer> kvSerde = KVSerde.of(new IntegerSerde(), new IntegerSerde());
- GenericSystemDescriptor sd = new GenericSystemDescriptor("kafka", "mockFactoryClass");
- GenericInputDescriptor<KV<Integer, Integer>> inputDescriptor = sd.getInputDescriptor("integers", kvSerde);
- graph.getInputStream(inputDescriptor)
- .window(Windows.keyedSessionWindow(KV::getKey, duration, new IntegerSerde(), kvSerde)
- .setAccumulationMode(mode), "w1")
- .sink((message, messageCollector, taskCoordinator) -> {
- SystemStream outputSystemStream = new SystemStream("outputSystem", "outputStream");
- messageCollector.send(new OutgoingMessageEnvelope(outputSystemStream, message));
- });
- return graph;
+ private StreamApplicationDescriptorImpl getKeyedSessionWindowStreamGraph(AccumulationMode mode, Duration duration) throws IOException {
+ StreamApplication userApp = appDesc -> {
+ KVSerde<Integer, Integer> kvSerde = KVSerde.of(new IntegerSerde(), new IntegerSerde());
+ GenericSystemDescriptor sd = new GenericSystemDescriptor("kafka", "mockFactoryClass");
+ GenericInputDescriptor<KV<Integer, Integer>> inputDescriptor = sd.getInputDescriptor("integers", kvSerde);
+ appDesc.getInputStream(inputDescriptor)
+ .window(Windows.keyedSessionWindow(KV::getKey, duration, new IntegerSerde(), kvSerde)
+ .setAccumulationMode(mode), "w1")
+ .sink((message, messageCollector, taskCoordinator) -> {
+ SystemStream outputSystemStream = new SystemStream("outputSystem", "outputStream");
+ messageCollector.send(new OutgoingMessageEnvelope(outputSystemStream, message));
+ });
+ };
+
+ return new StreamApplicationDescriptorImpl(userApp, config);
}
- private StreamGraphSpec getAggregateTumblingWindowStreamGraph(AccumulationMode mode, Duration timeDuration,
+ private StreamApplicationDescriptorImpl getAggregateTumblingWindowStreamGraph(AccumulationMode mode, Duration timeDuration,
Trigger<IntegerEnvelope> earlyTrigger) throws IOException {
- StreamGraphSpec graph = new StreamGraphSpec(config);
- KVSerde<Integer, Integer> kvSerde = KVSerde.of(new IntegerSerde(), new IntegerSerde());
- GenericSystemDescriptor sd = new GenericSystemDescriptor("kafka", "mockFactoryClass");
- GenericInputDescriptor<KV<Integer, Integer>> inputDescriptor = sd.getInputDescriptor("integers", kvSerde);
- MessageStream<KV<Integer, Integer>> integers = graph.getInputStream(inputDescriptor);
-
- integers
- .map(new KVMapFunction())
- .window(Windows.<IntegerEnvelope, Integer>tumblingWindow(timeDuration, () -> 0, (m, c) -> c + 1, new IntegerSerde())
- .setEarlyTrigger(earlyTrigger)
- .setAccumulationMode(mode), "w1")
- .sink((message, messageCollector, taskCoordinator) -> {
- SystemStream outputSystemStream = new SystemStream("outputSystem", "outputStream");
- messageCollector.send(new OutgoingMessageEnvelope(outputSystemStream, message));
- });
- return graph;
+ StreamApplication userApp = appDesc -> {
+ KVSerde<Integer, Integer> kvSerde = KVSerde.of(new IntegerSerde(), new IntegerSerde());
+ GenericSystemDescriptor sd = new GenericSystemDescriptor("kafka", "mockFactoryClass");
+ GenericInputDescriptor<KV<Integer, Integer>> inputDescriptor = sd.getInputDescriptor("integers", kvSerde);
+ MessageStream<KV<Integer, Integer>> integers = appDesc.getInputStream(inputDescriptor);
+
+ integers
+ .map(new KVMapFunction())
+ .window(Windows.<IntegerEnvelope, Integer>tumblingWindow(timeDuration, () -> 0, (m, c) -> c + 1, new IntegerSerde())
+ .setEarlyTrigger(earlyTrigger)
+ .setAccumulationMode(mode), "w1")
+ .sink((message, messageCollector, taskCoordinator) -> {
+ SystemStream outputSystemStream = new SystemStream("outputSystem", "outputStream");
+ messageCollector.send(new OutgoingMessageEnvelope(outputSystemStream, message));
+ });
+ };
+
+ return new StreamApplicationDescriptorImpl(userApp, config);
}
private static class IntegerEnvelope extends IncomingMessageEnvelope {