You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2017/06/07 19:42:19 UTC
[1/4] samza git commit: SAMZA-1221, SAMZA-1101: Internal cleanup for High-Level API implementation.
Repository: samza
Updated Branches:
refs/heads/master 29cf374c5 -> c1c4289c8
http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java
index a5d9539..e183d87 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java
@@ -28,12 +28,10 @@ import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;
import org.junit.Test;
-import java.util.ArrayList;
import java.util.Collection;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -50,12 +48,9 @@ public class TestStreamOperatorImpl {
Config mockConfig = mock(Config.class);
TaskContext mockContext = mock(TaskContext.class);
StreamOperatorImpl<TestMessageEnvelope, TestOutputMessageEnvelope> opImpl =
- spy(new StreamOperatorImpl<>(mockOp, mockConfig, mockContext));
+ new StreamOperatorImpl<>(mockOp, mockConfig, mockContext);
TestMessageEnvelope inMsg = mock(TestMessageEnvelope.class);
- TestOutputMessageEnvelope outMsg = mock(TestOutputMessageEnvelope.class);
- Collection<TestOutputMessageEnvelope> mockOutputs = new ArrayList() { {
- this.add(outMsg);
- } };
+ Collection<TestOutputMessageEnvelope> mockOutputs = mock(Collection.class);
when(txfmFn.apply(inMsg)).thenReturn(mockOutputs);
MessageCollector mockCollector = mock(MessageCollector.class);
TaskCoordinator mockCoordinator = mock(TaskCoordinator.class);
@@ -74,7 +69,7 @@ public class TestStreamOperatorImpl {
TaskContext mockContext = mock(TaskContext.class);
StreamOperatorImpl<TestMessageEnvelope, TestOutputMessageEnvelope> opImpl =
- spy(new StreamOperatorImpl<>(mockOp, mockConfig, mockContext));
+ new StreamOperatorImpl<>(mockOp, mockConfig, mockContext);
// ensure that close is not called yet
verify(txfmFn, times(0)).close();
http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java b/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java
deleted file mode 100644
index cccafaf..0000000
--- a/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java
+++ /dev/null
@@ -1,192 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.spec;
-
-import org.apache.samza.operators.MessageStreamImpl;
-import org.apache.samza.operators.StreamGraphImpl;
-import org.apache.samza.operators.TestMessageStreamImplUtil;
-import org.apache.samza.operators.data.MessageType;
-import org.apache.samza.operators.data.TestInputMessageEnvelope;
-import org.apache.samza.operators.data.TestMessageEnvelope;
-import org.apache.samza.operators.data.TestOutputMessageEnvelope;
-import org.apache.samza.operators.functions.FlatMapFunction;
-import org.apache.samza.operators.functions.FoldLeftFunction;
-import org.apache.samza.operators.functions.PartialJoinFunction;
-import org.apache.samza.operators.functions.SinkFunction;
-import org.apache.samza.operators.stream.OutputStreamInternalImpl;
-import org.apache.samza.operators.windows.WindowPane;
-import org.apache.samza.operators.windows.internal.WindowInternal;
-import org.apache.samza.operators.windows.internal.WindowType;
-import org.apache.samza.system.OutgoingMessageEnvelope;
-import org.apache.samza.system.SystemStream;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskCoordinator;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.function.Function;
-import java.util.function.Supplier;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-
-public class TestOperatorSpecs {
- @Test
- public void testCreateStreamOperator() {
- FlatMapFunction<Object, TestMessageEnvelope> transformFn = m -> new ArrayList<TestMessageEnvelope>() { {
- this.add(new TestMessageEnvelope(m.toString(), m.toString(), 12345L));
- } };
- MessageStreamImpl<TestMessageEnvelope> mockOutput = mock(MessageStreamImpl.class);
- StreamOperatorSpec<Object, TestMessageEnvelope> streamOp =
- OperatorSpecs.createStreamOperatorSpec(transformFn, mockOutput, 1);
- assertEquals(streamOp.getTransformFn(), transformFn);
-
- Object mockInput = mock(Object.class);
- when(mockInput.toString()).thenReturn("test-string-1");
- List<TestMessageEnvelope> outputs = (List<TestMessageEnvelope>) streamOp.getTransformFn().apply(mockInput);
- assertEquals(outputs.size(), 1);
- assertEquals(outputs.get(0).getKey(), "test-string-1");
- assertEquals(outputs.get(0).getMessage().getValue(), "test-string-1");
- assertEquals(outputs.get(0).getMessage().getEventTime(), 12345L);
- assertEquals(streamOp.getNextStream(), mockOutput);
- }
-
- @Test
- public void testCreateSinkOperator() {
- SystemStream testStream = new SystemStream("test-sys", "test-stream");
- SinkFunction<TestMessageEnvelope> sinkFn = (TestMessageEnvelope message, MessageCollector messageCollector,
- TaskCoordinator taskCoordinator) -> {
- messageCollector.send(new OutgoingMessageEnvelope(testStream, message.getKey(), message.getMessage()));
- };
- SinkOperatorSpec<TestMessageEnvelope> sinkOp = OperatorSpecs.createSinkOperatorSpec(sinkFn, 1);
- assertEquals(sinkOp.getSinkFn(), sinkFn);
-
- TestMessageEnvelope mockInput = mock(TestMessageEnvelope.class);
- when(mockInput.getKey()).thenReturn("my-test-msg-key");
- MessageType mockMsgBody = mock(MessageType.class);
- when(mockInput.getMessage()).thenReturn(mockMsgBody);
- final List<OutgoingMessageEnvelope> outputMsgs = new ArrayList<>();
- MessageCollector mockCollector = mock(MessageCollector.class);
- doAnswer(invocation -> {
- outputMsgs.add((OutgoingMessageEnvelope) invocation.getArguments()[0]);
- return null;
- }).when(mockCollector).send(any());
- sinkOp.getSinkFn().apply(mockInput, mockCollector, null);
- assertEquals(1, outputMsgs.size());
- assertEquals(outputMsgs.get(0).getKey(), "my-test-msg-key");
- assertEquals(outputMsgs.get(0).getMessage(), mockMsgBody);
- assertEquals(sinkOp.getOpCode(), OperatorSpec.OpCode.SINK);
- assertEquals(sinkOp.getNextStream(), null);
- }
-
- @Test
- public void testCreateSendToOperator() {
- OutputStreamInternalImpl<Object, Object, TestMessageEnvelope> mockOutput = mock(OutputStreamInternalImpl.class);
- SinkOperatorSpec<TestMessageEnvelope> sinkOp = OperatorSpecs.createSendToOperatorSpec(mockOutput, 1);
- assertNotNull(sinkOp.getSinkFn());
- assertEquals(sinkOp.getOpCode(), OperatorSpec.OpCode.SEND_TO);
- assertEquals(sinkOp.getNextStream(), null);
- }
-
-
- @Test
- public void testCreatePartitionByOperator() {
- OutputStreamInternalImpl<Object, Object, TestMessageEnvelope> mockOutput = mock(OutputStreamInternalImpl.class);
- SinkOperatorSpec<TestMessageEnvelope> sinkOp = OperatorSpecs.createPartitionByOperatorSpec(mockOutput, 1);
- assertNotNull(sinkOp.getSinkFn());
- assertEquals(sinkOp.getOpCode(), OperatorSpec.OpCode.PARTITION_BY);
- assertEquals(sinkOp.getNextStream(), null);
- }
-
- @Test
- public void testCreateWindowOperator() throws Exception {
- Function<TestMessageEnvelope, String> keyExtractor = m -> "globalkey";
- FoldLeftFunction<TestMessageEnvelope, Integer> aggregator = (m, c) -> c + 1;
- Supplier<Integer> initialValue = () -> 0;
- //instantiate a window using reflection
- WindowInternal window = new WindowInternal(null, initialValue, aggregator, keyExtractor, null, WindowType.TUMBLING);
-
- MessageStreamImpl<WindowPane<String, Integer>> mockWndOut = mock(MessageStreamImpl.class);
- WindowOperatorSpec spec =
- OperatorSpecs.<TestMessageEnvelope, String, Integer>createWindowOperatorSpec(window, mockWndOut, 1);
- assertEquals(spec.getWindow(), window);
- assertEquals(spec.getWindow().getKeyExtractor(), keyExtractor);
- assertEquals(spec.getWindow().getFoldLeftFunction(), aggregator);
- }
-
- @Test
- public void testCreateWindowOperatorWithRelaxedTypes() throws Exception {
- Function<TestMessageEnvelope, String> keyExtractor = m -> m.getKey();
- FoldLeftFunction<TestMessageEnvelope, Integer> aggregator = (m, c) -> c + 1;
- Supplier<Integer> initialValue = () -> 0;
- //instantiate a window using reflection
- WindowInternal<TestInputMessageEnvelope, String, Integer> window = new WindowInternal(null, initialValue, aggregator, keyExtractor, null, WindowType.TUMBLING);
-
- MessageStreamImpl<WindowPane<String, Integer>> mockWndOut = mock(MessageStreamImpl.class);
- WindowOperatorSpec spec =
- OperatorSpecs.createWindowOperatorSpec(window, mockWndOut, 1);
- assertEquals(spec.getWindow(), window);
- assertEquals(spec.getWindow().getKeyExtractor(), keyExtractor);
- assertEquals(spec.getWindow().getFoldLeftFunction(), aggregator);
-
- // make sure that the functions with relaxed types work as expected
- TestInputMessageEnvelope inputMsg = new TestInputMessageEnvelope("test-input-key1", "test-value-1", 23456L, "input-id-1");
- assertEquals("test-input-key1", spec.getWindow().getKeyExtractor().apply(inputMsg));
- assertEquals(1, spec.getWindow().getFoldLeftFunction().apply(inputMsg, 0));
- }
-
- @Test
- public void testCreatePartialJoinOperator() {
- PartialJoinFunction<String, TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope> thisPartialJoinFn
- = mock(PartialJoinFunction.class);
- PartialJoinFunction<String, TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope> otherPartialJoinFn
- = mock(PartialJoinFunction.class);
- StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
- MessageStreamImpl<TestOutputMessageEnvelope> joinOutput = TestMessageStreamImplUtil.getMessageStreamImpl(mockGraph);
-
- PartialJoinOperatorSpec<String, TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope> partialJoinSpec
- = OperatorSpecs.createPartialJoinOperatorSpec(thisPartialJoinFn, otherPartialJoinFn, 1000 * 60, joinOutput, 1);
-
- assertEquals(partialJoinSpec.getNextStream(), joinOutput);
- assertEquals(partialJoinSpec.getThisPartialJoinFn(), thisPartialJoinFn);
- assertEquals(partialJoinSpec.getOtherPartialJoinFn(), otherPartialJoinFn);
- }
-
- @Test
- public void testCreateMergeOperator() {
- StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
- MessageStreamImpl<TestMessageEnvelope> output = TestMessageStreamImplUtil.<TestMessageEnvelope>getMessageStreamImpl(mockGraph);
- StreamOperatorSpec<TestMessageEnvelope, TestMessageEnvelope> mergeOp =
- OperatorSpecs.createMergeOperatorSpec(output, 1);
- Function<TestMessageEnvelope, Collection<TestMessageEnvelope>> mergeFn =
- t -> new ArrayList<TestMessageEnvelope>() { {
- this.add(t);
- } };
- TestMessageEnvelope t = mock(TestMessageEnvelope.class);
- assertEquals(mergeOp.getTransformFn().apply(t), mergeFn.apply(t));
- assertEquals(mergeOp.getNextStream(), output);
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/test/java/org/apache/samza/operators/spec/TestWindowOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/spec/TestWindowOperatorSpec.java b/samza-core/src/test/java/org/apache/samza/operators/spec/TestWindowOperatorSpec.java
index affe37f..12a32b1 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/spec/TestWindowOperatorSpec.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/spec/TestWindowOperatorSpec.java
@@ -19,7 +19,6 @@
package org.apache.samza.operators.spec;
-import org.apache.samza.operators.MessageStreamImpl;
import org.apache.samza.operators.triggers.Trigger;
import org.apache.samza.operators.triggers.Triggers;
import org.apache.samza.operators.windows.internal.WindowInternal;
@@ -38,14 +37,15 @@ public class TestWindowOperatorSpec {
Triggers.any(Triggers.count(23),
Triggers.timeSinceFirstMessage(Duration.ofMillis(15)),
Triggers.any(Triggers.any(Triggers.count(6),
- Triggers.timeSinceFirstMessage(Duration.ofMillis(15)), Triggers.timeSinceFirstMessage(Duration.ofMillis(25)),
+ Triggers.timeSinceFirstMessage(Duration.ofMillis(15)),
+ Triggers.timeSinceFirstMessage(Duration.ofMillis(25)),
Triggers.timeSinceLastMessage(Duration.ofMillis(15))))));
WindowInternal window = new WindowInternal(defaultTrigger, null, null, null, null, WindowType.SESSION);
window.setEarlyTrigger(earlyTrigger);
window.setLateTrigger(lateTrigger);
- WindowOperatorSpec spec = new WindowOperatorSpec(window, new MessageStreamImpl(null), 0);
+ WindowOperatorSpec spec = new WindowOperatorSpec(window, 0);
Assert.assertEquals(spec.getDefaultTriggerMs(), 5);
}
@@ -57,7 +57,7 @@ public class TestWindowOperatorSpec {
WindowInternal window = new WindowInternal(defaultTrigger, null, null, null, null, WindowType.SESSION);
window.setEarlyTrigger(earlyTrigger);
- WindowOperatorSpec spec = new WindowOperatorSpec(window, new MessageStreamImpl(null), 0);
+ WindowOperatorSpec spec = new WindowOperatorSpec(window, 0);
Assert.assertEquals(spec.getDefaultTriggerMs(), 150);
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-shell/src/main/visualizer/js/planToDagre.js
----------------------------------------------------------------------
diff --git a/samza-shell/src/main/visualizer/js/planToDagre.js b/samza-shell/src/main/visualizer/js/planToDagre.js
index 8ba198e..0421c33 100644
--- a/samza-shell/src/main/visualizer/js/planToDagre.js
+++ b/samza-shell/src/main/visualizer/js/planToDagre.js
@@ -42,15 +42,11 @@ function planToDagre(data) {
var jobs = data.jobs;
for (var i = 0; i < jobs.length; i++) {
- var canonicalOpIds = jobs[i].operatorGraph.canonicalOpIds;
var operators = jobs[i].operatorGraph.operators;
for (var opId in operators) {
var operator = operators[opId];
var labelVal = "<div><h3 class=\"topbar\">" + operator.opCode + "</h3><ul class=\"detailBox\">";
var opId = operator.opId;
- if (!(opId in canonicalOpIds)) {
- canonicalOpIds[opId] = opId.toString();
- }
labelVal += "<li>ID: " + opId + "</li>";
labelVal += "<li>@" + operator.sourceLocation + "</li>";
@@ -62,7 +58,7 @@ function planToDagre(data) {
}
labelVal += "</ul></div>";
- g.setNode(canonicalOpIds[opId], { label: labelVal, labelType: "html", rx: 5, ry: 5 });
+ g.setNode(opId, { label: labelVal, labelType: "html", rx: 5, ry: 5 });
}
}
@@ -71,7 +67,7 @@ function planToDagre(data) {
for (var k = 0; k < inputs.length; k++) {
var input = inputs[k];
for (var m = 0; m < input.nextOperatorIds.length; m++) {
- g.setEdge(input.streamId, canonicalOpIds[input.nextOperatorIds[m]]);
+ g.setEdge(input.streamId, input.nextOperatorIds[m]);
}
}
@@ -79,10 +75,10 @@ function planToDagre(data) {
for (var opId in operators) {
var operator = operators[opId];
for (var j = 0; j < operator.nextOperatorIds.length; j++) {
- g.setEdge(canonicalOpIds[opId], canonicalOpIds[operator.nextOperatorIds[j]]);
+ g.setEdge(opId, operator.nextOperatorIds[j]);
}
if (typeof(operator.outputStreamId) !== 'undefined') {
- g.setEdge(canonicalOpIds[opId], operator.outputStreamId);
+ g.setEdge(opId, operator.outputStreamId);
}
}
}
[2/4] samza git commit: SAMZA-1221, SAMZA-1101: Internal cleanup for High-Level API implementation.
Posted by xi...@apache.org.
http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java b/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
index b2a5e2a..61224f2 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
@@ -19,302 +19,343 @@
package org.apache.samza.operators;
import com.google.common.collect.ImmutableList;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.JobConfig;
-import org.apache.samza.config.MapConfig;
-import org.apache.samza.operators.data.MessageType;
-import org.apache.samza.operators.data.TestExtOutputMessageEnvelope;
-import org.apache.samza.operators.data.TestInputMessageEnvelope;
import org.apache.samza.operators.data.TestMessageEnvelope;
import org.apache.samza.operators.data.TestOutputMessageEnvelope;
import org.apache.samza.operators.functions.FilterFunction;
import org.apache.samza.operators.functions.FlatMapFunction;
+import org.apache.samza.operators.functions.FoldLeftFunction;
import org.apache.samza.operators.functions.JoinFunction;
import org.apache.samza.operators.functions.MapFunction;
import org.apache.samza.operators.functions.SinkFunction;
+import org.apache.samza.operators.spec.JoinOperatorSpec;
import org.apache.samza.operators.spec.OperatorSpec;
+import org.apache.samza.operators.spec.OperatorSpec.OpCode;
+import org.apache.samza.operators.spec.OutputOperatorSpec;
+import org.apache.samza.operators.spec.OutputStreamImpl;
import org.apache.samza.operators.spec.SinkOperatorSpec;
-import org.apache.samza.operators.spec.PartialJoinOperatorSpec;
import org.apache.samza.operators.spec.StreamOperatorSpec;
-import org.apache.samza.runtime.ApplicationRunner;
-import org.apache.samza.system.OutgoingMessageEnvelope;
-import org.apache.samza.system.SystemStream;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskContext;
-import org.apache.samza.task.TaskCoordinator;
+import org.apache.samza.operators.spec.WindowOperatorSpec;
+import org.apache.samza.operators.stream.IntermediateMessageStreamImpl;
+import org.apache.samza.operators.windows.Window;
+import org.apache.samza.operators.windows.WindowPane;
+import org.apache.samza.operators.windows.Windows;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import java.time.Duration;
-import java.util.ArrayList;
import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.Collections;
+import java.util.function.BiFunction;
import java.util.function.Function;
+import java.util.function.Supplier;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-
+@SuppressWarnings("unchecked")
public class TestMessageStreamImpl {
- private StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
-
@Test
public void testMap() {
- MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph);
- MapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> xMap = (TestMessageEnvelope m) ->
- new TestOutputMessageEnvelope(m.getKey(), m.getMessage().getValue().length() + 1);
- MessageStream<TestOutputMessageEnvelope> outputStream = inputStream.map(xMap);
- Collection<OperatorSpec> subs = inputStream.getRegisteredOperatorSpecs();
- assertEquals(subs.size(), 1);
- OperatorSpec<TestOutputMessageEnvelope> mapOp = subs.iterator().next();
- assertTrue(mapOp instanceof StreamOperatorSpec);
- assertEquals(mapOp.getNextStream(), outputStream);
- // assert that the transformation function is what we defined above
- TestMessageEnvelope xTestMsg = mock(TestMessageEnvelope.class);
- MessageType mockInnerTestMessage = mock(MessageType.class);
- when(xTestMsg.getKey()).thenReturn("test-msg-key");
- when(xTestMsg.getMessage()).thenReturn(mockInnerTestMessage);
- when(mockInnerTestMessage.getValue()).thenReturn("123456789");
-
- Collection<TestOutputMessageEnvelope> cOutputMsg = ((StreamOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope>) mapOp).getTransformFn().apply(xTestMsg);
- assertEquals(cOutputMsg.size(), 1);
- TestOutputMessageEnvelope outputMessage = cOutputMsg.iterator().next();
- assertEquals(outputMessage.getKey(), xTestMsg.getKey());
- assertEquals(outputMessage.getMessage(), Integer.valueOf(xTestMsg.getMessage().getValue().length() + 1));
+ StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+ OperatorSpec mockOpSpec = mock(OperatorSpec.class);
+ MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph, mockOpSpec);
+
+ MapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> mockMapFn = mock(MapFunction.class);
+ inputStream.map(mockMapFn);
+
+ ArgumentCaptor<OperatorSpec> registeredOpCaptor = ArgumentCaptor.forClass(OperatorSpec.class);
+ verify(mockOpSpec).registerNextOperatorSpec(registeredOpCaptor.capture());
+ OperatorSpec<?, TestMessageEnvelope> registeredOpSpec = registeredOpCaptor.getValue();
+ assertTrue(registeredOpSpec instanceof StreamOperatorSpec);
+
+ FlatMapFunction transformFn = ((StreamOperatorSpec) registeredOpSpec).getTransformFn();
+ assertNotNull(transformFn);
+ assertEquals(OpCode.MAP, registeredOpSpec.getOpCode());
+
+ TestOutputMessageEnvelope mockOutput = mock(TestOutputMessageEnvelope.class);
+ when(mockMapFn.apply(anyObject())).thenReturn(mockOutput);
+ assertTrue(transformFn.apply(new Object()).contains(mockOutput));
+ when(mockMapFn.apply(anyObject())).thenReturn(null);
+ assertTrue(transformFn.apply(null).isEmpty());
}
@Test
public void testFlatMap() {
- MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph);
- List<TestOutputMessageEnvelope> flatOuts = new ArrayList<TestOutputMessageEnvelope>() { {
- this.add(mock(TestOutputMessageEnvelope.class));
- this.add(mock(TestOutputMessageEnvelope.class));
- this.add(mock(TestOutputMessageEnvelope.class));
- } };
- final List<TestMessageEnvelope> inputMsgs = new ArrayList<>();
- FlatMapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> xFlatMap = (TestMessageEnvelope message) -> {
- inputMsgs.add(message);
- return flatOuts;
- };
- MessageStream<TestOutputMessageEnvelope> outputStream = inputStream.flatMap(xFlatMap);
- Collection<OperatorSpec> subs = inputStream.getRegisteredOperatorSpecs();
- assertEquals(subs.size(), 1);
- OperatorSpec<TestOutputMessageEnvelope> flatMapOp = subs.iterator().next();
- assertTrue(flatMapOp instanceof StreamOperatorSpec);
- assertEquals(flatMapOp.getNextStream(), outputStream);
- assertEquals(((StreamOperatorSpec) flatMapOp).getTransformFn(), xFlatMap);
-
- TestMessageEnvelope mockInput = mock(TestMessageEnvelope.class);
- // assert that the transformation function is what we defined above
- List<TestOutputMessageEnvelope> result = (List<TestOutputMessageEnvelope>)
- ((StreamOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope>) flatMapOp).getTransformFn().apply(mockInput);
- assertEquals(flatOuts, result);
- assertEquals(inputMsgs.size(), 1);
- assertEquals(inputMsgs.get(0), mockInput);
+ StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+ OperatorSpec mockOpSpec = mock(OperatorSpec.class);
+ MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph, mockOpSpec);
+
+ inputStream.flatMap(mock(FlatMapFunction.class));
+
+ ArgumentCaptor<OperatorSpec> registeredOpCaptor = ArgumentCaptor.forClass(OperatorSpec.class);
+ verify(mockOpSpec).registerNextOperatorSpec(registeredOpCaptor.capture());
+ OperatorSpec<?, TestMessageEnvelope> registeredOpSpec = registeredOpCaptor.getValue();
+
+ assertTrue(registeredOpSpec instanceof StreamOperatorSpec);
+ assertNotNull(((StreamOperatorSpec) registeredOpSpec).getTransformFn());
+ assertEquals(OpCode.FLAT_MAP, registeredOpSpec.getOpCode());
}
@Test
public void testFlatMapWithRelaxedTypes() {
- MessageStreamImpl<TestInputMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph);
- List<TestExtOutputMessageEnvelope> flatOuts = new ArrayList<TestExtOutputMessageEnvelope>() { {
- this.add(new TestExtOutputMessageEnvelope("output-key-1", 1, "output-id-001"));
- this.add(new TestExtOutputMessageEnvelope("output-key-2", 2, "output-id-002"));
- this.add(new TestExtOutputMessageEnvelope("output-key-3", 3, "output-id-003"));
- } };
-
- class MyFlatMapFunction implements FlatMapFunction<TestMessageEnvelope, TestExtOutputMessageEnvelope> {
- public final List<TestMessageEnvelope> inputMsgs = new ArrayList<>();
-
- @Override
- public Collection<TestExtOutputMessageEnvelope> apply(TestMessageEnvelope message) {
- inputMsgs.add(message);
- return flatOuts;
- }
-
- @Override
- public void init(Config config, TaskContext context) {
- inputMsgs.clear();
- }
- }
-
- MyFlatMapFunction xFlatMap = new MyFlatMapFunction();
-
- MessageStream<TestOutputMessageEnvelope> outputStream = inputStream.flatMap(xFlatMap);
- Collection<OperatorSpec> subs = inputStream.getRegisteredOperatorSpecs();
- assertEquals(subs.size(), 1);
- OperatorSpec<TestOutputMessageEnvelope> flatMapOp = subs.iterator().next();
- assertTrue(flatMapOp instanceof StreamOperatorSpec);
- assertEquals(flatMapOp.getNextStream(), outputStream);
- assertEquals(((StreamOperatorSpec) flatMapOp).getTransformFn(), xFlatMap);
-
- TestMessageEnvelope mockInput = mock(TestMessageEnvelope.class);
- // assert that the transformation function is what we defined above
- List<TestOutputMessageEnvelope> result = (List<TestOutputMessageEnvelope>)
- ((StreamOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope>) flatMapOp).getTransformFn().apply(mockInput);
- assertEquals(flatOuts, result);
- assertEquals(xFlatMap.inputMsgs.size(), 1);
- assertEquals(xFlatMap.inputMsgs.get(0), mockInput);
+ StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+ OperatorSpec mockOpSpec = mock(OperatorSpec.class);
+ MessageStreamImpl<TestInputMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph, mockOpSpec);
+
+ FlatMapFunction flatMapFunction =
+ (FlatMapFunction<TestMessageEnvelope, TestOutputMessageEnvelope>) message -> Collections.emptyList();
+ // should compile since TestInputMessageEnvelope extends TestMessageEnvelope
+ inputStream.flatMap(flatMapFunction);
+
+ ArgumentCaptor<OperatorSpec> registeredOpCaptor = ArgumentCaptor.forClass(OperatorSpec.class);
+ verify(mockOpSpec).registerNextOperatorSpec(registeredOpCaptor.capture());
+ OperatorSpec<?, TestMessageEnvelope> registeredOpSpec = registeredOpCaptor.getValue();
+
+ assertTrue(registeredOpSpec instanceof StreamOperatorSpec);
+ assertNotNull(((StreamOperatorSpec) registeredOpSpec).getTransformFn());
+ assertEquals(OpCode.FLAT_MAP, registeredOpSpec.getOpCode());
}
@Test
public void testFilter() {
- MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph);
- FilterFunction<TestMessageEnvelope> xFilter = (TestMessageEnvelope m) -> m.getMessage().getEventTime() > 123456L;
- MessageStream<TestMessageEnvelope> outputStream = inputStream.filter(xFilter);
- Collection<OperatorSpec> subs = inputStream.getRegisteredOperatorSpecs();
- assertEquals(subs.size(), 1);
- OperatorSpec<TestMessageEnvelope> filterOp = subs.iterator().next();
- assertTrue(filterOp instanceof StreamOperatorSpec);
- assertEquals(filterOp.getNextStream(), outputStream);
- // assert that the transformation function is what we defined above
- FlatMapFunction<TestMessageEnvelope, TestMessageEnvelope> txfmFn = ((StreamOperatorSpec<TestMessageEnvelope, TestMessageEnvelope>) filterOp).getTransformFn();
- TestMessageEnvelope mockMsg = mock(TestMessageEnvelope.class);
- MessageType mockInnerTestMessage = mock(MessageType.class);
- when(mockMsg.getMessage()).thenReturn(mockInnerTestMessage);
- when(mockInnerTestMessage.getEventTime()).thenReturn(11111L);
- Collection<TestMessageEnvelope> output = txfmFn.apply(mockMsg);
- assertTrue(output.isEmpty());
- when(mockMsg.getMessage()).thenReturn(mockInnerTestMessage);
- when(mockInnerTestMessage.getEventTime()).thenReturn(999999L);
- output = txfmFn.apply(mockMsg);
- assertEquals(output.size(), 1);
- assertEquals(output.iterator().next(), mockMsg);
+ StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+ OperatorSpec mockOpSpec = mock(OperatorSpec.class);
+ MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph, mockOpSpec);
+
+ FilterFunction<Object> mockFilterFn = mock(FilterFunction.class);
+ inputStream.filter(mockFilterFn);
+
+ ArgumentCaptor<OperatorSpec> registeredOpCaptor = ArgumentCaptor.forClass(OperatorSpec.class);
+ verify(mockOpSpec).registerNextOperatorSpec(registeredOpCaptor.capture());
+ OperatorSpec<?, TestMessageEnvelope> registeredOpSpec = registeredOpCaptor.getValue();
+ assertTrue(registeredOpSpec instanceof StreamOperatorSpec);
+
+ FlatMapFunction transformFn = ((StreamOperatorSpec) registeredOpSpec).getTransformFn();
+ assertNotNull(transformFn);
+ assertEquals(OpCode.FILTER, registeredOpSpec.getOpCode());
+
+ Object mockInput = new Object();
+ when(mockFilterFn.apply(anyObject())).thenReturn(true);
+ assertTrue(transformFn.apply(mockInput).contains(mockInput));
+ when(mockFilterFn.apply(anyObject())).thenReturn(false);
+ assertTrue(transformFn.apply(mockInput).isEmpty());
}
@Test
public void testSink() {
- MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph);
- SystemStream testStream = new SystemStream("test-sys", "test-stream");
- SinkFunction<TestMessageEnvelope> xSink = (TestMessageEnvelope m, MessageCollector mc, TaskCoordinator tc) -> {
- mc.send(new OutgoingMessageEnvelope(testStream, m.getMessage()));
- tc.commit(TaskCoordinator.RequestScope.CURRENT_TASK);
- };
- inputStream.sink(xSink);
- Collection<OperatorSpec> subs = inputStream.getRegisteredOperatorSpecs();
- assertEquals(subs.size(), 1);
- OperatorSpec<TestMessageEnvelope> sinkOp = subs.iterator().next();
- assertTrue(sinkOp instanceof SinkOperatorSpec);
- assertEquals(((SinkOperatorSpec) sinkOp).getSinkFn(), xSink);
-
- TestMessageEnvelope mockTest1 = mock(TestMessageEnvelope.class);
- MessageType mockMsgBody = mock(MessageType.class);
- when(mockTest1.getMessage()).thenReturn(mockMsgBody);
- final List<OutgoingMessageEnvelope> outMsgs = new ArrayList<>();
- MessageCollector mockCollector = mock(MessageCollector.class);
- doAnswer(invocation -> {
- outMsgs.add((OutgoingMessageEnvelope) invocation.getArguments()[0]);
- return null;
- }).when(mockCollector).send(any());
- TaskCoordinator mockCoordinator = mock(TaskCoordinator.class);
- ((SinkOperatorSpec) sinkOp).getSinkFn().apply(mockTest1, mockCollector, mockCoordinator);
- assertEquals(1, outMsgs.size());
- assertEquals(testStream, outMsgs.get(0).getSystemStream());
- assertEquals(mockMsgBody, outMsgs.get(0).getMessage());
+ StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+ OperatorSpec mockOpSpec = mock(OperatorSpec.class);
+ MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph, mockOpSpec);
+
+ inputStream.sink(mock(SinkFunction.class));
+
+ ArgumentCaptor<OperatorSpec> registeredOpCaptor = ArgumentCaptor.forClass(OperatorSpec.class);
+ verify(mockOpSpec).registerNextOperatorSpec(registeredOpCaptor.capture());
+ OperatorSpec<?, TestMessageEnvelope> registeredOpSpec = registeredOpCaptor.getValue();
+
+ assertTrue(registeredOpSpec instanceof SinkOperatorSpec);
+ assertNotNull(((SinkOperatorSpec) registeredOpSpec).getSinkFn());
+ assertEquals(OpCode.SINK, registeredOpSpec.getOpCode());
}
@Test
- public void testJoin() {
- MessageStreamImpl<TestMessageEnvelope> source1 = new MessageStreamImpl<>(mockGraph);
- MessageStreamImpl<TestMessageEnvelope> source2 = new MessageStreamImpl<>(mockGraph);
- JoinFunction<String, TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope> joiner =
- new JoinFunction<String, TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope>() {
- @Override
- public TestOutputMessageEnvelope apply(TestMessageEnvelope m1, TestMessageEnvelope m2) {
- return new TestOutputMessageEnvelope(m1.getKey(), m1.getMessage().getValue().length() + m2.getMessage().getValue().length());
- }
-
- @Override
- public String getFirstKey(TestMessageEnvelope message) {
- return message.getKey();
- }
-
- @Override
- public String getSecondKey(TestMessageEnvelope message) {
- return message.getKey();
- }
- };
-
- MessageStream<TestOutputMessageEnvelope> joinOutput = source1.join(source2, joiner, Duration.ofMinutes(1));
- Collection<OperatorSpec> subs = source1.getRegisteredOperatorSpecs();
- assertEquals(subs.size(), 1);
- OperatorSpec<TestMessageEnvelope> joinOp1 = subs.iterator().next();
- assertTrue(joinOp1 instanceof PartialJoinOperatorSpec);
- assertEquals(((PartialJoinOperatorSpec) joinOp1).getNextStream(), joinOutput);
- subs = source2.getRegisteredOperatorSpecs();
- assertEquals(subs.size(), 1);
- OperatorSpec<TestMessageEnvelope> joinOp2 = subs.iterator().next();
- assertTrue(joinOp2 instanceof PartialJoinOperatorSpec);
- assertEquals(((PartialJoinOperatorSpec) joinOp2).getNextStream(), joinOutput);
- TestMessageEnvelope joinMsg1 = new TestMessageEnvelope("test-join-1", "join-msg-001", 11111L);
- TestMessageEnvelope joinMsg2 = new TestMessageEnvelope("test-join-2", "join-msg-002", 22222L);
- TestOutputMessageEnvelope xOut = (TestOutputMessageEnvelope) ((PartialJoinOperatorSpec) joinOp1).getThisPartialJoinFn().apply(joinMsg1, joinMsg2);
- assertEquals(xOut.getKey(), "test-join-1");
- assertEquals(xOut.getMessage(), Integer.valueOf(24));
- xOut = (TestOutputMessageEnvelope) ((PartialJoinOperatorSpec) joinOp2).getThisPartialJoinFn().apply(joinMsg2, joinMsg1);
- assertEquals(xOut.getKey(), "test-join-1");
- assertEquals(xOut.getMessage(), Integer.valueOf(24));
+ public void testSendTo() {
+ StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+ OperatorSpec mockOpSpec = mock(OperatorSpec.class);
+ MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph, mockOpSpec);
+
+ OutputStreamImpl mockOutputOpSpec = mock(OutputStreamImpl.class);
+ inputStream.sendTo(mockOutputOpSpec);
+
+ ArgumentCaptor<OperatorSpec> registeredOpCaptor = ArgumentCaptor.forClass(OperatorSpec.class);
+ verify(mockOpSpec).registerNextOperatorSpec(registeredOpCaptor.capture());
+ OperatorSpec<?, TestMessageEnvelope> registeredOpSpec = registeredOpCaptor.getValue();
+
+ assertTrue(registeredOpSpec instanceof OutputOperatorSpec);
+ assertEquals(OpCode.SEND_TO, registeredOpSpec.getOpCode());
+ assertEquals(mockOutputOpSpec, ((OutputOperatorSpec) registeredOpSpec).getOutputStream());
}
@Test
- public void testMerge() {
- MessageStream<TestMessageEnvelope> merge1 = new MessageStreamImpl<>(mockGraph);
- Collection<MessageStream<TestMessageEnvelope>> others = ImmutableList.of(
- new MessageStreamImpl<>(mockGraph), new MessageStreamImpl<>(mockGraph));
- MessageStream<TestMessageEnvelope> mergeOutput = merge1.merge(others);
- validateMergeOperator(merge1, mergeOutput);
+ public void testPartitionBy() {
+ StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+ OperatorSpec mockOpSpec = mock(OperatorSpec.class);
+
+ String streamName = String.format("%s-%s", OperatorSpec.OpCode.PARTITION_BY.name().toLowerCase(), 0);
+ Function<TestMessageEnvelope, String> mockKeyFn = mock(Function.class);
+ OutputStreamImpl mockOutputOpSpec = mock(OutputStreamImpl.class);
+ IntermediateMessageStreamImpl mockIntermediateStream = mock(IntermediateMessageStreamImpl.class);
+ when(mockGraph.getIntermediateStream(eq(streamName), eq(mockKeyFn), any(Function.class), any(BiFunction.class)))
+ .thenReturn(mockIntermediateStream);
+ when(mockIntermediateStream.getOutputStream())
+ .thenReturn(mockOutputOpSpec);
+
+ MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph, mockOpSpec);
+ inputStream.partitionBy(mockKeyFn);
+
+ ArgumentCaptor<OperatorSpec> registeredOpCaptor = ArgumentCaptor.forClass(OperatorSpec.class);
+ verify(mockOpSpec).registerNextOperatorSpec(registeredOpCaptor.capture());
+ OperatorSpec<?, TestMessageEnvelope> registeredOpSpec = registeredOpCaptor.getValue();
+
+ assertTrue(registeredOpSpec instanceof OutputOperatorSpec);
+ assertEquals(OpCode.PARTITION_BY, registeredOpSpec.getOpCode());
+ assertEquals(mockOutputOpSpec, ((OutputOperatorSpec) registeredOpSpec).getOutputStream());
+ }
- others.forEach(merge -> validateMergeOperator(merge, mergeOutput));
+ @Test
+ public void testWindowWithRelaxedTypes() throws Exception {
+ StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+ OperatorSpec mockOpSpec = mock(OperatorSpec.class);
+ MessageStream<TestInputMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph, mockOpSpec);
+
+ Function<TestMessageEnvelope, String> keyExtractor = m -> m.getKey();
+ FoldLeftFunction<TestMessageEnvelope, Integer> aggregator = (m, c) -> c + 1;
+ Supplier<Integer> initialValue = () -> 0;
+
+ // should compile since TestMessageEnvelope (input for functions) is base class of TestInputMessageEnvelope (M)
+ Window<TestInputMessageEnvelope, String, Integer> window = Windows
+ .keyedTumblingWindow(keyExtractor, Duration.ofHours(1), initialValue, aggregator);
+ MessageStream<WindowPane<String, Integer>> windowedStream = inputStream.window(window);
+
+ ArgumentCaptor<OperatorSpec> registeredOpCaptor = ArgumentCaptor.forClass(OperatorSpec.class);
+ verify(mockOpSpec).registerNextOperatorSpec(registeredOpCaptor.capture());
+ OperatorSpec<?, TestMessageEnvelope> registeredOpSpec = registeredOpCaptor.getValue();
+
+ assertTrue(registeredOpSpec instanceof WindowOperatorSpec);
+ assertEquals(OpCode.WINDOW, registeredOpSpec.getOpCode());
+ assertEquals(window, ((WindowOperatorSpec) registeredOpSpec).getWindow());
}
@Test
- public void testMergeWithRelaxedTypes() {
- MessageStream<TestMessageEnvelope> input1 = new MessageStreamImpl<>(mockGraph);
- Collection<MessageStream<? extends TestMessageEnvelope>> others = ImmutableList.of(
- new MessageStreamImpl<TestInputMessageEnvelope>(mockGraph),
- new MessageStreamImpl<TestMessageEnvelope>(mockGraph));
+ public void testJoin() {
+ StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+ OperatorSpec leftInputOpSpec = mock(OperatorSpec.class);
+ MessageStreamImpl<TestMessageEnvelope> source1 = new MessageStreamImpl<>(mockGraph, leftInputOpSpec);
+ OperatorSpec rightInputOpSpec = mock(OperatorSpec.class);
+ MessageStreamImpl<TestMessageEnvelope> source2 = new MessageStreamImpl<>(mockGraph, rightInputOpSpec);
+
+ JoinFunction<String, TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope> mockJoinFn =
+ mock(JoinFunction.class);
+
+ Duration joinTtl = Duration.ofMinutes(1);
+ source1.join(source2, mockJoinFn, joinTtl);
+
+ ArgumentCaptor<OperatorSpec> leftRegisteredOpCaptor = ArgumentCaptor.forClass(OperatorSpec.class);
+ verify(leftInputOpSpec).registerNextOperatorSpec(leftRegisteredOpCaptor.capture());
+ OperatorSpec<?, TestMessageEnvelope> leftRegisteredOpSpec = leftRegisteredOpCaptor.getValue();
+
+ ArgumentCaptor<OperatorSpec> rightRegisteredOpCaptor = ArgumentCaptor.forClass(OperatorSpec.class);
+ verify(rightInputOpSpec).registerNextOperatorSpec(rightRegisteredOpCaptor.capture());
+ OperatorSpec<?, TestMessageEnvelope> rightRegisteredOpSpec = rightRegisteredOpCaptor.getValue();
+
+ assertEquals(leftRegisteredOpSpec, rightRegisteredOpSpec);
+ assertEquals(OpCode.JOIN, leftRegisteredOpSpec.getOpCode());
+ assertTrue(leftRegisteredOpSpec instanceof JoinOperatorSpec);
+ assertEquals(mockJoinFn, ((JoinOperatorSpec) leftRegisteredOpSpec).getJoinFn());
+ assertEquals(joinTtl.toMillis(), ((JoinOperatorSpec) leftRegisteredOpSpec).getTtlMs());
+ assertEquals(leftInputOpSpec, ((JoinOperatorSpec) leftRegisteredOpSpec).getLeftInputOpSpec());
+ assertEquals(rightInputOpSpec, ((JoinOperatorSpec) leftRegisteredOpSpec).getRightInputOpSpec());
+ }
- // should compile
- MessageStream<TestMessageEnvelope> mergeOutput = input1.merge(others);
- validateMergeOperator(input1, mergeOutput);
+ @Test
+ public void testMerge() {
+ StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+ OperatorSpec mockOpSpec1 = mock(OperatorSpec.class);
+ MessageStream<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph, mockOpSpec1);
+
+ // other streams have the same message type T as input stream message type M
+ OperatorSpec mockOpSpec2 = mock(OperatorSpec.class);
+ OperatorSpec mockOpSpec3 = mock(OperatorSpec.class);
+ Collection<MessageStream<TestMessageEnvelope>> otherStreams1 = ImmutableList.of(
+ new MessageStreamImpl<>(mockGraph, mockOpSpec2),
+ new MessageStreamImpl<>(mockGraph, mockOpSpec3)
+ );
+
+ inputStream.merge(otherStreams1);
+
+ ArgumentCaptor<OperatorSpec> registeredOpCaptor1 = ArgumentCaptor.forClass(OperatorSpec.class);
+ verify(mockOpSpec1).registerNextOperatorSpec(registeredOpCaptor1.capture());
+ OperatorSpec<?, TestMessageEnvelope> registeredOpSpec1 = registeredOpCaptor1.getValue();
+ assertTrue(registeredOpSpec1 instanceof StreamOperatorSpec);
+ FlatMapFunction transformFn = ((StreamOperatorSpec) registeredOpSpec1).getTransformFn();
+
+ ArgumentCaptor<OperatorSpec> registeredOpCaptor2 = ArgumentCaptor.forClass(OperatorSpec.class);
+ verify(mockOpSpec2).registerNextOperatorSpec(registeredOpCaptor2.capture());
+ OperatorSpec<?, TestMessageEnvelope> registeredOpSpec2 = registeredOpCaptor2.getValue();
+
+ ArgumentCaptor<OperatorSpec> registeredOpCaptor3 = ArgumentCaptor.forClass(OperatorSpec.class);
+ verify(mockOpSpec3).registerNextOperatorSpec(registeredOpCaptor3.capture());
+ OperatorSpec<?, TestMessageEnvelope> registeredOpSpec3 = registeredOpCaptor3.getValue();
+
+ assertEquals(registeredOpSpec1, registeredOpSpec2);
+ assertEquals(registeredOpSpec2, registeredOpSpec3);
+ assertEquals(OpCode.MERGE, registeredOpSpec1.getOpCode());
+
+ assertNotNull(transformFn);
+ TestMessageEnvelope mockInput = mock(TestMessageEnvelope.class);
+ assertTrue(transformFn.apply(mockInput).contains(mockInput));
+ assertEquals(1, transformFn.apply(mockInput).size());
+ }
- others.forEach(merge -> validateMergeOperator((MessageStream<TestMessageEnvelope>) merge, mergeOutput));
+ @Test
+ public void testMergeWithRelaxedTypes() {
+ StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+ MessageStream<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph, mock(OperatorSpec.class));
+
+ // other streams have the same message type T as input stream message type M
+ Collection<MessageStream<TestMessageEnvelope>> otherStreams1 = ImmutableList.of(
+ new MessageStreamImpl<>(mockGraph, mock(OperatorSpec.class)),
+ new MessageStreamImpl<>(mockGraph, mock(OperatorSpec.class))
+ );
+
+ // other streams have the same message type T that extends as input stream message type M
+ Collection<MessageStream<TestInputMessageEnvelope>> otherStreams2 = ImmutableList.of(
+ new MessageStreamImpl<TestInputMessageEnvelope>(mockGraph, mock(OperatorSpec.class)),
+ new MessageStreamImpl<TestInputMessageEnvelope>(mockGraph, mock(OperatorSpec.class))
+ );
+
+ // other streams have a mix of message types such that T extends input stream message type M
+ Collection<MessageStream<TestMessageEnvelope>> otherStreams3 = ImmutableList.of(
+ new MessageStreamImpl<TestMessageEnvelope>(mockGraph, mock(OperatorSpec.class)),
+ // unchecked cast required for the next stream
+ (MessageStream) new MessageStreamImpl<TestInputMessageEnvelope>(mockGraph, mock(OperatorSpec.class))
+ );
+
+ // not supported:
+ // other streams have a mix of message types such that T extends input stream message type M
+ Collection<MessageStream<? extends TestMessageEnvelope>> otherStreams4 = ImmutableList.of(
+ new MessageStreamImpl<TestMessageEnvelope>(mockGraph, mock(OperatorSpec.class)),
+ new MessageStreamImpl<TestInputMessageEnvelope>(mockGraph, mock(OperatorSpec.class))
+ );
+
+ // check if all type combinations compile
+ inputStream.merge(otherStreams1);
+ inputStream.merge(otherStreams2);
+ inputStream.merge(otherStreams3);
+ inputStream.merge(otherStreams4);
}
@Test
public <T> void testMergeWithNestedTypes() {
class MessageEnvelope<TM> { }
- MessageStream<MessageEnvelope<T>> ms1 = new MessageStreamImpl<>(mock(StreamGraphImpl.class));
- MessageStream<MessageEnvelope<T>> ms2 = new MessageStreamImpl<>(mock(StreamGraphImpl.class));
- MessageStream<MessageEnvelope<T>> ms3 = new MessageStreamImpl<>(mock(StreamGraphImpl.class));
+ MessageStream<MessageEnvelope<T>> ms1 = mock(MessageStreamImpl.class);
+ MessageStream<MessageEnvelope<T>> ms2 = mock(MessageStreamImpl.class);
+ MessageStream<MessageEnvelope<T>> ms3 = mock(MessageStreamImpl.class);
Collection<MessageStream<MessageEnvelope<T>>> otherStreams = ImmutableList.of(ms2, ms3);
// should compile
ms1.merge(otherStreams);
}
- private void validateMergeOperator(MessageStream<TestMessageEnvelope> mergeSource, MessageStream<TestMessageEnvelope> mergeOutput) {
- Collection<OperatorSpec> subs = ((MessageStreamImpl<TestMessageEnvelope>) mergeSource).getRegisteredOperatorSpecs();
- assertEquals(subs.size(), 1);
- OperatorSpec<TestMessageEnvelope> mergeOp = subs.iterator().next();
- assertTrue(mergeOp instanceof StreamOperatorSpec);
- assertEquals(((StreamOperatorSpec) mergeOp).getNextStream(), mergeOutput);
- TestMessageEnvelope mockMsg = mock(TestMessageEnvelope.class);
- Collection<TestMessageEnvelope> outputs = ((StreamOperatorSpec<TestMessageEnvelope, TestMessageEnvelope>) mergeOp).getTransformFn().apply(
- mockMsg);
- assertEquals(outputs.size(), 1);
- assertEquals(outputs.iterator().next(), mockMsg);
- }
-
@Test
public void testMergeAll() {
MessageStream<TestMessageEnvelope> input1 = mock(MessageStreamImpl.class);
@@ -361,26 +402,9 @@ public class TestMessageStreamImpl {
assertTrue(otherStreamsCaptor.getValue().contains(input3));
}
- @Test
- public void testPartitionBy() {
- Map<String, String> map = new HashMap<>();
- map.put(JobConfig.JOB_DEFAULT_SYSTEM(), "testsystem");
- Config config = new MapConfig(map);
- ApplicationRunner runner = ApplicationRunner.fromConfig(config);
- StreamGraphImpl streamGraph = new StreamGraphImpl(runner, config);
- MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(streamGraph);
- Function<TestMessageEnvelope, String> keyExtractorFunc = m -> "222";
- inputStream.partitionBy(keyExtractorFunc);
- assertTrue(streamGraph.getInputStreams().size() == 1);
- assertTrue(streamGraph.getOutputStreams().size() == 1);
-
- Collection<OperatorSpec> subs = inputStream.getRegisteredOperatorSpecs();
- assertEquals(subs.size(), 1);
- OperatorSpec<TestMessageEnvelope> partitionByOp = subs.iterator().next();
- assertTrue(partitionByOp instanceof SinkOperatorSpec);
- assertNull(partitionByOp.getNextStream());
-
- ((SinkOperatorSpec) partitionByOp).getSinkFn().apply(new TestMessageEnvelope("111", "test", 1000),
- envelope -> assertTrue(envelope.getPartitionKey().equals("222")), null);
+ class TestInputMessageEnvelope extends TestMessageEnvelope {
+ public TestInputMessageEnvelope(String key, Object value) {
+ super(key, value);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImplUtil.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImplUtil.java b/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImplUtil.java
deleted file mode 100644
index c4e9f51..0000000
--- a/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImplUtil.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators;
-
-
-public class TestMessageStreamImplUtil {
- public static <M> MessageStreamImpl<M> getMessageStreamImpl(StreamGraphImpl graph) {
- return new MessageStreamImpl<M>(graph);
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java b/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java
index 9d95217..1fc60bd 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java
@@ -21,23 +21,21 @@ package org.apache.samza.operators;
import junit.framework.Assert;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
-import org.apache.samza.operators.data.MessageType;
-import org.apache.samza.operators.data.TestInputMessageEnvelope;
import org.apache.samza.operators.data.TestMessageEnvelope;
-import org.apache.samza.operators.stream.InputStreamInternal;
-import org.apache.samza.operators.stream.InputStreamInternalImpl;
-import org.apache.samza.operators.stream.IntermediateStreamInternalImpl;
-import org.apache.samza.operators.stream.OutputStreamInternalImpl;
+import org.apache.samza.operators.spec.InputOperatorSpec;
+import org.apache.samza.operators.spec.OperatorSpec.OpCode;
+import org.apache.samza.operators.spec.OutputStreamImpl;
+import org.apache.samza.operators.stream.IntermediateMessageStreamImpl;
import org.apache.samza.runtime.ApplicationRunner;
import org.apache.samza.system.StreamSpec;
import org.junit.Test;
import java.util.ArrayList;
+import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Function;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -46,152 +44,141 @@ public class TestStreamGraphImpl {
@Test
public void testGetInputStream() {
ApplicationRunner mockRunner = mock(ApplicationRunner.class);
- Config mockConfig = mock(Config.class);
- StreamSpec testStreamSpec = new StreamSpec("test-stream-1", "physical-stream-1", "test-system");
- when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(testStreamSpec);
+ StreamSpec mockStreamSpec = mock(StreamSpec.class);
+ when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
+ StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
+ BiFunction<String, String, TestMessageEnvelope> mockMsgBuilder = mock(BiFunction.class);
+
+ MessageStream<TestMessageEnvelope> inputStream = graph.getInputStream("test-stream-1", mockMsgBuilder);
+
+ InputOperatorSpec<String, String, TestMessageEnvelope> inputOpSpec =
+ (InputOperatorSpec) ((MessageStreamImpl<TestMessageEnvelope>) inputStream).getOperatorSpec();
+ assertEquals(OpCode.INPUT, inputOpSpec.getOpCode());
+ assertEquals(graph.getInputOperators().get(mockStreamSpec), inputOpSpec);
+ assertEquals(mockMsgBuilder, inputOpSpec.getMsgBuilder());
+ assertEquals(mockStreamSpec, inputOpSpec.getStreamSpec());
+ }
- StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig);
- BiFunction<String, MessageType, TestInputMessageEnvelope> xMsgBuilder =
- (k, v) -> new TestInputMessageEnvelope(k, v.getValue(), v.getEventTime(), "input-id-1");
- MessageStream<TestMessageEnvelope> mInputStream = graph.getInputStream("test-stream-1", xMsgBuilder);
- assertEquals(graph.getInputStreams().get(testStreamSpec), mInputStream);
- assertTrue(mInputStream instanceof InputStreamInternalImpl);
- assertEquals(((InputStreamInternalImpl) mInputStream).getMsgBuilder(), xMsgBuilder);
-
- String key = "test-input-key";
- MessageType msgBody = new MessageType("test-msg-value", 333333L);
- TestMessageEnvelope xInputMsg = ((InputStreamInternalImpl<String, MessageType, TestMessageEnvelope>) mInputStream).
- getMsgBuilder().apply(key, msgBody);
- assertEquals(xInputMsg.getKey(), key);
- assertEquals(xInputMsg.getMessage().getValue(), msgBody.getValue());
- assertEquals(xInputMsg.getMessage().getEventTime(), msgBody.getEventTime());
- assertEquals(((TestInputMessageEnvelope) xInputMsg).getInputId(), "input-id-1");
+ @Test
+ public void testGetInputStreamWithRelaxedTypes() {
+ ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+ StreamSpec mockStreamSpec = mock(StreamSpec.class);
+ when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
+ StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
+ BiFunction<String, String, TestMessageEnvelope> mockMsgBuilder = mock(BiFunction.class);
+
+ MessageStream<TestMessageEnvelope> inputStream = graph.getInputStream("test-stream-1", mockMsgBuilder);
+
+ InputOperatorSpec<String, String, TestMessageEnvelope> inputOpSpec =
+ (InputOperatorSpec) ((MessageStreamImpl<TestMessageEnvelope>) inputStream).getOperatorSpec();
+ assertEquals(OpCode.INPUT, inputOpSpec.getOpCode());
+ assertEquals(graph.getInputOperators().get(mockStreamSpec), inputOpSpec);
+ assertEquals(mockMsgBuilder, inputOpSpec.getMsgBuilder());
+ assertEquals(mockStreamSpec, inputOpSpec.getStreamSpec());
+ }
+
+ @Test
+ public void testMultipleGetInputStreams() {
+ ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+ StreamSpec mockStreamSpec1 = mock(StreamSpec.class);
+ StreamSpec mockStreamSpec2 = mock(StreamSpec.class);
+ when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec1);
+ when(mockRunner.getStreamSpec("test-stream-2")).thenReturn(mockStreamSpec2);
+
+ StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
+ MessageStream<Object> inputStream1 = graph.getInputStream("test-stream-1", mock(BiFunction.class));
+ MessageStream<Object> inputStream2 = graph.getInputStream("test-stream-2", mock(BiFunction.class));
+
+ InputOperatorSpec<String, String, TestMessageEnvelope> inputOpSpec1 =
+ (InputOperatorSpec) ((MessageStreamImpl<Object>) inputStream1).getOperatorSpec();
+ InputOperatorSpec<String, String, TestMessageEnvelope> inputOpSpec2 =
+ (InputOperatorSpec) ((MessageStreamImpl<Object>) inputStream2).getOperatorSpec();
+
+ assertEquals(graph.getInputOperators().size(), 2);
+ assertEquals(graph.getInputOperators().get(mockStreamSpec1), inputOpSpec1);
+ assertEquals(graph.getInputOperators().get(mockStreamSpec2), inputOpSpec2);
}
@Test(expected = IllegalStateException.class)
- public void testMultipleGetInputStream() {
+ public void testGetSameInputStreamTwice() {
ApplicationRunner mockRunner = mock(ApplicationRunner.class);
- Config mockConfig = mock(Config.class);
+ when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mock(StreamSpec.class));
- StreamSpec testStreamSpec1 = new StreamSpec("test-stream-1", "physical-stream-1", "test-system");
- StreamSpec testStreamSpec2 = new StreamSpec("test-stream-2", "physical-stream-2", "test-system");
- StreamSpec nonExistentStreamSpec = new StreamSpec("non-existent-stream", "physical-stream-1", "test-system");
+ StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
+ graph.getInputStream("test-stream-1", mock(BiFunction.class));
+ graph.getInputStream("test-stream-1", mock(BiFunction.class)); // should throw exception
+ }
- when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(testStreamSpec1);
- when(mockRunner.getStreamSpec("test-stream-2")).thenReturn(testStreamSpec2);
+ @Test
+ public void testGetOutputStream() {
+ ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+ StreamSpec mockStreamSpec = mock(StreamSpec.class);
+ when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
- StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig);
- BiFunction<String, MessageType, TestInputMessageEnvelope> xMsgBuilder =
- (k, v) -> new TestInputMessageEnvelope(k, v.getValue(), v.getEventTime(), "input-id-1");
- //create 2 streams for the corresponding streamIds
- MessageStream<TestInputMessageEnvelope> inputStream1 = graph.getInputStream("test-stream-1", xMsgBuilder);
- MessageStream<TestInputMessageEnvelope> inputStream2 = graph.getInputStream("test-stream-2", xMsgBuilder);
+ StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
+ Function<TestMessageEnvelope, String> mockKeyExtractor = mock(Function.class);
+ Function<TestMessageEnvelope, String> mockMsgExtractor = mock(Function.class);
- //assert that the streamGraph contains only the above 2 streams
- assertEquals(graph.getInputStreams().get(testStreamSpec1), inputStream1);
- assertEquals(graph.getInputStreams().get(testStreamSpec2), inputStream2);
- assertEquals(graph.getInputStreams().get(nonExistentStreamSpec), null);
- assertEquals(graph.getInputStreams().size(), 2);
+ OutputStream<String, String, TestMessageEnvelope> outputStream =
+ graph.getOutputStream("test-stream-1", mockKeyExtractor, mockMsgExtractor);
- //should throw IllegalStateException
- graph.getInputStream("test-stream-1", xMsgBuilder);
+ OutputStreamImpl<String, String, TestMessageEnvelope> outputOpSpec = (OutputStreamImpl) outputStream;
+ assertEquals(graph.getOutputStreams().get(mockStreamSpec), outputOpSpec);
+ assertEquals(mockKeyExtractor, outputOpSpec.getKeyExtractor());
+ assertEquals(mockMsgExtractor, outputOpSpec.getMsgExtractor());
+ assertEquals(mockStreamSpec, outputOpSpec.getStreamSpec());
}
-
- @Test
- public void testGetOutputStream() {
+ @Test(expected = IllegalStateException.class)
+ public void testGetSameOutputStreamTwice() {
ApplicationRunner mockRunner = mock(ApplicationRunner.class);
- Config mockConfig = mock(Config.class);
- StreamSpec testStreamSpec = new StreamSpec("test-stream-1", "physical-stream-1", "test-system");
- when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(testStreamSpec);
-
- class MyMessageType extends MessageType {
- public final String outputId;
+ when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mock(StreamSpec.class));
- public MyMessageType(String value, long eventTime, String outputId) {
- super(value, eventTime);
- this.outputId = outputId;
- }
- }
-
- StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig);
- Function<TestMessageEnvelope, String> xKeyExtractor = x -> x.getKey();
- Function<TestMessageEnvelope, MyMessageType> xMsgExtractor =
- x -> new MyMessageType(x.getMessage().getValue(), x.getMessage().getEventTime(), "test-output-id-1");
-
- OutputStream<String, MyMessageType, TestInputMessageEnvelope> mOutputStream =
- graph.getOutputStream("test-stream-1", xKeyExtractor, xMsgExtractor);
- assertEquals(graph.getOutputStreams().get(testStreamSpec), mOutputStream);
- assertTrue(mOutputStream instanceof OutputStreamInternalImpl);
- assertEquals(((OutputStreamInternalImpl) mOutputStream).getKeyExtractor(), xKeyExtractor);
- assertEquals(((OutputStreamInternalImpl) mOutputStream).getMsgExtractor(), xMsgExtractor);
-
- TestInputMessageEnvelope xInputMsg = new TestInputMessageEnvelope("test-key-1", "test-msg-1", 33333L, "input-id-1");
- assertEquals(((OutputStreamInternalImpl<String, MyMessageType, TestInputMessageEnvelope>) mOutputStream).
- getKeyExtractor().apply(xInputMsg), "test-key-1");
- assertEquals(((OutputStreamInternalImpl<String, MyMessageType, TestInputMessageEnvelope>) mOutputStream).
- getMsgExtractor().apply(xInputMsg).getValue(), "test-msg-1");
- assertEquals(((OutputStreamInternalImpl<String, MyMessageType, TestInputMessageEnvelope>) mOutputStream).
- getMsgExtractor().apply(xInputMsg).getEventTime(), 33333L);
- assertEquals(((OutputStreamInternalImpl<String, MyMessageType, TestInputMessageEnvelope>) mOutputStream).
- getMsgExtractor().apply(xInputMsg).outputId, "test-output-id-1");
+ StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
+ graph.getOutputStream("test-stream-1", mock(Function.class), mock(Function.class));
+ graph.getOutputStream("test-stream-1", mock(Function.class), mock(Function.class)); // should throw exception
}
@Test
public void testGetIntermediateStream() {
ApplicationRunner mockRunner = mock(ApplicationRunner.class);
Config mockConfig = mock(Config.class);
- StreamSpec testStreamSpec = new StreamSpec("myJob-i001-test-stream-1", "physical-stream-1", "test-system");
- when(mockRunner.getStreamSpec("myJob-i001-test-stream-1")).thenReturn(testStreamSpec);
+ StreamSpec mockStreamSpec = mock(StreamSpec.class);
when(mockConfig.get(JobConfig.JOB_NAME())).thenReturn("myJob");
when(mockConfig.get(JobConfig.JOB_ID(), "1")).thenReturn("i001");
+ when(mockRunner.getStreamSpec("myJob-i001-test-stream-1")).thenReturn(mockStreamSpec);
- class MyMessageType extends MessageType {
- public final String outputId;
+ StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig);
+ Function<TestMessageEnvelope, String> mockKeyExtractor = mock(Function.class);
+ Function<TestMessageEnvelope, String> mockMsgExtractor = mock(Function.class);
+ BiFunction<String, String, TestMessageEnvelope> mockMsgBuilder = mock(BiFunction.class);
+
+ IntermediateMessageStreamImpl<?, ?, TestMessageEnvelope> intermediateStreamImpl =
+ graph.getIntermediateStream("test-stream-1", mockKeyExtractor, mockMsgExtractor, mockMsgBuilder);
+
+ assertEquals(graph.getInputOperators().get(mockStreamSpec), intermediateStreamImpl.getOperatorSpec());
+ assertEquals(graph.getOutputStreams().get(mockStreamSpec), intermediateStreamImpl.getOutputStream());
+ assertEquals(mockStreamSpec, intermediateStreamImpl.getStreamSpec());
+ assertEquals(mockKeyExtractor, intermediateStreamImpl.getOutputStream().getKeyExtractor());
+ assertEquals(mockMsgExtractor, intermediateStreamImpl.getOutputStream().getMsgExtractor());
+ assertEquals(mockMsgBuilder, ((InputOperatorSpec) intermediateStreamImpl.getOperatorSpec()).getMsgBuilder());
+ }
- public MyMessageType(String value, long eventTime, String outputId) {
- super(value, eventTime);
- this.outputId = outputId;
- }
- }
+ @Test(expected = IllegalStateException.class)
+ public void testGetSameIntermediateStreamTwice() {
+ ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+ when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mock(StreamSpec.class));
- StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig);
- Function<TestMessageEnvelope, String> xKeyExtractor = x -> x.getKey();
- Function<TestMessageEnvelope, MyMessageType> xMsgExtractor =
- x -> new MyMessageType(x.getMessage().getValue(), x.getMessage().getEventTime(), "test-output-id-1");
- BiFunction<String, MessageType, TestInputMessageEnvelope> xMsgBuilder =
- (k, v) -> new TestInputMessageEnvelope(k, v.getValue(), v.getEventTime(), "input-id-1");
-
- MessageStream<TestMessageEnvelope> mIntermediateStream =
- graph.getIntermediateStream("test-stream-1", xKeyExtractor, xMsgExtractor, xMsgBuilder);
- assertEquals(graph.getOutputStreams().get(testStreamSpec), mIntermediateStream);
- assertTrue(mIntermediateStream instanceof IntermediateStreamInternalImpl);
- assertEquals(((IntermediateStreamInternalImpl) mIntermediateStream).getKeyExtractor(), xKeyExtractor);
- assertEquals(((IntermediateStreamInternalImpl) mIntermediateStream).getMsgExtractor(), xMsgExtractor);
- assertEquals(((IntermediateStreamInternalImpl) mIntermediateStream).getMsgBuilder(), xMsgBuilder);
-
- TestMessageEnvelope xInputMsg = new TestMessageEnvelope("test-key-1", "test-msg-1", 33333L);
- assertEquals(((IntermediateStreamInternalImpl<String, MessageType, TestMessageEnvelope>) mIntermediateStream).
- getKeyExtractor().apply(xInputMsg), "test-key-1");
- assertEquals(((IntermediateStreamInternalImpl<String, MessageType, TestMessageEnvelope>) mIntermediateStream).
- getMsgExtractor().apply(xInputMsg).getValue(), "test-msg-1");
- assertEquals(((IntermediateStreamInternalImpl<String, MessageType, TestMessageEnvelope>) mIntermediateStream).
- getMsgExtractor().apply(xInputMsg).getEventTime(), 33333L);
- assertEquals(((IntermediateStreamInternalImpl<String, MessageType, TestMessageEnvelope>) mIntermediateStream).
- getMsgBuilder().apply("test-key-1", new MyMessageType("test-msg-1", 33333L, "test-output-id-1")).getKey(), "test-key-1");
- assertEquals(((IntermediateStreamInternalImpl<String, MessageType, TestMessageEnvelope>) mIntermediateStream).
- getMsgBuilder().apply("test-key-1", new MyMessageType("test-msg-1", 33333L, "test-output-id-1")).getMessage().getValue(), "test-msg-1");
- assertEquals(((IntermediateStreamInternalImpl<String, MessageType, TestMessageEnvelope>) mIntermediateStream).
- getMsgBuilder().apply("test-key-1", new MyMessageType("test-msg-1", 33333L, "test-output-id-1")).getMessage().getEventTime(), 33333L);
+ StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
+ graph.getIntermediateStream("test-stream-1", mock(Function.class), mock(Function.class), mock(BiFunction.class));
+ graph.getIntermediateStream("test-stream-1", mock(Function.class), mock(Function.class), mock(BiFunction.class));
}
@Test
- public void testGetNextOpId() {
+ public void testGetNextOpIdIncrementsId() {
ApplicationRunner mockRunner = mock(ApplicationRunner.class);
- Config mockConfig = mock(Config.class);
-
- StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig);
+ StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
assertEquals(graph.getNextOpId(), 0);
assertEquals(graph.getNextOpId(), 1);
}
@@ -216,10 +203,10 @@ public class TestStreamGraphImpl {
graph.getInputStream("test-stream-2", (k, v) -> v);
graph.getInputStream("test-stream-3", (k, v) -> v);
- ArrayList<InputStreamInternal> inputMessageStreams = new ArrayList<>(graph.getInputStreams().values());
- Assert.assertEquals(inputMessageStreams.size(), 3);
- Assert.assertEquals(inputMessageStreams.get(0).getStreamSpec(), testStreamSpec1);
- Assert.assertEquals(inputMessageStreams.get(1).getStreamSpec(), testStreamSpec2);
- Assert.assertEquals(inputMessageStreams.get(2).getStreamSpec(), testStreamSpec3);
+ List<InputOperatorSpec> inputSpecs = new ArrayList<>(graph.getInputOperators().values());
+ Assert.assertEquals(inputSpecs.size(), 3);
+ Assert.assertEquals(inputSpecs.get(0).getStreamSpec(), testStreamSpec1);
+ Assert.assertEquals(inputSpecs.get(1).getStreamSpec(), testStreamSpec2);
+ Assert.assertEquals(inputSpecs.get(2).getStreamSpec(), testStreamSpec3);
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/test/java/org/apache/samza/operators/data/MessageType.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/data/MessageType.java b/samza-core/src/test/java/org/apache/samza/operators/data/MessageType.java
deleted file mode 100644
index 3fd015b..0000000
--- a/samza-core/src/test/java/org/apache/samza/operators/data/MessageType.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.data;
-
-public class MessageType {
- private final String value;
- private final long eventTime;
-
- public MessageType(String value, long eventTime) {
- this.value = value;
- this.eventTime = eventTime;
- }
-
- public long getEventTime() {
- return eventTime;
- }
-
- public String getValue() {
- return value;
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/test/java/org/apache/samza/operators/data/TestExtOutputMessageEnvelope.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/data/TestExtOutputMessageEnvelope.java b/samza-core/src/test/java/org/apache/samza/operators/data/TestExtOutputMessageEnvelope.java
deleted file mode 100644
index 22222ed..0000000
--- a/samza-core/src/test/java/org/apache/samza/operators/data/TestExtOutputMessageEnvelope.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.data;
-
-public class TestExtOutputMessageEnvelope extends TestOutputMessageEnvelope {
- private final String outputId;
-
- public TestExtOutputMessageEnvelope(String key, Integer value, String outputId) {
- super(key, value);
- this.outputId = outputId;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/test/java/org/apache/samza/operators/data/TestInputMessageEnvelope.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/data/TestInputMessageEnvelope.java b/samza-core/src/test/java/org/apache/samza/operators/data/TestInputMessageEnvelope.java
deleted file mode 100644
index 089f534..0000000
--- a/samza-core/src/test/java/org/apache/samza/operators/data/TestInputMessageEnvelope.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.data;
-
-public class TestInputMessageEnvelope extends TestMessageEnvelope {
- private final String inputId;
-
- public TestInputMessageEnvelope(String key, String value, long eventTime, String inputId) {
- super(key, value, eventTime);
- this.inputId = inputId;
- }
-
- public String getInputId() {
- return this.inputId;
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/test/java/org/apache/samza/operators/data/TestMessageEnvelope.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/data/TestMessageEnvelope.java b/samza-core/src/test/java/org/apache/samza/operators/data/TestMessageEnvelope.java
index 05a63cd..68305cc 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/data/TestMessageEnvelope.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/data/TestMessageEnvelope.java
@@ -20,21 +20,19 @@ package org.apache.samza.operators.data;
public class TestMessageEnvelope {
-
private final String key;
- private final MessageType value;
+ private final Object value;
- public TestMessageEnvelope(String key, String value, long eventTime) {
+ public TestMessageEnvelope(String key, Object value) {
this.key = key;
- this.value = new MessageType(value, eventTime);
+ this.value = value;
}
- public MessageType getMessage() {
+ public Object getMessage() {
return this.value;
}
public String getKey() {
return this.key;
}
-
}
http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
index 73d851b..d50d271 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
@@ -23,7 +23,6 @@ import org.apache.samza.metrics.Counter;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.metrics.Timer;
-import org.apache.samza.operators.MessageStreamImpl;
import org.apache.samza.operators.spec.OperatorSpec;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.TaskContext;
@@ -177,9 +176,11 @@ public class TestOperatorImpl {
private static class TestOpImpl extends OperatorImpl<Object, Object> {
private final Object mockOutput;
+ private final TestOpSpec testOpSpec;
TestOpImpl(Object mockOutput) {
this.mockOutput = mockOutput;
+ this.testOpSpec = new TestOpSpec();
}
@Override
@@ -199,31 +200,14 @@ public class TestOperatorImpl {
@Override
protected void handleClose() {}
- @Override
- protected OperatorSpec<Object> getOperatorSpec() {
- return new TestOpSpec();
+ protected OperatorSpec<Object, Object> getOperatorSpec() {
+ return testOpSpec;
}
}
- private static class TestOpSpec implements OperatorSpec<Object> {
- @Override
- public MessageStreamImpl<Object> getNextStream() {
- return null;
- }
-
- @Override
- public OpCode getOpCode() {
- return OpCode.INPUT;
- }
-
- @Override
- public int getOpId() {
- return -1;
- }
-
- @Override
- public String getSourceLocation() {
- return "";
+ private static class TestOpSpec extends OperatorSpec<Object, Object> {
+ TestOpSpec() {
+ super(OpCode.INPUT, 1);
}
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
index 67e5b46..b2c7722 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
@@ -19,86 +19,211 @@
package org.apache.samza.operators.impl;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.samza.config.Config;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.OutputStream;
import org.apache.samza.operators.StreamGraphImpl;
+import org.apache.samza.operators.functions.FilterFunction;
+import org.apache.samza.operators.functions.JoinFunction;
import org.apache.samza.operators.functions.MapFunction;
+import org.apache.samza.operators.spec.OperatorSpec.OpCode;
import org.apache.samza.runtime.ApplicationRunner;
import org.apache.samza.system.StreamSpec;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.TaskContext;
+import org.apache.samza.task.TaskCoordinator;
+import org.apache.samza.util.Clock;
import org.apache.samza.util.SystemClock;
import org.junit.Test;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
-
-import static junit.framework.Assert.assertEquals;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class TestOperatorImplGraph {
+ public void testEmptyChain() {
+ StreamGraphImpl streamGraph = new StreamGraphImpl(mock(ApplicationRunner.class), mock(Config.class));
+ OperatorImplGraph opGraph =
+ new OperatorImplGraph(streamGraph, mock(Config.class), mock(TaskContext.class), mock(Clock.class));
+ assertEquals(0, opGraph.getAllInputOperators().size());
+ }
+
@Test
- public void testOperatorGraphInitAndClose() {
+ public void testLinearChain() {
+ ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+ when(mockRunner.getStreamSpec(eq("input"))).thenReturn(new StreamSpec("input", "input-stream", "input-system"));
+ when(mockRunner.getStreamSpec(eq("output"))).thenReturn(mock(StreamSpec.class));
+ StreamGraphImpl streamGraph = new StreamGraphImpl(mockRunner, mock(Config.class));
+
+ MessageStream<Object> inputStream = streamGraph.getInputStream("input", mock(BiFunction.class));
+ OutputStream<Object, Object, Object> outputStream =
+ streamGraph.getOutputStream("output", mock(Function.class), mock(Function.class));
+
+ inputStream
+ .filter(mock(FilterFunction.class))
+ .map(mock(MapFunction.class))
+ .sendTo(outputStream);
+
+ TaskContext mockTaskContext = mock(TaskContext.class);
+ when(mockTaskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
+ OperatorImplGraph opImplGraph =
+ new OperatorImplGraph(streamGraph, mock(Config.class), mockTaskContext, mock(Clock.class));
+
+ InputOperatorImpl inputOpImpl = opImplGraph.getInputOperator(new SystemStream("input-system", "input-stream"));
+ assertEquals(1, inputOpImpl.registeredOperators.size());
+
+ OperatorImpl filterOpImpl = (StreamOperatorImpl) inputOpImpl.registeredOperators.iterator().next();
+ assertEquals(1, filterOpImpl.registeredOperators.size());
+ assertEquals(OpCode.FILTER, filterOpImpl.getOperatorSpec().getOpCode());
+
+ OperatorImpl mapOpImpl = (StreamOperatorImpl) filterOpImpl.registeredOperators.iterator().next();
+ assertEquals(1, mapOpImpl.registeredOperators.size());
+ assertEquals(OpCode.MAP, mapOpImpl.getOperatorSpec().getOpCode());
+
+ OperatorImpl sendToOpImpl = (OutputOperatorImpl) mapOpImpl.registeredOperators.iterator().next();
+ assertEquals(0, sendToOpImpl.registeredOperators.size());
+ assertEquals(OpCode.SEND_TO, sendToOpImpl.getOperatorSpec().getOpCode());
+ }
+
+
+ @Test
+ public void testBroadcastChain() {
+ ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+ when(mockRunner.getStreamSpec(eq("input"))).thenReturn(new StreamSpec("input", "input-stream", "input-system"));
+ StreamGraphImpl streamGraph = new StreamGraphImpl(mockRunner, mock(Config.class));
+
+ MessageStream<Object> inputStream = streamGraph.getInputStream("input", mock(BiFunction.class));
+ inputStream.filter(mock(FilterFunction.class));
+ inputStream.map(mock(MapFunction.class));
+
+ TaskContext mockTaskContext = mock(TaskContext.class);
+ when(mockTaskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
+ OperatorImplGraph opImplGraph =
+ new OperatorImplGraph(streamGraph, mock(Config.class), mockTaskContext, mock(Clock.class));
+
+ InputOperatorImpl inputOpImpl = opImplGraph.getInputOperator(new SystemStream("input-system", "input-stream"));
+ assertEquals(2, inputOpImpl.registeredOperators.size());
+ assertTrue(inputOpImpl.registeredOperators.stream().anyMatch(opImpl ->
+ ((OperatorImpl) opImpl).getOperatorSpec().getOpCode() == OpCode.FILTER));
+ assertTrue(inputOpImpl.registeredOperators.stream().anyMatch(opImpl ->
+ ((OperatorImpl) opImpl).getOperatorSpec().getOpCode() == OpCode.MAP));
+ }
+
+ @Test
+ public void testJoinChain() {
ApplicationRunner mockRunner = mock(ApplicationRunner.class);
- StreamSpec testStreamSpec1 = new StreamSpec("test-stream-1", "physical-stream-1", "test-system");
- when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(testStreamSpec1);
- StreamSpec testStreamSpec2 = new StreamSpec("test-stream-2", "physical-stream-2", "test-system");
- when(mockRunner.getStreamSpec("test-stream-2")).thenReturn(testStreamSpec2);
+ when(mockRunner.getStreamSpec(eq("input1"))).thenReturn(new StreamSpec("input1", "input-stream1", "input-system"));
+ when(mockRunner.getStreamSpec(eq("input2"))).thenReturn(new StreamSpec("input2", "input-stream2", "input-system"));
+ StreamGraphImpl streamGraph = new StreamGraphImpl(mockRunner, mock(Config.class));
+
+ JoinFunction mockJoinFunction = mock(JoinFunction.class);
+ MessageStream<Object> inputStream1 = streamGraph.getInputStream("input1", (k, v) -> v);
+ MessageStream<Object> inputStream2 = streamGraph.getInputStream("input2", (k, v) -> v);
+ inputStream1.join(inputStream2, mockJoinFunction, Duration.ofHours(1));
+
+ TaskContext mockTaskContext = mock(TaskContext.class);
+ when(mockTaskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
+ OperatorImplGraph opImplGraph =
+ new OperatorImplGraph(streamGraph, mock(Config.class), mockTaskContext, mock(Clock.class));
+
+ // verify that join function is initialized once.
+ verify(mockJoinFunction, times(1)).init(any(Config.class), any(TaskContext.class));
+
+ InputOperatorImpl inputOpImpl1 = opImplGraph.getInputOperator(new SystemStream("input-system", "input-stream1"));
+ InputOperatorImpl inputOpImpl2 = opImplGraph.getInputOperator(new SystemStream("input-system", "input-stream2"));
+ PartialJoinOperatorImpl leftPartialJoinOpImpl =
+ (PartialJoinOperatorImpl) inputOpImpl1.registeredOperators.iterator().next();
+ PartialJoinOperatorImpl rightPartialJoinOpImpl =
+ (PartialJoinOperatorImpl) inputOpImpl2.registeredOperators.iterator().next();
+
+ assertEquals(leftPartialJoinOpImpl.getOperatorSpec(), rightPartialJoinOpImpl.getOperatorSpec());
+ assertNotSame(leftPartialJoinOpImpl, rightPartialJoinOpImpl);
+
+ Object joinKey = new Object();
+ // verify that left partial join operator calls getFirstKey
+ Object mockLeftMessage = mock(Object.class);
+ when(mockJoinFunction.getFirstKey(eq(mockLeftMessage))).thenReturn(joinKey);
+ inputOpImpl1.onMessage(Pair.of("", mockLeftMessage), mock(MessageCollector.class), mock(TaskCoordinator.class));
+ verify(mockJoinFunction, times(1)).getFirstKey(mockLeftMessage);
+
+ // verify that right partial join operator calls getSecondKey
+ Object mockRightMessage = mock(Object.class);
+ when(mockJoinFunction.getSecondKey(eq(mockRightMessage))).thenReturn(joinKey);
+ inputOpImpl2.onMessage(Pair.of("", mockRightMessage), mock(MessageCollector.class), mock(TaskCoordinator.class));
+ verify(mockJoinFunction, times(1)).getSecondKey(mockRightMessage);
+
+ // verify that the join function apply is called with the correct messages on match
+ verify(mockJoinFunction, times(1)).apply(mockLeftMessage, mockRightMessage);
+ }
+ @Test
+ public void testOperatorGraphInitAndClose() {
+ ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+ when(mockRunner.getStreamSpec("input1")).thenReturn(new StreamSpec("input1", "input-stream1", "input-system"));
+ when(mockRunner.getStreamSpec("input2")).thenReturn(new StreamSpec("input2", "input-stream2", "input-system"));
Config mockConfig = mock(Config.class);
- TaskContext mockContext = createMockContext();
- StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig);
+ TaskContext mockContext = mock(TaskContext.class);
+ when(mockContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
+ StreamGraphImpl streamGraph = new StreamGraphImpl(mockRunner, mockConfig);
- List<String> initializationOrder = new ArrayList<>();
- List<String> finalizationOrder = new ArrayList<>();
+ MessageStream<Object> inputStream1 = streamGraph.getInputStream("input1", (k, v) -> v);
+ MessageStream<Object> inputStream2 = streamGraph.getInputStream("input2", (k, v) -> v);
- MessageStream<Object> inputStream1 = graph.getInputStream("test-stream-1", (k, v) -> v);
- MessageStream<Object> inputStream2 = graph.getInputStream("test-stream-2", (k, v) -> v);
+ List<String> initializedOperators = new ArrayList<>();
+ List<String> closedOperators = new ArrayList<>();
- inputStream1.map(createMapFunction("1", initializationOrder, finalizationOrder))
- .map(createMapFunction("2", initializationOrder, finalizationOrder));
+ inputStream1.map(createMapFunction("1", initializedOperators, closedOperators))
+ .map(createMapFunction("2", initializedOperators, closedOperators));
- inputStream2.map(createMapFunction("3", initializationOrder, finalizationOrder))
- .map(createMapFunction("4", initializationOrder, finalizationOrder));
+ inputStream2.map(createMapFunction("3", initializedOperators, closedOperators))
+ .map(createMapFunction("4", initializedOperators, closedOperators));
- OperatorImplGraph implGraph = new OperatorImplGraph(SystemClock.instance());
+ OperatorImplGraph opImplGraph = new OperatorImplGraph(streamGraph, mockConfig, mockContext, SystemClock.instance());
// Assert that initialization occurs in topological order.
- implGraph.init(graph, mockConfig, mockContext);
- assertEquals(initializationOrder.get(0), "1");
- assertEquals(initializationOrder.get(1), "2");
- assertEquals(initializationOrder.get(2), "3");
- assertEquals(initializationOrder.get(3), "4");
+ assertEquals(initializedOperators.get(0), "1");
+ assertEquals(initializedOperators.get(1), "2");
+ assertEquals(initializedOperators.get(2), "3");
+ assertEquals(initializedOperators.get(3), "4");
// Assert that finalization occurs in reverse topological order.
- implGraph.close();
- assertEquals(finalizationOrder.get(0), "4");
- assertEquals(finalizationOrder.get(1), "3");
- assertEquals(finalizationOrder.get(2), "2");
- assertEquals(finalizationOrder.get(3), "1");
- }
-
- private TaskContext createMockContext() {
- TaskContext mockContext = mock(TaskContext.class);
- when(mockContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
- return mockContext;
+ opImplGraph.close();
+ assertEquals(closedOperators.get(0), "4");
+ assertEquals(closedOperators.get(1), "3");
+ assertEquals(closedOperators.get(2), "2");
+ assertEquals(closedOperators.get(3), "1");
}
/**
* Creates an identity map function that appends to the provided lists when init/close is invoked.
*/
- private MapFunction<Object, Object> createMapFunction(String id, List<String> initializationOrder, List<String> finalizationOrder) {
+ private MapFunction<Object, Object> createMapFunction(String id,
+ List<String> initializedOperators, List<String> finalizedOperators) {
return new MapFunction<Object, Object>() {
@Override
public void init(Config config, TaskContext context) {
- initializationOrder.add(id);
+ initializedOperators.add(id);
}
@Override
public void close() {
- finalizationOrder.add(id);
+ finalizedOperators.add(id);
}
@Override
@@ -108,4 +233,3 @@ public class TestOperatorImplGraph {
};
}
}
-
http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java
deleted file mode 100644
index a75fadb..0000000
--- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java
+++ /dev/null
@@ -1,236 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.impl;
-
-import org.apache.samza.config.Config;
-import org.apache.samza.metrics.MetricsRegistryMap;
-import org.apache.samza.operators.MessageStreamImpl;
-import org.apache.samza.operators.StreamGraphImpl;
-import org.apache.samza.operators.TestMessageStreamImplUtil;
-import org.apache.samza.operators.data.TestMessageEnvelope;
-import org.apache.samza.operators.data.TestOutputMessageEnvelope;
-import org.apache.samza.operators.functions.FlatMapFunction;
-import org.apache.samza.operators.functions.JoinFunction;
-import org.apache.samza.operators.functions.SinkFunction;
-import org.apache.samza.operators.spec.OperatorSpec;
-import org.apache.samza.operators.spec.PartialJoinOperatorSpec;
-import org.apache.samza.operators.spec.SinkOperatorSpec;
-import org.apache.samza.operators.spec.StreamOperatorSpec;
-import org.apache.samza.operators.spec.WindowOperatorSpec;
-import org.apache.samza.operators.windows.Windows;
-import org.apache.samza.operators.windows.internal.WindowInternal;
-import org.apache.samza.operators.windows.internal.WindowType;
-import org.apache.samza.task.TaskContext;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.lang.reflect.Field;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.Set;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotSame;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-
-public class TestOperatorImpls {
- Field nextOperatorsField = null;
- Method createOpMethod = null;
- Method createOpsMethod = null;
-
- @Before
- public void prep() throws NoSuchFieldException, NoSuchMethodException {
- nextOperatorsField = OperatorImpl.class.getDeclaredField("registeredOperators");
- nextOperatorsField.setAccessible(true);
-
- createOpMethod = OperatorImplGraph.class.getDeclaredMethod("createOperatorImpl",
- OperatorSpec.class, Config.class, TaskContext.class);
- createOpMethod.setAccessible(true);
-
- createOpsMethod = OperatorImplGraph.class.getDeclaredMethod("createOperatorImpls", MessageStreamImpl.class,
- Config.class, TaskContext.class);
- createOpsMethod.setAccessible(true);
- }
-
- @Test
- public void testCreateOperator() throws NoSuchFieldException, IllegalAccessException, InvocationTargetException {
- // get window operator
- WindowOperatorSpec mockWnd = mock(WindowOperatorSpec.class);
- WindowInternal<TestMessageEnvelope, String, Integer> windowInternal = new WindowInternal<>(null, null, null, null, null, WindowType.TUMBLING);
- when(mockWnd.getWindow()).thenReturn(windowInternal);
- Config mockConfig = mock(Config.class);
- TaskContext mockContext = mock(TaskContext.class);
-
- OperatorImplGraph opGraph = new OperatorImplGraph();
- OperatorImpl<TestMessageEnvelope, ?> opImpl = (OperatorImpl<TestMessageEnvelope, ?>)
- createOpMethod.invoke(opGraph, mockWnd, mockConfig, mockContext);
- assertTrue(opImpl instanceof WindowOperatorImpl);
- Field wndInternalField = WindowOperatorImpl.class.getDeclaredField("window");
- wndInternalField.setAccessible(true);
- WindowInternal wndInternal = (WindowInternal) wndInternalField.get(opImpl);
- assertEquals(wndInternal, windowInternal);
-
- // get simple operator
- StreamOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope> mockSimpleOp = mock(StreamOperatorSpec.class);
- FlatMapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> mockTxfmFn = mock(FlatMapFunction.class);
- when(mockSimpleOp.getTransformFn()).thenReturn(mockTxfmFn);
- opImpl = (OperatorImpl<TestMessageEnvelope, ?>) createOpMethod.invoke(opGraph, mockSimpleOp, mockConfig, mockContext);
- assertTrue(opImpl instanceof StreamOperatorImpl);
- Field txfmFnField = StreamOperatorImpl.class.getDeclaredField("transformFn");
- txfmFnField.setAccessible(true);
- assertEquals(mockTxfmFn, txfmFnField.get(opImpl));
-
- // get sink operator
- SinkFunction<TestMessageEnvelope> sinkFn = (m, mc, tc) -> { };
- SinkOperatorSpec<TestMessageEnvelope> sinkOp = mock(SinkOperatorSpec.class);
- when(sinkOp.getSinkFn()).thenReturn(sinkFn);
- opImpl = (OperatorImpl<TestMessageEnvelope, ?>) createOpMethod.invoke(opGraph, sinkOp, mockConfig, mockContext);
- assertTrue(opImpl instanceof SinkOperatorImpl);
- Field sinkFnField = SinkOperatorImpl.class.getDeclaredField("sinkFn");
- sinkFnField.setAccessible(true);
- assertEquals(sinkFn, sinkFnField.get(opImpl));
-
- // get join operator
- PartialJoinOperatorSpec<String, TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope> joinOp = mock(PartialJoinOperatorSpec.class);
- opImpl = (OperatorImpl<TestMessageEnvelope, ?>) createOpMethod.invoke(opGraph, joinOp, mockConfig, mockContext);
- assertTrue(opImpl instanceof PartialJoinOperatorImpl);
- }
-
- @Test
- public void testEmptyChain() throws InvocationTargetException, IllegalAccessException {
- // test creation of empty chain
- MessageStreamImpl<TestMessageEnvelope> testStream = mock(MessageStreamImpl.class);
- TaskContext mockContext = mock(TaskContext.class);
- when(mockContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
- Config mockConfig = mock(Config.class);
- OperatorImplGraph opGraph = new OperatorImplGraph();
- RootOperatorImpl operatorChain = (RootOperatorImpl) createOpsMethod.invoke(opGraph, testStream, mockConfig, mockContext);
- assertTrue(operatorChain != null);
- }
-
- @Test
- public void testLinearChain() throws IllegalAccessException, InvocationTargetException {
- // test creation of linear chain
- StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
- MessageStreamImpl<TestMessageEnvelope> testInput = TestMessageStreamImplUtil.getMessageStreamImpl(mockGraph);
- TaskContext mockContext = mock(TaskContext.class);
- when(mockContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
- Config mockConfig = mock(Config.class);
- testInput.map(m -> m).window(Windows.keyedSessionWindow(TestMessageEnvelope::getKey, Duration.ofMinutes(10)));
- OperatorImplGraph opGraph = new OperatorImplGraph();
- RootOperatorImpl operatorChain = (RootOperatorImpl) createOpsMethod.invoke(opGraph, testInput, mockConfig, mockContext);
- Set<OperatorImpl> subsSet = (Set<OperatorImpl>) nextOperatorsField.get(operatorChain);
- assertEquals(subsSet.size(), 1);
- OperatorImpl<TestMessageEnvelope, TestMessageEnvelope> firstOpImpl = subsSet.iterator().next();
- Set<OperatorImpl> subsOps = (Set<OperatorImpl>) nextOperatorsField.get(firstOpImpl);
- assertEquals(subsOps.size(), 1);
- OperatorImpl wndOpImpl = subsOps.iterator().next();
- subsOps = (Set<OperatorImpl>) nextOperatorsField.get(wndOpImpl);
- assertEquals(subsOps.size(), 0);
- }
-
- @Test
- public void testBroadcastChain() throws IllegalAccessException, InvocationTargetException {
- // test creation of broadcast chain
- StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
- MessageStreamImpl<TestMessageEnvelope> testInput = TestMessageStreamImplUtil.getMessageStreamImpl(mockGraph);
- TaskContext mockContext = mock(TaskContext.class);
- when(mockContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
- Config mockConfig = mock(Config.class);
- testInput.filter(m -> m.getMessage().getEventTime() > 123456L).flatMap(m -> new ArrayList() { { this.add(m); this.add(m); } });
- testInput.filter(m -> m.getMessage().getEventTime() < 123456L).map(m -> m);
- OperatorImplGraph opGraph = new OperatorImplGraph();
- RootOperatorImpl operatorChain = (RootOperatorImpl) createOpsMethod.invoke(opGraph, testInput, mockConfig, mockContext);
- Set<OperatorImpl> subsSet = (Set<OperatorImpl>) nextOperatorsField.get(operatorChain);
- assertEquals(subsSet.size(), 2);
- Iterator<OperatorImpl> iter = subsSet.iterator();
- // check the first branch w/ flatMap
- OperatorImpl<TestMessageEnvelope, TestMessageEnvelope> opImpl = iter.next();
- Set<OperatorImpl> subsOps = (Set<OperatorImpl>) nextOperatorsField.get(opImpl);
- assertEquals(subsOps.size(), 1);
- OperatorImpl flatMapImpl = subsOps.iterator().next();
- subsOps = (Set<OperatorImpl>) nextOperatorsField.get(flatMapImpl);
- assertEquals(subsOps.size(), 0);
- // check the second branch w/ map
- opImpl = iter.next();
- subsOps = (Set<OperatorImpl>) nextOperatorsField.get(opImpl);
- assertEquals(subsOps.size(), 1);
- OperatorImpl mapImpl = subsOps.iterator().next();
- subsOps = (Set<OperatorImpl>) nextOperatorsField.get(mapImpl);
- assertEquals(subsOps.size(), 0);
- }
-
- @Test
- public void testJoinChain() throws IllegalAccessException, InvocationTargetException {
- // test creation of join chain
- StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
- MessageStreamImpl<TestMessageEnvelope> input1 = TestMessageStreamImplUtil.getMessageStreamImpl(mockGraph);
- MessageStreamImpl<TestMessageEnvelope> input2 = TestMessageStreamImplUtil.getMessageStreamImpl(mockGraph);
- TaskContext mockContext = mock(TaskContext.class);
- when(mockContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
- Config mockConfig = mock(Config.class);
- input1
- .join(input2,
- new JoinFunction<String, TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope>() {
- @Override
- public TestOutputMessageEnvelope apply(TestMessageEnvelope m1, TestMessageEnvelope m2) {
- return new TestOutputMessageEnvelope(m1.getKey(), m1.getMessage().getValue().length() + m2.getMessage().getValue().length());
- }
-
- @Override
- public String getFirstKey(TestMessageEnvelope message) {
- return message.getKey();
- }
-
- @Override
- public String getSecondKey(TestMessageEnvelope message) {
- return message.getKey();
- }
- }, Duration.ofMinutes(1))
- .map(m -> m);
- OperatorImplGraph opGraph = new OperatorImplGraph();
- // now, we create chained operators from each input sources
- RootOperatorImpl chain1 = (RootOperatorImpl) createOpsMethod.invoke(opGraph, input1, mockConfig, mockContext);
- RootOperatorImpl chain2 = (RootOperatorImpl) createOpsMethod.invoke(opGraph, input2, mockConfig, mockContext);
- // check that those two chains will merge at map operator
- // first branch of the join
- Set<OperatorImpl> subsSet = (Set<OperatorImpl>) nextOperatorsField.get(chain1);
- assertEquals(subsSet.size(), 1);
- OperatorImpl<TestMessageEnvelope, TestOutputMessageEnvelope> joinOp1 = subsSet.iterator().next();
- Set<OperatorImpl> subsOps = (Set<OperatorImpl>) nextOperatorsField.get(joinOp1);
- assertEquals(subsOps.size(), 1);
- // the map operator consumes the common join output, where two branches merge
- OperatorImpl mapImpl = subsOps.iterator().next();
- // second branch of the join
- subsSet = (Set<OperatorImpl>) nextOperatorsField.get(chain2);
- assertEquals(subsSet.size(), 1);
- OperatorImpl<TestMessageEnvelope, TestOutputMessageEnvelope> joinOp2 = subsSet.iterator().next();
- assertNotSame(joinOp1, joinOp2);
- subsOps = (Set<OperatorImpl>) nextOperatorsField.get(joinOp2);
- assertEquals(subsOps.size(), 1);
- // make sure that the map operator is the same
- assertEquals(mapImpl, subsOps.iterator().next());
- }
-}
[4/4] samza git commit: SAMZA-1221,
SAMZA-1101: Internal cleanup for High-Level API implementation.
Posted by xi...@apache.org.
SAMZA-1221, SAMZA-1101: Internal cleanup for High-Level API implementation.
SAMZA-1221: Separated the OperatorSpec and MessageStream DAGs so that they're now duals of each other. Users interact with and construct the MessageStream DAG; we create and use the OperatorSpec DAG internally.
Moved common OperatorSpec functionality (getId, getOpCode, getOpName etc.) to the OperatorSpec abstract base class.
Added a new JoinOperatorSpec and PartialJoinOperatorImpls which are created from JoinOperatorSpec in OperatorImplGraph.
Added a new InputOperatorSpec and InputOperatorImpl (previously RootOperatorImpl). InputOperatorSpec is created when StreamGraph#getInputStream is called.
SAMZA-1101: Added a new OutputOperatorSpec and OutputOperatorImpl for partitionBy and sendTo. These are separate from SinkOperatorSpec and SinkOperatorImpl, which are used for sink. We don't need to create a sinkFn for partitionBy and sendTo anymore.
Updated most unit tests to use the new classes and avoid reflection.
Author: Prateek Maheshwari <pm...@linkedin.com>
Reviewers: Jagadish V <vj...@apache.org>
Closes #194 from prateekm/internal-cleanup
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/c1c4289c
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/c1c4289c
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/c1c4289c
Branch: refs/heads/master
Commit: c1c4289c8586457ae35c01f1ccb28fa59697dcb1
Parents: 29cf374
Author: Prateek Maheshwari <pm...@linkedin.com>
Authored: Wed Jun 7 12:42:11 2017 -0700
Committer: Xinyu Liu <xi...@xiliu-ld.linkedin.biz>
Committed: Wed Jun 7 12:42:11 2017 -0700
----------------------------------------------------------------------
.gitignore | 1 +
.../samza/execution/ExecutionPlanner.java | 67 +--
.../samza/execution/JobGraphJsonGenerator.java | 83 +--
.../org/apache/samza/execution/JobNode.java | 4 +-
.../samza/operators/MessageStreamImpl.java | 190 ++-----
.../apache/samza/operators/StreamGraphImpl.java | 88 ++--
.../org/apache/samza/operators/WindowState.java | 49 --
.../samza/operators/impl/InputOperatorImpl.java | 66 +++
.../samza/operators/impl/OperatorImpl.java | 27 +-
.../samza/operators/impl/OperatorImplGraph.java | 242 +++++----
.../operators/impl/OutputOperatorImpl.java | 72 +++
.../operators/impl/PartialJoinOperatorImpl.java | 50 +-
.../samza/operators/impl/RootOperatorImpl.java | 76 ---
.../samza/operators/impl/SinkOperatorImpl.java | 9 +-
.../operators/impl/StreamOperatorImpl.java | 9 +-
.../operators/impl/WindowOperatorImpl.java | 3 +-
.../samza/operators/impl/WindowState.java | 49 ++
.../samza/operators/spec/InputOperatorSpec.java | 52 ++
.../samza/operators/spec/JoinOperatorSpec.java | 73 +++
.../samza/operators/spec/OperatorSpec.java | 72 ++-
.../samza/operators/spec/OperatorSpecs.java | 89 ++--
.../operators/spec/OutputOperatorSpec.java | 55 ++
.../samza/operators/spec/OutputStreamImpl.java | 50 ++
.../operators/spec/PartialJoinOperatorSpec.java | 97 ----
.../samza/operators/spec/SinkOperatorSpec.java | 85 +--
.../operators/spec/StreamOperatorSpec.java | 43 +-
.../operators/spec/WindowOperatorSpec.java | 36 +-
.../operators/stream/InputStreamInternal.java | 39 --
.../stream/InputStreamInternalImpl.java | 45 --
.../stream/IntermediateMessageStreamImpl.java | 58 +++
.../stream/IntermediateStreamInternalImpl.java | 61 ---
.../operators/stream/OutputStreamInternal.java | 43 --
.../stream/OutputStreamInternalImpl.java | 52 --
.../samza/operators/util/OperatorJsonUtils.java | 89 ----
.../apache/samza/task/StreamOperatorTask.java | 42 +-
.../samza/execution/TestExecutionPlanner.java | 10 +-
.../execution/TestJobGraphJsonGenerator.java | 37 +-
.../samza/operators/TestJoinOperator.java | 6 +-
.../samza/operators/TestMessageStreamImpl.java | 522 ++++++++++---------
.../operators/TestMessageStreamImplUtil.java | 26 -
.../samza/operators/TestStreamGraphImpl.java | 243 ++++-----
.../samza/operators/data/MessageType.java | 37 --
.../data/TestExtOutputMessageEnvelope.java | 29 --
.../data/TestInputMessageEnvelope.java | 32 --
.../operators/data/TestMessageEnvelope.java | 10 +-
.../samza/operators/impl/TestOperatorImpl.java | 30 +-
.../operators/impl/TestOperatorImplGraph.java | 200 +++++--
.../samza/operators/impl/TestOperatorImpls.java | 236 ---------
.../operators/impl/TestStreamOperatorImpl.java | 11 +-
.../samza/operators/spec/TestOperatorSpecs.java | 192 -------
.../operators/spec/TestWindowOperatorSpec.java | 8 +-
.../src/main/visualizer/js/planToDagre.js | 12 +-
52 files changed, 1576 insertions(+), 2231 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index a8de1b5..d4dcaa1 100644
--- a/.gitignore
+++ b/.gitignore
@@ -28,3 +28,4 @@ docs/learn/documentation/*/rest/javadocs
out/
*.patch
**.pyc
+samza-shell/src/main/visualizer/plan.json
http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java b/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
index d763d84..00f4ad4 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
@@ -31,11 +31,9 @@ import java.util.Set;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
-import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.MessageStreamImpl;
import org.apache.samza.operators.StreamGraphImpl;
+import org.apache.samza.operators.spec.JoinOperatorSpec;
import org.apache.samza.operators.spec.OperatorSpec;
-import org.apache.samza.operators.spec.PartialJoinOperatorSpec;
import org.apache.samza.system.StreamSpec;
import org.apache.samza.system.SystemStream;
import org.slf4j.Logger;
@@ -77,7 +75,7 @@ public class ExecutionPlanner {
*/
/* package private */ JobGraph createJobGraph(StreamGraphImpl streamGraph) {
JobGraph jobGraph = new JobGraph(config);
- Set<StreamSpec> sourceStreams = new HashSet<>(streamGraph.getInputStreams().keySet());
+ Set<StreamSpec> sourceStreams = new HashSet<>(streamGraph.getInputOperators().keySet());
Set<StreamSpec> sinkStreams = new HashSet<>(streamGraph.getOutputStreams().keySet());
Set<StreamSpec> intStreams = new HashSet<>(sourceStreams);
intStreams.retainAll(sinkStreams);
@@ -120,7 +118,7 @@ public class ExecutionPlanner {
/**
* Fetch the partitions of source/sink streams and update the StreamEdges.
* @param jobGraph {@link JobGraph}
- * @param streamManager the {@StreamManager} to interface with the streams.
+ * @param streamManager the {@link StreamManager} to interface with the streams.
*/
/* package private */ static void updateExistingPartitions(JobGraph jobGraph, StreamManager streamManager) {
Set<StreamEdge> existingStreams = new HashSet<>();
@@ -157,20 +155,16 @@ public class ExecutionPlanner {
Multimap<OperatorSpec, StreamEdge> joinSpecToStreamEdges = HashMultimap.create();
// reverse mapping of the above
Multimap<StreamEdge, OperatorSpec> streamEdgeToJoinSpecs = HashMultimap.create();
- // Mapping from the output stream to the join spec. Since StreamGraph creates two partial join operators for a join and they
- // will have the same output stream, this mapping is used to choose one of them as the unique join spec representing this join
- // (who register first in the map wins).
- Map<MessageStream, OperatorSpec> outputStreamToJoinSpec = new HashMap<>();
// A queue of joins with known input partitions
Queue<OperatorSpec> joinQ = new LinkedList<>();
// The visited set keeps track of the join specs that have been already inserted in the queue before
Set<OperatorSpec> visited = new HashSet<>();
- streamGraph.getInputStreams().entrySet().forEach(entry -> {
+ streamGraph.getInputOperators().entrySet().forEach(entry -> {
StreamEdge streamEdge = jobGraph.getOrCreateStreamEdge(entry.getKey());
// Traverses the StreamGraph to find and update mappings for all Joins reachable from this input StreamEdge
findReachableJoins(entry.getValue(), streamEdge, joinSpecToStreamEdges, streamEdgeToJoinSpecs,
- outputStreamToJoinSpec, joinQ, visited);
+ joinQ, visited);
});
// At this point, joinQ contains joinSpecs where at least one of the input stream edge partitions is known.
@@ -209,44 +203,33 @@ public class ExecutionPlanner {
}
/**
- * This function traverses the StreamGraph to find and update mappings for all Joins reachable from this input StreamEdge
- * @param inputMessageStream next input MessageStream to traverse {@link MessageStream}
+ * This function traverses the {@link OperatorSpec} graph to find and update mappings for all Joins reachable
+ * from this input {@link StreamEdge}.
+ * @param operatorSpec the {@link OperatorSpec} to traverse
* @param sourceStreamEdge source {@link StreamEdge}
* @param joinSpecToStreamEdges mapping from join spec to its source {@link StreamEdge}s
* @param streamEdgeToJoinSpecs mapping from source {@link StreamEdge} to the join specs that consumes it
- * @param outputStreamToJoinSpec mapping from the output stream to the join spec
* @param joinQ queue that contains joinSpecs where at least one of the input stream edge partitions is known.
*/
- private static void findReachableJoins(MessageStream inputMessageStream, StreamEdge sourceStreamEdge,
- Multimap<OperatorSpec, StreamEdge> joinSpecToStreamEdges, Multimap<StreamEdge, OperatorSpec> streamEdgeToJoinSpecs,
- Map<MessageStream, OperatorSpec> outputStreamToJoinSpec, Queue<OperatorSpec> joinQ, Set<OperatorSpec> visited) {
- Collection<OperatorSpec> specs = ((MessageStreamImpl) inputMessageStream).getRegisteredOperatorSpecs();
- for (OperatorSpec spec : specs) {
- if (spec instanceof PartialJoinOperatorSpec) {
- // every join will have two partial join operators
- // we will choose one of them in order to consolidate the inputs
- // the first one who registered with the outputStreamToJoinSpec will win
- MessageStream output = spec.getNextStream();
- OperatorSpec joinSpec = outputStreamToJoinSpec.get(output);
- if (joinSpec == null) {
- joinSpec = spec;
- outputStreamToJoinSpec.put(output, joinSpec);
- }
-
- joinSpecToStreamEdges.put(joinSpec, sourceStreamEdge);
- streamEdgeToJoinSpecs.put(sourceStreamEdge, joinSpec);
-
- if (!visited.contains(joinSpec) && sourceStreamEdge.getPartitionCount() > 0) {
- // put the joins with known input partitions into the queue
- joinQ.add(joinSpec);
- visited.add(joinSpec);
- }
+ private static void findReachableJoins(OperatorSpec operatorSpec, StreamEdge sourceStreamEdge,
+ Multimap<OperatorSpec, StreamEdge> joinSpecToStreamEdges,
+ Multimap<StreamEdge, OperatorSpec> streamEdgeToJoinSpecs,
+ Queue<OperatorSpec> joinQ, Set<OperatorSpec> visited) {
+ if (operatorSpec instanceof JoinOperatorSpec) {
+ joinSpecToStreamEdges.put(operatorSpec, sourceStreamEdge);
+ streamEdgeToJoinSpecs.put(sourceStreamEdge, operatorSpec);
+
+ if (!visited.contains(operatorSpec) && sourceStreamEdge.getPartitionCount() > 0) {
+ // put the joins with known input partitions into the queue and mark as visited
+ joinQ.add(operatorSpec);
+ visited.add(operatorSpec);
}
+ }
- if (spec.getNextStream() != null) {
- findReachableJoins(spec.getNextStream(), sourceStreamEdge, joinSpecToStreamEdges, streamEdgeToJoinSpecs, outputStreamToJoinSpec, joinQ,
- visited);
- }
+ Collection<OperatorSpec> registeredOperatorSpecs = operatorSpec.getRegisteredOperatorSpecs();
+ for (OperatorSpec registeredOpSpec : registeredOperatorSpecs) {
+ findReachableJoins(registeredOpSpec, sourceStreamEdge, joinSpecToStreamEdges, streamEdgeToJoinSpecs, joinQ,
+ visited);
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java b/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java
index 96c0538..23c9d89 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java
@@ -19,13 +19,9 @@
package org.apache.samza.execution;
-import com.google.common.base.Joiner;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Multimap;
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -33,10 +29,11 @@ import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.samza.config.ApplicationConfig;
-import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.MessageStreamImpl;
+import org.apache.samza.operators.spec.JoinOperatorSpec;
import org.apache.samza.operators.spec.OperatorSpec;
-import org.apache.samza.operators.util.OperatorJsonUtils;
+import org.apache.samza.operators.spec.OperatorSpec.OpCode;
+import org.apache.samza.operators.spec.OutputOperatorSpec;
+import org.apache.samza.operators.spec.OutputStreamImpl;
import org.codehaus.jackson.annotate.JsonProperty;
import org.codehaus.jackson.map.ObjectMapper;
@@ -73,8 +70,6 @@ import org.codehaus.jackson.map.ObjectMapper;
List<StreamJson> outputStreams;
@JsonProperty("operators")
Map<Integer, Map<String, Object>> operators = new HashMap<>();
- @JsonProperty("canonicalOpIds")
- Map<Integer, String> canonicalOpIds = new HashMap<>();
}
static final class StreamJson {
@@ -108,11 +103,6 @@ import org.codehaus.jackson.map.ObjectMapper;
String applicationId;
}
- // Mapping from the output stream to the ids.
- // Logically they belong to the same operator, but in code we generate one operator for each input.
- // This is to associate the operators that output to the same MessageStream.
- Multimap<MessageStream, Integer> outputStreamToOpIds = HashMultimap.create();
-
/**
* Returns the JSON representation of a {@link JobGraph}
* @param jobGraph {@link JobGraph}
@@ -157,28 +147,21 @@ import org.codehaus.jackson.map.ObjectMapper;
}
/**
- * Traverse the {@StreamGraph} and build the operator graph JSON POJO.
+ * Traverse the {@link OperatorSpec} graph and build the operator graph JSON POJO.
* @param jobNode job node in the {@link JobGraph}
* @return {@link org.apache.samza.execution.JobGraphJsonGenerator.OperatorGraphJson}
*/
private OperatorGraphJson buildOperatorGraphJson(JobNode jobNode) {
OperatorGraphJson opGraph = new OperatorGraphJson();
opGraph.inputStreams = new ArrayList<>();
- jobNode.getStreamGraph().getInputStreams().forEach((streamSpec, stream) -> {
+ jobNode.getStreamGraph().getInputOperators().forEach((streamSpec, operatorSpec) -> {
StreamJson inputJson = new StreamJson();
opGraph.inputStreams.add(inputJson);
inputJson.streamId = streamSpec.getId();
- Collection<OperatorSpec> specs = ((MessageStreamImpl) stream).getRegisteredOperatorSpecs();
+ Collection<OperatorSpec> specs = operatorSpec.getRegisteredOperatorSpecs();
inputJson.nextOperatorIds = specs.stream().map(OperatorSpec::getOpId).collect(Collectors.toSet());
- updateOperatorGraphJson((MessageStreamImpl) stream, opGraph);
-
- for (Map.Entry<MessageStream, Collection<Integer>> entry : outputStreamToOpIds.asMap().entrySet()) {
- List<Integer> sortedIds = new ArrayList<>(entry.getValue());
- Collections.sort(sortedIds);
- String canonicalId = Joiner.on(',').join(sortedIds);
- sortedIds.stream().forEach(id -> opGraph.canonicalOpIds.put(id, canonicalId));
- }
+ updateOperatorGraphJson(operatorSpec, opGraph);
});
opGraph.outputStreams = new ArrayList<>();
@@ -191,23 +174,43 @@ import org.codehaus.jackson.map.ObjectMapper;
}
/**
- * Traverse the {@StreamGraph} recursively and update the operator graph JSON POJO.
- * @param messageStream input
+ * Traverse the {@link OperatorSpec} graph recursively and update the operator graph JSON POJO.
+ * @param operatorSpec input
* @param opGraph operator graph to build
*/
- private void updateOperatorGraphJson(MessageStreamImpl messageStream, OperatorGraphJson opGraph) {
- Collection<OperatorSpec> specs = messageStream.getRegisteredOperatorSpecs();
- specs.forEach(opSpec -> {
- opGraph.operators.put(opSpec.getOpId(), OperatorJsonUtils.operatorToMap(opSpec));
-
- if (opSpec.getOpCode() == OperatorSpec.OpCode.JOIN || opSpec.getOpCode() == OperatorSpec.OpCode.MERGE) {
- outputStreamToOpIds.put(opSpec.getNextStream(), opSpec.getOpId());
- }
-
- if (opSpec.getNextStream() != null) {
- updateOperatorGraphJson(opSpec.getNextStream(), opGraph);
- }
- });
+ private void updateOperatorGraphJson(OperatorSpec operatorSpec, OperatorGraphJson opGraph) {
+ // TODO xiliu: render input operators instead of input streams
+ if (operatorSpec.getOpCode() != OpCode.INPUT) {
+ opGraph.operators.put(operatorSpec.getOpId(), operatorToMap(operatorSpec));
+ }
+ Collection<OperatorSpec> specs = operatorSpec.getRegisteredOperatorSpecs();
+ specs.forEach(opSpec -> updateOperatorGraphJson(opSpec, opGraph));
+ }
+
+ /**
+ * Format the operator properties into a map
+ * @param spec a {@link OperatorSpec} instance
+ * @return map of the operator properties
+ */
+ private Map<String, Object> operatorToMap(OperatorSpec spec) {
+ Map<String, Object> map = new HashMap<>();
+ map.put("opCode", spec.getOpCode().name());
+ map.put("opId", spec.getOpId());
+ map.put("sourceLocation", spec.getSourceLocation());
+
+ Collection<OperatorSpec> nextOperators = spec.getRegisteredOperatorSpecs();
+ map.put("nextOperatorIds", nextOperators.stream().map(OperatorSpec::getOpId).collect(Collectors.toSet()));
+
+ if (spec instanceof OutputOperatorSpec) {
+ OutputStreamImpl outputStream = ((OutputOperatorSpec) spec).getOutputStream();
+ map.put("outputStreamId", outputStream.getStreamSpec().getId());
+ }
+
+ if (spec instanceof JoinOperatorSpec) {
+ map.put("ttlMs", ((JoinOperatorSpec) spec).getTtlMs());
+ }
+
+ return map;
}
/**
http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobNode.java b/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
index fbad520..c42e1cc 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
@@ -31,8 +31,8 @@ import org.apache.samza.config.JobConfig;
import org.apache.samza.config.MapConfig;
import org.apache.samza.config.TaskConfig;
import org.apache.samza.operators.StreamGraphImpl;
+import org.apache.samza.operators.spec.JoinOperatorSpec;
import org.apache.samza.operators.spec.OperatorSpec;
-import org.apache.samza.operators.spec.PartialJoinOperatorSpec;
import org.apache.samza.operators.spec.WindowOperatorSpec;
import org.apache.samza.operators.util.MathUtils;
import org.apache.samza.util.Util;
@@ -145,7 +145,7 @@ public class JobNode {
// Filter out the join operators, and obtain a list of their ttl values
List<Long> joinTtlIntervals = operatorSpecs.stream()
.filter(spec -> spec.getOpCode() == OperatorSpec.OpCode.JOIN)
- .map(spec -> ((PartialJoinOperatorSpec) spec).getTtlMs())
+ .map(spec -> ((JoinOperatorSpec) spec).getTtlMs())
.collect(Collectors.toList());
// Combine both the above lists
http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java b/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
index 9912f95..db6fd5a 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
@@ -19,37 +19,35 @@
package org.apache.samza.operators;
-import org.apache.samza.config.Config;
import org.apache.samza.operators.functions.FilterFunction;
import org.apache.samza.operators.functions.FlatMapFunction;
import org.apache.samza.operators.functions.JoinFunction;
import org.apache.samza.operators.functions.MapFunction;
-import org.apache.samza.operators.functions.PartialJoinFunction;
import org.apache.samza.operators.functions.SinkFunction;
+import org.apache.samza.operators.spec.JoinOperatorSpec;
import org.apache.samza.operators.spec.OperatorSpec;
import org.apache.samza.operators.spec.OperatorSpecs;
+import org.apache.samza.operators.spec.OutputOperatorSpec;
+import org.apache.samza.operators.spec.OutputStreamImpl;
+import org.apache.samza.operators.spec.StreamOperatorSpec;
import org.apache.samza.operators.spec.SinkOperatorSpec;
-import org.apache.samza.operators.stream.OutputStreamInternal;
-import org.apache.samza.operators.util.InternalInMemoryStore;
+import org.apache.samza.operators.stream.IntermediateMessageStreamImpl;
import org.apache.samza.operators.windows.Window;
import org.apache.samza.operators.windows.WindowPane;
import org.apache.samza.operators.windows.internal.WindowInternal;
-import org.apache.samza.storage.kv.KeyValueStore;
-import org.apache.samza.task.TaskContext;
import java.time.Duration;
-import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Set;
import java.util.function.Function;
/**
- * The implementation for input/output {@link MessageStream}s to/from the operators.
- * Users use the {@link MessageStream} API methods to describe and chain the operators specs.
+ * The {@link MessageStream} implementation that lets users describe their logical DAG.
+ * Users can obtain an instance by calling {@link StreamGraph#getInputStream}.
+ * <p>
+ * Each {@link MessageStreamImpl} is associated with a single {@link OperatorSpec} in the DAG and allows
+ * users to chain further operators on its {@link OperatorSpec}. In other words, a {@link MessageStreamImpl}
+ * presents an "edge-centric" (streams) view of the "node-centric" (specs) logical DAG for the users.
*
* @param <M> type of messages in this {@link MessageStream}
*/
@@ -60,171 +58,97 @@ public class MessageStreamImpl<M> implements MessageStream<M> {
private final StreamGraphImpl graph;
/**
- * The set of operators that consume the messages in this {@link MessageStream}
- *
- * Use a LinkedHashSet since we need deterministic ordering in initializing/closing operators.
+ * The {@link OperatorSpec} associated with this {@link MessageStreamImpl}
*/
- private final Set<OperatorSpec> registeredOperatorSpecs = new LinkedHashSet<>();
+ private final OperatorSpec operatorSpec;
- /**
- * Default constructor
- *
- * @param graph the {@link StreamGraphImpl} object that this stream belongs to
- */
- public MessageStreamImpl(StreamGraphImpl graph) {
+ public MessageStreamImpl(StreamGraphImpl graph, OperatorSpec<?, M> operatorSpec) {
this.graph = graph;
+ this.operatorSpec = operatorSpec;
}
@Override
public <TM> MessageStream<TM> map(MapFunction<? super M, ? extends TM> mapFn) {
- OperatorSpec<TM> op = OperatorSpecs.createMapOperatorSpec(
- mapFn, new MessageStreamImpl<>(this.graph), this.graph.getNextOpId());
- this.registeredOperatorSpecs.add(op);
- return op.getNextStream();
+ OperatorSpec<M, TM> op = OperatorSpecs.createMapOperatorSpec(mapFn, this.graph.getNextOpId());
+ this.operatorSpec.registerNextOperatorSpec(op);
+ return new MessageStreamImpl<>(this.graph, op);
}
@Override
public MessageStream<M> filter(FilterFunction<? super M> filterFn) {
- OperatorSpec<M> op = OperatorSpecs.createFilterOperatorSpec(filterFn, new MessageStreamImpl<>(this.graph),
- this.graph.getNextOpId());
- this.registeredOperatorSpecs.add(op);
- return op.getNextStream();
+ OperatorSpec<M, M> op = OperatorSpecs.createFilterOperatorSpec(filterFn, this.graph.getNextOpId());
+ this.operatorSpec.registerNextOperatorSpec(op);
+ return new MessageStreamImpl<>(this.graph, op);
}
@Override
public <TM> MessageStream<TM> flatMap(FlatMapFunction<? super M, ? extends TM> flatMapFn) {
- OperatorSpec<TM> op = OperatorSpecs.createStreamOperatorSpec(flatMapFn, new MessageStreamImpl<>(this.graph),
- this.graph.getNextOpId());
- this.registeredOperatorSpecs.add(op);
- return op.getNextStream();
+ OperatorSpec<M, TM> op = OperatorSpecs.createFlatMapOperatorSpec(flatMapFn, this.graph.getNextOpId());
+ this.operatorSpec.registerNextOperatorSpec(op);
+ return new MessageStreamImpl<>(this.graph, op);
}
@Override
public void sink(SinkFunction<? super M> sinkFn) {
SinkOperatorSpec<M> op = OperatorSpecs.createSinkOperatorSpec(sinkFn, this.graph.getNextOpId());
- this.registeredOperatorSpecs.add(op);
+ this.operatorSpec.registerNextOperatorSpec(op);
}
@Override
public <K, V> void sendTo(OutputStream<K, V, M> outputStream) {
- SinkOperatorSpec<M> op = OperatorSpecs.createSendToOperatorSpec((OutputStreamInternal<K, V, M>) outputStream,
- this.graph.getNextOpId());
- this.registeredOperatorSpecs.add(op);
+ OutputOperatorSpec<M> op = OperatorSpecs.createSendToOperatorSpec(
+ (OutputStreamImpl<K, V, M>) outputStream, this.graph.getNextOpId());
+ this.operatorSpec.registerNextOperatorSpec(op);
}
@Override
public <K, WV> MessageStream<WindowPane<K, WV>> window(Window<M, K, WV> window) {
- OperatorSpec<WindowPane<K, WV>> wndOp = OperatorSpecs.createWindowOperatorSpec((WindowInternal<M, K, WV>) window,
- new MessageStreamImpl<>(this.graph), this.graph.getNextOpId());
- this.registeredOperatorSpecs.add(wndOp);
- return wndOp.getNextStream();
+ OperatorSpec<M, WindowPane<K, WV>> op = OperatorSpecs.createWindowOperatorSpec(
+ (WindowInternal<M, K, WV>) window, this.graph.getNextOpId());
+ this.operatorSpec.registerNextOperatorSpec(op);
+ return new MessageStreamImpl<>(this.graph, op);
}
@Override
- public <K, OM, TM> MessageStream<TM> join(
- MessageStream<OM> otherStream, JoinFunction<? extends K, ? super M, ? super OM, ? extends TM> joinFn, Duration ttl) {
- MessageStreamImpl<TM> nextStream = new MessageStreamImpl<>(this.graph);
-
- PartialJoinFunction<K, M, OM, TM> thisPartialJoinFn = new PartialJoinFunction<K, M, OM, TM>() {
- private KeyValueStore<K, PartialJoinFunction.PartialJoinMessage<M>> thisStreamState;
-
- @Override
- public TM apply(M m, OM jm) {
- return joinFn.apply(m, jm);
- }
-
- @Override
- public K getKey(M message) {
- return joinFn.getFirstKey(message);
- }
-
- @Override
- public KeyValueStore<K, PartialJoinMessage<M>> getState() {
- return thisStreamState;
- }
-
- @Override
- public void init(Config config, TaskContext context) {
- // joinFn#init() must only be called once, so we do it in this partial join function's #init.
- joinFn.init(config, context);
-
- thisStreamState = new InternalInMemoryStore<>();
- }
-
- @Override
- public void close() {
- // joinFn#close() must only be called once, so we do it in this partial join function's #close.
- joinFn.close();
- }
- };
-
- PartialJoinFunction<K, OM, M, TM> otherPartialJoinFn = new PartialJoinFunction<K, OM, M, TM>() {
- private KeyValueStore<K, PartialJoinMessage<OM>> otherStreamState;
-
- @Override
- public TM apply(OM om, M m) {
- return joinFn.apply(m, om);
- }
-
- @Override
- public K getKey(OM message) {
- return joinFn.getSecondKey(message);
- }
-
- @Override
- public KeyValueStore<K, PartialJoinMessage<OM>> getState() {
- return otherStreamState;
- }
-
- @Override
- public void init(Config config, TaskContext taskContext) {
- otherStreamState = new InternalInMemoryStore<>();
- }
- };
-
- this.registeredOperatorSpecs.add(OperatorSpecs.createPartialJoinOperatorSpec(
- thisPartialJoinFn, otherPartialJoinFn, ttl.toMillis(), nextStream, this.graph.getNextOpId()));
-
- ((MessageStreamImpl<OM>) otherStream).registeredOperatorSpecs.add(OperatorSpecs
- .createPartialJoinOperatorSpec(otherPartialJoinFn, thisPartialJoinFn, ttl.toMillis(), nextStream,
- this.graph.getNextOpId()));
-
- return nextStream;
+ public <K, JM, TM> MessageStream<TM> join(MessageStream<JM> otherStream,
+ JoinFunction<? extends K, ? super M, ? super JM, ? extends TM> joinFn, Duration ttl) {
+ OperatorSpec<?, JM> otherOpSpec = ((MessageStreamImpl<JM>) otherStream).getOperatorSpec();
+ JoinOperatorSpec<K, M, JM, TM> joinOpSpec =
+ OperatorSpecs.createJoinOperatorSpec(this.operatorSpec, otherOpSpec,
+ (JoinFunction<K, M, JM, TM>) joinFn, ttl.toMillis(), this.graph.getNextOpId());
+
+ this.operatorSpec.registerNextOperatorSpec(joinOpSpec);
+ otherOpSpec.registerNextOperatorSpec((OperatorSpec<JM, ?>) joinOpSpec);
+
+ return new MessageStreamImpl<>(this.graph, joinOpSpec);
}
@Override
public MessageStream<M> merge(Collection<? extends MessageStream<? extends M>> otherStreams) {
- MessageStreamImpl<M> nextStream = new MessageStreamImpl<>(this.graph);
- List<MessageStream<M>> streamsToMerge = new ArrayList<>((Collection<MessageStream<M>>) otherStreams);
- streamsToMerge.add(this);
-
- streamsToMerge.forEach(stream -> {
- OperatorSpec mergeOperatorSpec =
- OperatorSpecs.createMergeOperatorSpec(nextStream, this.graph.getNextOpId());
- ((MessageStreamImpl<M>) stream).registeredOperatorSpecs.add(mergeOperatorSpec);
- });
- return nextStream;
+ StreamOperatorSpec<M, M> opSpec = OperatorSpecs.createMergeOperatorSpec(this.graph.getNextOpId());
+ this.operatorSpec.registerNextOperatorSpec(opSpec);
+ otherStreams.forEach(other ->
+ ((MessageStreamImpl<M>) other).getOperatorSpec().registerNextOperatorSpec(opSpec));
+ return new MessageStreamImpl<>(this.graph, opSpec);
}
@Override
public <K> MessageStream<M> partitionBy(Function<? super M, ? extends K> keyExtractor) {
int opId = this.graph.getNextOpId();
String opName = String.format("%s-%s", OperatorSpec.OpCode.PARTITION_BY.name().toLowerCase(), opId);
- MessageStreamImpl<M> intermediateStream =
- this.graph.<K, M, M>getIntermediateStream(opName, keyExtractor, m -> m, (k, m) -> m);
- SinkOperatorSpec<M> partitionByOperatorSpec = OperatorSpecs.createPartitionByOperatorSpec(
- (OutputStreamInternal<K, M, M>) intermediateStream, opId);
- this.registeredOperatorSpecs.add(partitionByOperatorSpec);
+ IntermediateMessageStreamImpl<K, M, M> intermediateStream =
+ this.graph.getIntermediateStream(opName, keyExtractor, m -> m, (k, m) -> m);
+ OutputOperatorSpec<M> partitionByOperatorSpec = OperatorSpecs.createPartitionByOperatorSpec(
+ intermediateStream.getOutputStream(), opId);
+ this.operatorSpec.registerNextOperatorSpec(partitionByOperatorSpec);
return intermediateStream;
}
/**
- * Gets the operator specs registered to consume the output of this {@link MessageStream}. This is an internal API and
- * should not be exposed to users.
- *
- * @return a collection containing all {@link OperatorSpec}s that are registered with this {@link MessageStream}.
+ * Get the {@link OperatorSpec} associated with this {@link MessageStreamImpl}.
+ * @return the {@link OperatorSpec} associated with this {@link MessageStreamImpl}.
*/
- public Collection<OperatorSpec> getRegisteredOperatorSpecs() {
- return Collections.unmodifiableSet(this.registeredOperatorSpecs);
+ protected OperatorSpec<?, M> getOperatorSpec() {
+ return this.operatorSpec;
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
index fcce5eb..c0da1b2 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
@@ -20,12 +20,10 @@ package org.apache.samza.operators;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
+import org.apache.samza.operators.spec.InputOperatorSpec;
+import org.apache.samza.operators.spec.OutputStreamImpl;
+import org.apache.samza.operators.stream.IntermediateMessageStreamImpl;
import org.apache.samza.operators.spec.OperatorSpec;
-import org.apache.samza.operators.stream.InputStreamInternal;
-import org.apache.samza.operators.stream.InputStreamInternalImpl;
-import org.apache.samza.operators.stream.IntermediateStreamInternalImpl;
-import org.apache.samza.operators.stream.OutputStreamInternal;
-import org.apache.samza.operators.stream.OutputStreamInternalImpl;
import org.apache.samza.runtime.ApplicationRunner;
import org.apache.samza.system.StreamSpec;
@@ -51,9 +49,9 @@ public class StreamGraphImpl implements StreamGraph {
*/
private int opId = 0;
- // Using LHM for deterministic order in initializing and closing operators.
- private final Map<StreamSpec, InputStreamInternal> inStreams = new LinkedHashMap<>();
- private final Map<StreamSpec, OutputStreamInternal> outStreams = new LinkedHashMap<>();
+ // We use a LHM for deterministic order in initializing and closing operators.
+ private final Map<StreamSpec, InputOperatorSpec> inputOperators = new LinkedHashMap<>();
+ private final Map<StreamSpec, OutputStreamImpl> outputStreams = new LinkedHashMap<>();
private final ApplicationRunner runner;
private final Config config;
@@ -67,17 +65,21 @@ public class StreamGraphImpl implements StreamGraph {
}
@Override
- public <K, V, M> MessageStream<M> getInputStream(String streamId, BiFunction<? super K, ? super V, ? extends M> msgBuilder) {
+ public <K, V, M> MessageStream<M> getInputStream(String streamId,
+ BiFunction<? super K, ? super V, ? extends M> msgBuilder) {
if (msgBuilder == null) {
throw new IllegalArgumentException("msgBuilder can't be null for an input stream");
}
- if (inStreams.containsKey(runner.getStreamSpec(streamId))) {
- throw new IllegalStateException("Cannot invoke getInputStream() multiple times with the same streamId: " + streamId);
+ if (inputOperators.containsKey(runner.getStreamSpec(streamId))) {
+ throw new IllegalStateException("getInputStream() invoked multiple times "
+ + "with the same streamId: " + streamId);
}
- return inStreams.computeIfAbsent(runner.getStreamSpec(streamId),
- streamSpec -> new InputStreamInternalImpl<>(this, streamSpec, (BiFunction<K, V, M>) msgBuilder));
+ StreamSpec streamSpec = runner.getStreamSpec(streamId);
+ inputOperators.put(streamSpec,
+ new InputOperatorSpec<>(streamSpec, (BiFunction<K, V, M>) msgBuilder, this.getNextOpId()));
+ return new MessageStreamImpl<>(this, inputOperators.get(streamSpec));
}
@Override
@@ -91,12 +93,15 @@ public class StreamGraphImpl implements StreamGraph {
throw new IllegalArgumentException("msgExtractor can't be null for an output stream.");
}
- if (outStreams.containsKey(runner.getStreamSpec(streamId))) {
- throw new IllegalStateException("Cannot invoke getOutputStream() multiple times with the same streamId: " + streamId);
+ if (outputStreams.containsKey(runner.getStreamSpec(streamId))) {
+ throw new IllegalStateException("getOutputStream() invoked multiple times "
+ + "with the same streamId: " + streamId);
}
- return outStreams.computeIfAbsent(runner.getStreamSpec(streamId),
- streamSpec -> new OutputStreamInternalImpl<>(this, streamSpec, (Function<M, K>) keyExtractor, (Function<M, V>) msgExtractor));
+ StreamSpec streamSpec = runner.getStreamSpec(streamId);
+ outputStreams.put(streamSpec,
+ new OutputStreamImpl<>(streamSpec, (Function<M, K>) keyExtractor, (Function<M, V>) msgExtractor));
+ return outputStreams.get(streamSpec);
}
@Override
@@ -120,8 +125,9 @@ public class StreamGraphImpl implements StreamGraph {
* @param <M> the type of messages in the intermediate {@link MessageStream}
* @return the intermediate {@link MessageStreamImpl}
*/
- <K, V, M> MessageStreamImpl<M> getIntermediateStream(String streamName,
- Function<? super M, ? extends K> keyExtractor, Function<? super M, ? extends V> msgExtractor, BiFunction<? super K, ? super V, ? extends M> msgBuilder) {
+ <K, V, M> IntermediateMessageStreamImpl<K, V, M> getIntermediateStream(String streamName,
+ Function<? super M, ? extends K> keyExtractor, Function<? super M, ? extends V> msgExtractor,
+ BiFunction<? super K, ? super V, ? extends M> msgBuilder) {
String streamId = String.format("%s-%s-%s",
config.get(JobConfig.JOB_NAME()),
config.get(JobConfig.JOB_ID(), "1"),
@@ -129,30 +135,28 @@ public class StreamGraphImpl implements StreamGraph {
if (msgBuilder == null) {
throw new IllegalArgumentException("msgBuilder cannot be null for an intermediate stream");
}
-
if (keyExtractor == null) {
throw new IllegalArgumentException("keyExtractor can't be null for an output stream.");
}
if (msgExtractor == null) {
throw new IllegalArgumentException("msgExtractor can't be null for an output stream.");
}
-
StreamSpec streamSpec = runner.getStreamSpec(streamId);
- IntermediateStreamInternalImpl<K, V, M> intStream =
- (IntermediateStreamInternalImpl<K, V, M>) inStreams
- .computeIfAbsent(streamSpec,
- k -> new IntermediateStreamInternalImpl<>(this, streamSpec, (Function<M, K>) keyExtractor,
- (Function<M, V>) msgExtractor, (BiFunction<K, V, M>) msgBuilder));
- outStreams.putIfAbsent(streamSpec, intStream);
- return intStream;
+ if (inputOperators.containsKey(streamSpec) || outputStreams.containsKey(streamSpec)) {
+ throw new IllegalStateException("getIntermediateStream() invoked multiple times "
+ + "with the same streamId: " + streamId);
+ }
+ inputOperators.put(streamSpec, new InputOperatorSpec(streamSpec, msgBuilder, this.getNextOpId()));
+ outputStreams.put(streamSpec, new OutputStreamImpl(streamSpec, keyExtractor, msgExtractor));
+ return new IntermediateMessageStreamImpl<>(this, inputOperators.get(streamSpec), outputStreams.get(streamSpec));
}
- public Map<StreamSpec, InputStreamInternal> getInputStreams() {
- return Collections.unmodifiableMap(inStreams);
+ public Map<StreamSpec, InputOperatorSpec> getInputOperators() {
+ return Collections.unmodifiableMap(inputOperators);
}
- public Map<StreamSpec, OutputStreamInternal> getOutputStreams() {
- return Collections.unmodifiableMap(outStreams);
+ public Map<StreamSpec, OutputStreamImpl> getOutputStreams() {
+ return Collections.unmodifiableMap(outputStreams);
}
public ContextManager getContextManager() {
@@ -169,24 +173,20 @@ public class StreamGraphImpl implements StreamGraph {
* @return a set of all available {@link OperatorSpec}s
*/
public Collection<OperatorSpec> getAllOperatorSpecs() {
- Collection<InputStreamInternal> inputStreams = inStreams.values();
+ Collection<InputOperatorSpec> inputOperatorSpecs = inputOperators.values();
Set<OperatorSpec> operatorSpecs = new HashSet<>();
- for (InputStreamInternal stream : inputStreams) {
- doGetOperatorSpecs((MessageStreamImpl) stream, operatorSpecs);
+ for (InputOperatorSpec inputOperatorSpec: inputOperatorSpecs) {
+ doGetOperatorSpecs(inputOperatorSpec, operatorSpecs);
}
return operatorSpecs;
}
- private void doGetOperatorSpecs(MessageStreamImpl stream, Set<OperatorSpec> specs) {
- Collection<OperatorSpec> registeredOperatorSpecs = stream.getRegisteredOperatorSpecs();
- for (OperatorSpec spec : registeredOperatorSpecs) {
- specs.add(spec);
- MessageStreamImpl nextStream = spec.getNextStream();
- if (nextStream != null) {
- //Recursively traverse and obtain all reachable operators
- doGetOperatorSpecs(nextStream, specs);
- }
+ private void doGetOperatorSpecs(OperatorSpec operatorSpec, Set<OperatorSpec> specs) {
+ Collection<OperatorSpec> registeredOperatorSpecs = operatorSpec.getRegisteredOperatorSpecs();
+ for (OperatorSpec registeredOperatorSpec: registeredOperatorSpecs) {
+ specs.add(registeredOperatorSpec);
+ doGetOperatorSpecs(registeredOperatorSpec, specs);
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/main/java/org/apache/samza/operators/WindowState.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/WindowState.java b/samza-core/src/main/java/org/apache/samza/operators/WindowState.java
deleted file mode 100644
index 801044b..0000000
--- a/samza-core/src/main/java/org/apache/samza/operators/WindowState.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators;
-
-/**
- * Wraps the value stored for a particular {@link org.apache.samza.operators.windows.WindowKey} with additional metadata.
- */
-public class WindowState<WV> {
-
- final WV wv;
- /**
- * Time of the first message in the window
- */
- final long earliestRecvTime;
-
- public WindowState(WV wv, long earliestRecvTime) {
- this.wv = wv;
- this.earliestRecvTime = earliestRecvTime;
- }
-
- public WV getWindowValue() {
- return wv;
- }
-
- public long getEarliestTimestamp() {
- return earliestRecvTime;
- }
-
- @Override
- public String toString() {
- return String.format("WindowState: {time=%d, value=%s}", earliestRecvTime, wv);
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/main/java/org/apache/samza/operators/impl/InputOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/InputOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/InputOperatorImpl.java
new file mode 100644
index 0000000..0545af1
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/InputOperatorImpl.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.spec.InputOperatorSpec;
+import org.apache.samza.operators.spec.OperatorSpec;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskContext;
+import org.apache.samza.task.TaskCoordinator;
+
+import java.util.Collection;
+import java.util.Collections;
+
+
+/**
+ * An operator that builds the input message from the incoming message.
+ *
+ * @param <K> the type of key in the incoming message
+ * @param <V> the type of message in the incoming message
+ * @param <M> the type of input message
+ */
+public final class InputOperatorImpl<K, V, M> extends OperatorImpl<Pair<K, V>, M> {
+
+ private final InputOperatorSpec<K, V, M> inputOpSpec;
+
+ InputOperatorImpl(InputOperatorSpec<K, V, M> inputOpSpec) {
+ this.inputOpSpec = inputOpSpec;
+ }
+
+ @Override
+ protected void handleInit(Config config, TaskContext context) {
+ }
+
+ @Override
+ public Collection<M> handleMessage(Pair<K, V> pair, MessageCollector collector, TaskCoordinator coordinator) {
+ // TODO: SAMZA-1148 - Cast to appropriate input (key, msg) types based on the serde before applying the msgBuilder.
+ M message = this.inputOpSpec.getMsgBuilder().apply(pair.getKey(), pair.getValue());
+ return Collections.singletonList(message);
+ }
+
+ @Override
+ protected void handleClose() {
+ }
+
+ protected OperatorSpec<Pair<K, V>, M> getOperatorSpec() {
+ return this.inputOpSpec;
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
index 23c31ac..73bb83d 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
@@ -43,12 +43,13 @@ public abstract class OperatorImpl<M, RM> {
private boolean initialized;
private boolean closed;
- private Set<OperatorImpl<RM, ?>> registeredOperators;
private HighResolutionClock highResClock;
private Counter numMessage;
private Timer handleMessageNs;
private Timer handleTimerNs;
+ Set<OperatorImpl<RM, ?>> registeredOperators;
+
/**
* Initialize this {@link OperatorImpl} and its user-defined functions.
*
@@ -56,7 +57,7 @@ public abstract class OperatorImpl<M, RM> {
* @param context the {@link TaskContext} for the task
*/
public final void init(Config config, TaskContext context) {
- String opName = getOperatorSpec().getOpName();
+ String opName = getOperatorName();
if (initialized) {
throw new IllegalStateException(String.format("Attempted to initialize Operator %s more than once.", opName));
@@ -95,7 +96,7 @@ public abstract class OperatorImpl<M, RM> {
if (!initialized) {
throw new IllegalStateException(
String.format("Attempted to register next operator before initializing operator %s.",
- getOperatorSpec().getOpName()));
+ getOperatorName()));
}
this.registeredOperators.add(nextOperator);
}
@@ -167,10 +168,9 @@ public abstract class OperatorImpl<M, RM> {
}
public void close() {
- String opName = getOperatorSpec().getOpName();
-
if (closed) {
- throw new IllegalStateException(String.format("Attempted to close Operator %s more than once.", opName));
+ throw new IllegalStateException(
+ String.format("Attempted to close Operator %s more than once.", getOperatorSpec().getOpName()));
}
handleClose();
closed = true;
@@ -183,7 +183,20 @@ public abstract class OperatorImpl<M, RM> {
*
* @return the {@link OperatorSpec} for this {@link OperatorImpl}
*/
- protected abstract OperatorSpec<RM> getOperatorSpec();
+ protected abstract OperatorSpec<M, RM> getOperatorSpec();
+
+ /**
+ * Get the name for this {@link OperatorImpl}.
+ *
+ * Some {@link OperatorImpl}s don't have a 1:1 mapping with their {@link OperatorSpec}. E.g., there are
+ * 2 PartialJoinOperatorImpls for a JoinOperatorSpec. Overriding this method allows them to provide an
+ * implementation specific name, e.g., for use in metrics.
+ *
+ * @return the operator name
+ */
+ protected String getOperatorName() {
+ return getOperatorSpec().getOpName();
+ }
private HighResolutionClock createHighResClock(Config config) {
if (new MetricsConfig(config).getMetricsTimerEnabled()) {
http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
index e99b3ee..e5fce13 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
@@ -19,18 +19,23 @@
package org.apache.samza.operators.impl;
import com.google.common.collect.Lists;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.samza.config.Config;
-import org.apache.samza.operators.MessageStreamImpl;
import org.apache.samza.operators.StreamGraphImpl;
+import org.apache.samza.operators.functions.JoinFunction;
+import org.apache.samza.operators.functions.PartialJoinFunction;
+import org.apache.samza.operators.spec.InputOperatorSpec;
+import org.apache.samza.operators.spec.JoinOperatorSpec;
import org.apache.samza.operators.spec.OperatorSpec;
-import org.apache.samza.operators.spec.SinkOperatorSpec;
-import org.apache.samza.operators.spec.PartialJoinOperatorSpec;
+import org.apache.samza.operators.spec.OutputOperatorSpec;
import org.apache.samza.operators.spec.StreamOperatorSpec;
+import org.apache.samza.operators.spec.SinkOperatorSpec;
import org.apache.samza.operators.spec.WindowOperatorSpec;
+import org.apache.samza.operators.util.InternalInMemoryStore;
+import org.apache.samza.storage.kv.KeyValueStore;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.TaskContext;
import org.apache.samza.util.Clock;
-import org.apache.samza.util.SystemClock;
import java.util.ArrayList;
import java.util.Collection;
@@ -42,67 +47,59 @@ import java.util.Map;
/**
- * Instantiates the DAG of {@link OperatorImpl}s corresponding to the {@link OperatorSpec}s for the input
- * {@link MessageStreamImpl}s.
+ * The DAG of {@link OperatorImpl}s corresponding to the DAG of {@link OperatorSpec}s.
*/
public class OperatorImplGraph {
/**
- * A mapping from {@link OperatorSpec}s to their {@link OperatorImpl}s in this graph. Used to avoid creating
- * multiple {@link OperatorImpl}s for an {@link OperatorSpec}, e.g., when it's reached from different
- * input {@link MessageStreamImpl}s.
- *
- * Using LHM for deterministic ordering in initializing and closing operators.
+ * A mapping from operator names to their {@link OperatorImpl}s in this graph. Used to avoid creating
+ * multiple {@link OperatorImpl}s for an {@link OperatorSpec} when it's reached from different
+ * {@link OperatorSpec}s during DAG traversals (e.g., for the merge operator).
+ * We use a LHM for deterministic ordering in initializing and closing operators.
*/
- private final Map<OperatorSpec, OperatorImpl> operatorImpls = new LinkedHashMap<>();
+ private final Map<String, OperatorImpl> operatorImpls = new LinkedHashMap<>();
/**
- * A mapping from input {@link SystemStream}s to their {@link OperatorImpl} sub-DAG in this graph.
+ * A mapping from input {@link SystemStream}s to their {@link InputOperatorImpl} sub-DAG in this graph.
*/
- private final Map<SystemStream, RootOperatorImpl> rootOperators = new HashMap<>();
+ private final Map<SystemStream, InputOperatorImpl> inputOperators = new HashMap<>();
- private final Clock clock;
-
- public OperatorImplGraph(Clock clock) {
- this.clock = clock;
- }
+ /**
+ * A mapping from {@link JoinOperatorSpec}s to their two {@link PartialJoinFunction}s. Used to associate
+ * the two {@link PartialJoinOperatorImpl}s for a {@link JoinOperatorSpec} with each other since they're
+ * reached from different {@link OperatorSpec}s during DAG traversals.
+ */
+ private final Map<Integer, Pair<PartialJoinFunction, PartialJoinFunction>> joinFunctions = new HashMap<>();
- /* package private */ OperatorImplGraph() {
- this(SystemClock.instance());
- }
+ private final Clock clock;
/**
- * Initialize the DAG of {@link OperatorImpl}s for the input {@link MessageStreamImpl} in the provided
- * {@link StreamGraphImpl}.
+ * Constructs the DAG of {@link OperatorImpl}s corresponding to the DAG of {@link OperatorSpec}s
+ * in the {@code streamGraph}.
*
- * @param streamGraph the logical {@link StreamGraphImpl}
+ * @param streamGraph the {@link StreamGraphImpl} containing the logical {@link OperatorSpec} DAG
* @param config the {@link Config} required to instantiate operators
* @param context the {@link TaskContext} required to instantiate operators
+ * @param clock the {@link Clock} to get current time
*/
- public void init(StreamGraphImpl streamGraph, Config config, TaskContext context) {
- streamGraph.getInputStreams().forEach((streamSpec, inputStream) -> {
+ public OperatorImplGraph(StreamGraphImpl streamGraph, Config config, TaskContext context, Clock clock) {
+ this.clock = clock;
+ streamGraph.getInputOperators().forEach((streamSpec, inputOpSpec) -> {
SystemStream systemStream = new SystemStream(streamSpec.getSystemName(), streamSpec.getPhysicalName());
- this.rootOperators.put(systemStream, this.createOperatorImpls((MessageStreamImpl) inputStream, config, context));
+ InputOperatorImpl inputOperatorImpl =
+ (InputOperatorImpl) createAndRegisterOperatorImpl(null, inputOpSpec, config, context);
+ this.inputOperators.put(systemStream, inputOperatorImpl);
});
}
/**
- * Get the {@link RootOperatorImpl} corresponding to the provided input {@code systemStream}.
+ * Get the {@link InputOperatorImpl} corresponding to the provided input {@code systemStream}.
*
* @param systemStream input {@link SystemStream}
- * @return the {@link RootOperatorImpl} that starts processing the input message
+ * @return the {@link InputOperatorImpl} that starts processing the input message
*/
- public RootOperatorImpl getRootOperator(SystemStream systemStream) {
- return this.rootOperators.get(systemStream);
- }
-
- /**
- * Get all {@link RootOperatorImpl}s for the graph.
- *
- * @return an unmodifiable view of all {@link RootOperatorImpl}s for the graph
- */
- public Collection<RootOperatorImpl> getAllRootOperators() {
- return Collections.unmodifiableCollection(this.rootOperators.values());
+ public InputOperatorImpl getInputOperator(SystemStream systemStream) {
+ return this.inputOperators.get(systemStream);
}
public void close() {
@@ -112,65 +109,44 @@ public class OperatorImplGraph {
}
/**
- * Traverses the DAG of {@link OperatorSpec}s starting from the provided {@link MessageStreamImpl},
- * creates the corresponding DAG of {@link OperatorImpl}s, and returns its root {@link RootOperatorImpl} node.
+ * Get all {@link InputOperatorImpl}s for the graph.
*
- * @param source the input {@link MessageStreamImpl} to instantiate {@link OperatorImpl}s for
- * @param config the {@link Config} required to instantiate operators
- * @param context the {@link TaskContext} required to instantiate operators
- * @param <M> the type of messages in the {@code source} {@link MessageStreamImpl}
- * @return root node for the {@link OperatorImpl} DAG
+ * @return an unmodifiable view of all {@link InputOperatorImpl}s for the graph
*/
- private <M> RootOperatorImpl<M> createOperatorImpls(MessageStreamImpl<M> source,
- Config config, TaskContext context) {
- // since the source message stream might have multiple operator specs registered on it,
- // create a new root node as a single point of entry for the DAG.
- RootOperatorImpl<M> rootOperator = new RootOperatorImpl<>();
- rootOperator.init(config, context);
- // create the pipeline/topology starting from the source
- source.getRegisteredOperatorSpecs().forEach(registeredOperator -> {
- // pass in the context so that operator implementations can initialize their functions
- OperatorImpl<M, ?> operatorImpl =
- createAndRegisterOperatorImpl(registeredOperator, config, context);
- rootOperator.registerNextOperator(operatorImpl);
- });
- return rootOperator;
+ public Collection<InputOperatorImpl> getAllInputOperators() {
+ return Collections.unmodifiableCollection(this.inputOperators.values());
}
/**
- * Helper method to recursively traverse the {@link OperatorSpec} DAG and instantiate and link the corresponding
- * {@link OperatorImpl}s.
+ * Traverses the DAG of {@link OperatorSpec}s starting from the provided {@link OperatorSpec},
+ * creates the corresponding DAG of {@link OperatorImpl}s, and returns the root {@link OperatorImpl} node.
*
+ * @param prevOperatorSpec the parent of the current {@code operatorSpec} in the traversal
* @param operatorSpec the operatorSpec to create the {@link OperatorImpl} for
* @param config the {@link Config} required to instantiate operators
* @param context the {@link TaskContext} required to instantiate operators
- * @param <M> type of input message
* @return the operator implementation for the operatorSpec
*/
- private <M> OperatorImpl<M, ?> createAndRegisterOperatorImpl(OperatorSpec operatorSpec,
+ OperatorImpl createAndRegisterOperatorImpl(OperatorSpec prevOperatorSpec, OperatorSpec operatorSpec,
Config config, TaskContext context) {
- if (!operatorImpls.containsKey(operatorSpec)) {
- OperatorImpl<M, ?> operatorImpl = createOperatorImpl(operatorSpec, config, context);
- if (operatorImpls.putIfAbsent(operatorSpec, operatorImpl) == null) {
- // this is the first time we've added the operatorImpl corresponding to the operatorSpec,
- // so traverse and initialize and register the rest of the DAG.
- // initialize the corresponding operator function
- operatorImpl.init(config, context);
- MessageStreamImpl nextStream = operatorSpec.getNextStream();
- if (nextStream != null) {
- Collection<OperatorSpec> registeredSpecs = nextStream.getRegisteredOperatorSpecs();
- registeredSpecs.forEach(registeredSpec -> {
- OperatorImpl subImpl = createAndRegisterOperatorImpl(registeredSpec, config, context);
- operatorImpl.registerNextOperator(subImpl);
- });
- }
- return operatorImpl;
- }
+ if (!operatorImpls.containsKey(operatorSpec.getOpName()) || operatorSpec instanceof JoinOperatorSpec) {
+ // Either no impl is registered under this operatorSpec's name yet (the map is keyed by operator name),
+ // or this is a join operator spec and we need to create 2 partial join operator impls for it.
+ OperatorImpl operatorImpl = createOperatorImpl(prevOperatorSpec, operatorSpec, config, context);
+ operatorImpl.init(config, context);
+ operatorImpls.put(operatorImpl.getOperatorName(), operatorImpl);
+
+ Collection<OperatorSpec> registeredSpecs = operatorSpec.getRegisteredOperatorSpecs();
+ registeredSpecs.forEach(registeredSpec -> {
+ OperatorImpl nextImpl = createAndRegisterOperatorImpl(operatorSpec, registeredSpec, config, context);
+ operatorImpl.registerNextOperator(nextImpl);
+ });
+ return operatorImpl;
+ } else {
+ // the implementation corresponding to operatorSpec has already been instantiated
+ // and registered, so we do not need to traverse the DAG further.
+ return operatorImpls.get(operatorSpec.getOpName());
}
-
- // the implementation corresponding to operatorSpec has already been instantiated
- // and registered, so we do not need to traverse the DAG further.
- return operatorImpls.get(operatorSpec);
}
/**
@@ -179,20 +155,96 @@ public class OperatorImplGraph {
* @param operatorSpec the immutable {@link OperatorSpec} definition.
* @param config the {@link Config} required to instantiate operators
* @param context the {@link TaskContext} required to instantiate operators
- * @param <M> type of input message
* @return the {@link OperatorImpl} implementation instance
*/
- private <M> OperatorImpl<M, ?> createOperatorImpl(OperatorSpec operatorSpec, Config config, TaskContext context) {
- if (operatorSpec instanceof StreamOperatorSpec) {
- return new StreamOperatorImpl<>((StreamOperatorSpec<M, ?>) operatorSpec, config, context);
+ OperatorImpl createOperatorImpl(OperatorSpec prevOperatorSpec, OperatorSpec operatorSpec,
+ Config config, TaskContext context) {
+ if (operatorSpec instanceof InputOperatorSpec) {
+ return new InputOperatorImpl((InputOperatorSpec) operatorSpec);
+ } else if (operatorSpec instanceof StreamOperatorSpec) {
+ return new StreamOperatorImpl((StreamOperatorSpec) operatorSpec, config, context);
} else if (operatorSpec instanceof SinkOperatorSpec) {
- return new SinkOperatorImpl<>((SinkOperatorSpec<M>) operatorSpec, config, context);
+ return new SinkOperatorImpl((SinkOperatorSpec) operatorSpec, config, context);
+ } else if (operatorSpec instanceof OutputOperatorSpec) {
+ return new OutputOperatorImpl((OutputOperatorSpec) operatorSpec, config, context);
} else if (operatorSpec instanceof WindowOperatorSpec) {
- return new WindowOperatorImpl((WindowOperatorSpec<M, ?, ?>) operatorSpec, clock);
- } else if (operatorSpec instanceof PartialJoinOperatorSpec) {
- return new PartialJoinOperatorImpl<>((PartialJoinOperatorSpec) operatorSpec, config, context, clock);
+ return new WindowOperatorImpl((WindowOperatorSpec) operatorSpec, clock);
+ } else if (operatorSpec instanceof JoinOperatorSpec) {
+ return createPartialJoinOperatorImpl(prevOperatorSpec, (JoinOperatorSpec) operatorSpec, config, context, clock);
}
throw new IllegalArgumentException(
String.format("Unsupported OperatorSpec: %s", operatorSpec.getClass().getName()));
}
+
+ private PartialJoinOperatorImpl createPartialJoinOperatorImpl(OperatorSpec prevOperatorSpec,
+ JoinOperatorSpec joinOpSpec, Config config, TaskContext context, Clock clock) {
+ Pair<PartialJoinFunction, PartialJoinFunction> partialJoinFunctions = getOrCreatePartialJoinFunctions(joinOpSpec);
+
+ if (joinOpSpec.getLeftInputOpSpec().equals(prevOperatorSpec)) { // we got here from the left side of the join
+ return new PartialJoinOperatorImpl(joinOpSpec, /* isLeftSide */ true,
+ partialJoinFunctions.getLeft(), partialJoinFunctions.getRight(), config, context, clock);
+ } else { // we got here from the right side of the join
+ return new PartialJoinOperatorImpl(joinOpSpec, /* isLeftSide */ false,
+ partialJoinFunctions.getRight(), partialJoinFunctions.getLeft(), config, context, clock);
+ }
+ }
+
+ private Pair<PartialJoinFunction, PartialJoinFunction> getOrCreatePartialJoinFunctions(JoinOperatorSpec joinOpSpec) {
+ return joinFunctions.computeIfAbsent(joinOpSpec.getOpId(),
+ joinOpId -> Pair.of(createLeftJoinFn(joinOpSpec.getJoinFn()), createRightJoinFn(joinOpSpec.getJoinFn())));
+ }
+
+ private PartialJoinFunction<Object, Object, Object, Object> createLeftJoinFn(JoinFunction joinFn) {
+ return new PartialJoinFunction<Object, Object, Object, Object>() {
+ private KeyValueStore<Object, PartialJoinMessage<Object>> leftStreamState = new InternalInMemoryStore<>();
+
+ @Override
+ public Object apply(Object m, Object jm) {
+ return joinFn.apply(m, jm);
+ }
+
+ @Override
+ public Object getKey(Object message) {
+ return joinFn.getFirstKey(message);
+ }
+
+ @Override
+ public KeyValueStore<Object, PartialJoinMessage<Object>> getState() {
+ return leftStreamState;
+ }
+
+ @Override
+ public void init(Config config, TaskContext context) {
+ // user-defined joinFn should only be initialized once, so we do it only in left partial join function.
+ joinFn.init(config, context);
+ }
+
+ @Override
+ public void close() {
+ // joinFn#close() must only be called once, so we do it only in left partial join function.
+ joinFn.close();
+ }
+ };
+ }
+
+ private PartialJoinFunction<Object, Object, Object, Object> createRightJoinFn(JoinFunction joinFn) {
+ return new PartialJoinFunction<Object, Object, Object, Object>() {
+ private KeyValueStore<Object, PartialJoinMessage<Object>> rightStreamState = new InternalInMemoryStore<>();
+
+ @Override
+ public Object apply(Object m, Object jm) {
+ return joinFn.apply(jm, m);
+ }
+
+ @Override
+ public Object getKey(Object message) {
+ return joinFn.getSecondKey(message);
+ }
+
+ @Override
+ public KeyValueStore<Object, PartialJoinMessage<Object>> getState() {
+ return rightStreamState;
+ }
+ };
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/main/java/org/apache/samza/operators/impl/OutputOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OutputOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OutputOperatorImpl.java
new file mode 100644
index 0000000..fe59b74
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OutputOperatorImpl.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.spec.OperatorSpec;
+import org.apache.samza.operators.spec.OutputOperatorSpec;
+import org.apache.samza.operators.spec.OutputStreamImpl;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskContext;
+import org.apache.samza.task.TaskCoordinator;
+
+import java.util.Collection;
+import java.util.Collections;
+
+
+/**
+ * An operator that sends incoming messages to an output {@link SystemStream}.
+ */
+class OutputOperatorImpl<M> extends OperatorImpl<M, Void> {
+
+ private final OutputOperatorSpec<M> outputOpSpec;
+ private final OutputStreamImpl<?, ?, M> outputStream;
+
+ OutputOperatorImpl(OutputOperatorSpec<M> outputOpSpec, Config config, TaskContext context) {
+ this.outputOpSpec = outputOpSpec;
+ this.outputStream = outputOpSpec.getOutputStream();
+ }
+
+ @Override
+ protected void handleInit(Config config, TaskContext context) {
+ }
+
+ @Override
+ public Collection<Void> handleMessage(M message, MessageCollector collector,
+ TaskCoordinator coordinator) {
+ // TODO: SAMZA-1148 - need to find a way to directly pass in the serde class names
+ SystemStream systemStream = new SystemStream(outputStream.getStreamSpec().getSystemName(),
+ outputStream.getStreamSpec().getPhysicalName());
+ Object key = outputStream.getKeyExtractor().apply(message);
+ Object msg = outputStream.getMsgExtractor().apply(message);
+ collector.send(new OutgoingMessageEnvelope(systemStream, key, msg));
+ return Collections.emptyList();
+ }
+
+ @Override
+ protected void handleClose() {
+ }
+
+ @Override
+ protected OperatorSpec<M, Void> getOperatorSpec() {
+ return outputOpSpec;
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java
index b00a2e9..ad66962 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java
@@ -22,8 +22,8 @@ import org.apache.samza.config.Config;
import org.apache.samza.metrics.Counter;
import org.apache.samza.operators.functions.PartialJoinFunction;
import org.apache.samza.operators.functions.PartialJoinFunction.PartialJoinMessage;
+import org.apache.samza.operators.spec.JoinOperatorSpec;
import org.apache.samza.operators.spec.OperatorSpec;
-import org.apache.samza.operators.spec.PartialJoinOperatorSpec;
import org.apache.samza.storage.kv.Entry;
import org.apache.samza.storage.kv.KeyValueIterator;
import org.apache.samza.storage.kv.KeyValueStore;
@@ -31,8 +31,6 @@ import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;
import org.apache.samza.util.Clock;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collection;
@@ -40,18 +38,18 @@ import java.util.Collections;
import java.util.List;
/**
- * Implementation of a {@link PartialJoinOperatorSpec} that joins messages of type {@code M} in this stream
- * with buffered messages of type {@code JM} in the other stream.
+ * Implementation of one side of a {@link JoinOperatorSpec} that buffers and joins its input messages of
+ * type {@code M} with buffered input messages of type {@code JM} in the paired {@link PartialJoinOperatorImpl}.
*
- * @param <M> type of messages in the input stream
- * @param <JM> type of messages in the stream to join with
- * @param <RM> type of messages in the joined stream
+ * @param <K> the type of join key
+ * @param <M> the type of input messages on this side of the join
+ * @param <JM> the type of input message on the other side of the join
+ * @param <RM> the type of join result
*/
class PartialJoinOperatorImpl<K, M, JM, RM> extends OperatorImpl<M, RM> {
- private static final Logger LOGGER = LoggerFactory.getLogger(PartialJoinOperatorImpl.class);
-
- private final PartialJoinOperatorSpec<K, M, JM, RM> partialJoinOpSpec;
+ private final JoinOperatorSpec<K, M, JM, RM> joinOpSpec;
+ private final boolean isLeftSide; // whether this operator impl is for the left side of the join
private final PartialJoinFunction<K, M, JM, RM> thisPartialJoinFn;
private final PartialJoinFunction<K, JM, M, RM> otherPartialJoinFn;
private final long ttlMs;
@@ -59,19 +57,22 @@ class PartialJoinOperatorImpl<K, M, JM, RM> extends OperatorImpl<M, RM> {
private Counter keysRemoved;
- PartialJoinOperatorImpl(PartialJoinOperatorSpec<K, M, JM, RM> partialJoinOpSpec,
+ PartialJoinOperatorImpl(JoinOperatorSpec<K, M, JM, RM> joinOpSpec, boolean isLeftSide,
+ PartialJoinFunction<K, M, JM, RM> thisPartialJoinFn,
+ PartialJoinFunction<K, JM, M, RM> otherPartialJoinFn,
Config config, TaskContext context, Clock clock) {
- this.partialJoinOpSpec = partialJoinOpSpec;
- this.thisPartialJoinFn = partialJoinOpSpec.getThisPartialJoinFn();
- this.otherPartialJoinFn = partialJoinOpSpec.getOtherPartialJoinFn();
- this.ttlMs = partialJoinOpSpec.getTtlMs();
+ this.joinOpSpec = joinOpSpec;
+ this.isLeftSide = isLeftSide;
+ this.thisPartialJoinFn = thisPartialJoinFn;
+ this.otherPartialJoinFn = otherPartialJoinFn;
+ this.ttlMs = joinOpSpec.getTtlMs();
this.clock = clock;
}
@Override
protected void handleInit(Config config, TaskContext context) {
keysRemoved = context.getMetricsRegistry()
- .newCounter(OperatorImpl.class.getName(), this.partialJoinOpSpec.getOpName() + "-keys-removed");
+ .newCounter(OperatorImpl.class.getName(), getOperatorName() + "-keys-removed");
this.thisPartialJoinFn.init(config, context);
}
@@ -116,8 +117,19 @@ class PartialJoinOperatorImpl<K, M, JM, RM> extends OperatorImpl<M, RM> {
this.thisPartialJoinFn.close();
}
+ protected OperatorSpec<M, RM> getOperatorSpec() {
+ return (OperatorSpec<M, RM>) joinOpSpec;
+ }
+
+ /**
+ * The name for this {@link PartialJoinOperatorImpl} that includes information about which
+ * side of the join it is for.
+ *
+ * @return the {@link PartialJoinOperatorImpl} name.
+ */
@Override
- protected OperatorSpec<RM> getOperatorSpec() {
- return partialJoinOpSpec;
+ protected String getOperatorName() {
+ String side = isLeftSide ? "L" : "R";
+ return this.joinOpSpec.getOpName() + "-" + side;
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/main/java/org/apache/samza/operators/impl/RootOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/RootOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/RootOperatorImpl.java
deleted file mode 100644
index 45cb941..0000000
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/RootOperatorImpl.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.impl;
-
-import org.apache.samza.config.Config;
-import org.apache.samza.operators.MessageStreamImpl;
-import org.apache.samza.operators.spec.OperatorSpec;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskContext;
-import org.apache.samza.task.TaskCoordinator;
-
-import java.util.Collection;
-import java.util.Collections;
-
-
-/**
- * A no-op operator implementation that forwards incoming messages to all of its subscribers.
- * @param <M> type of incoming messages
- */
-public final class RootOperatorImpl<M> extends OperatorImpl<M, M> {
-
- @Override
- protected void handleInit(Config config, TaskContext context) {
- }
-
- @Override
- public Collection<M> handleMessage(M message, MessageCollector collector, TaskCoordinator coordinator) {
- return Collections.singletonList(message);
- }
-
- @Override
- protected void handleClose() {
- }
-
- // TODO: SAMZA-1221 - Change to InputOperatorSpec that also builds the message
- @Override
- protected OperatorSpec<M> getOperatorSpec() {
- return new OperatorSpec<M>() {
- @Override
- public MessageStreamImpl<M> getNextStream() {
- return null;
- }
-
- @Override
- public OpCode getOpCode() {
- return OpCode.INPUT;
- }
-
- @Override
- public int getOpId() {
- return -1;
- }
-
- @Override
- public String getSourceLocation() {
- return "";
- }
- };
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java
index 4f698f8..5dbe27f 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java
@@ -31,9 +31,9 @@ import java.util.Collections;
/**
- * Implementation for {@link SinkOperatorSpec}
+ * An operator that sends incoming messages to an arbitrary output system using the provided {@link SinkFunction}.
*/
-class SinkOperatorImpl<M> extends OperatorImpl<M, M> {
+class SinkOperatorImpl<M> extends OperatorImpl<M, Void> {
private final SinkOperatorSpec<M> sinkOpSpec;
private final SinkFunction<M> sinkFn;
@@ -49,7 +49,7 @@ class SinkOperatorImpl<M> extends OperatorImpl<M, M> {
}
@Override
- public Collection<M> handleMessage(M message, MessageCollector collector,
+ public Collection<Void> handleMessage(M message, MessageCollector collector,
TaskCoordinator coordinator) {
this.sinkFn.apply(message, collector, coordinator);
// there should be no further chained operators since this is a terminal operator.
@@ -61,8 +61,7 @@ class SinkOperatorImpl<M> extends OperatorImpl<M, M> {
this.sinkFn.close();
}
- @Override
- protected OperatorSpec<M> getOperatorSpec() {
+ protected OperatorSpec<M, Void> getOperatorSpec() {
return sinkOpSpec;
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/main/java/org/apache/samza/operators/impl/StreamOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/StreamOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/StreamOperatorImpl.java
index e720803..a51d5e6 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/StreamOperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/StreamOperatorImpl.java
@@ -30,10 +30,10 @@ import java.util.Collection;
/**
- * A StreamOperator that accepts a 1:n transform function and applies it to each incoming message.
+ * A simple operator that accepts a 1:n transform function and applies it to each incoming message.
*
- * @param <M> type of message in the input stream
- * @param <RM> type of message in the output stream
+ * @param <M> the type of input message
+ * @param <RM> the type of result
*/
class StreamOperatorImpl<M, RM> extends OperatorImpl<M, RM> {
@@ -62,8 +62,7 @@ class StreamOperatorImpl<M, RM> extends OperatorImpl<M, RM> {
this.transformFn.close();
}
- @Override
- protected OperatorSpec<RM> getOperatorSpec() {
+ protected OperatorSpec<M, RM> getOperatorSpec() {
return streamOpSpec;
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
index b258042..f9485f7 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
@@ -21,7 +21,6 @@
package org.apache.samza.operators.impl;
import org.apache.samza.config.Config;
-import org.apache.samza.operators.WindowState;
import org.apache.samza.operators.spec.OperatorSpec;
import org.apache.samza.operators.spec.WindowOperatorSpec;
import org.apache.samza.operators.triggers.FiringType;
@@ -154,7 +153,7 @@ public class WindowOperatorImpl<M, WK, WV> extends OperatorImpl<M, WindowPane<WK
}
@Override
- protected OperatorSpec<WindowPane<WK, WV>> getOperatorSpec() {
+ protected OperatorSpec<M, WindowPane<WK, WV>> getOperatorSpec() {
return windowOpSpec;
}
http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/main/java/org/apache/samza/operators/impl/WindowState.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/WindowState.java b/samza-core/src/main/java/org/apache/samza/operators/impl/WindowState.java
new file mode 100644
index 0000000..4577a5c
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/WindowState.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl;
+
+/**
+ * Wraps the value stored for a particular {@link org.apache.samza.operators.windows.WindowKey} with additional metadata.
+ */
+class WindowState<WV> {
+
+ private final WV wv;
+ /**
+ * Time of the first message in the window
+ */
+ private final long earliestRecvTime;
+
+ WindowState(WV wv, long earliestRecvTime) {
+ this.wv = wv;
+ this.earliestRecvTime = earliestRecvTime;
+ }
+
+ WV getWindowValue() {
+ return wv;
+ }
+
+ long getEarliestTimestamp() {
+ return earliestRecvTime;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("WindowState: {time=%d, value=%s}", earliestRecvTime, wv);
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java
new file mode 100644
index 0000000..6fbc3c1
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.spec;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.samza.system.StreamSpec;
+
+import java.util.function.BiFunction;
+
+/**
+ * The spec for an operator that receives incoming messages from an input stream
+ * and converts them to the input message.
+ *
+ * @param <K> the type of key in the incoming message
+ * @param <V> the type of message in the incoming message
+ * @param <M> the type of input message
+ */
+public class InputOperatorSpec<K, V, M> extends OperatorSpec<Pair<K, V>, M> {
+
+ private final StreamSpec streamSpec;
+ private final BiFunction<K, V, M> msgBuilder;
+
+ public InputOperatorSpec(StreamSpec streamSpec, BiFunction<K, V, M> msgBuilder, int opId) {
+ super(OpCode.INPUT, opId);
+ this.streamSpec = streamSpec;
+ this.msgBuilder = msgBuilder;
+ }
+
+ public StreamSpec getStreamSpec() {
+ return this.streamSpec;
+ }
+
+ public BiFunction<K, V, M> getMsgBuilder() {
+ return this.msgBuilder;
+ }
+}
[3/4] samza git commit: SAMZA-1221,
SAMZA-1101: Internal cleanup for High-Level API implementation.
Posted by xi...@apache.org.
http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java
new file mode 100644
index 0000000..16f59d7
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.spec;
+
+import org.apache.samza.operators.functions.JoinFunction;
+
+
+/**
+ * The spec for the join operator that buffers messages from one stream and
+ * joins them with buffered messages from another stream.
+ *
+ * @param <K> the type of join key
+ * @param <M> the type of message in this stream
+ * @param <JM> the type of message in the other stream
+ * @param <RM> the type of join result
+ */
+public class JoinOperatorSpec<K, M, JM, RM> extends OperatorSpec<Object, RM> { // Object == M | JM
+
+ private final OperatorSpec<?, M> leftInputOpSpec;
+ private final OperatorSpec<?, JM> rightInputOpSpec;
+ private final JoinFunction<K, M, JM, RM> joinFn;
+ private final long ttlMs;
+
+ /**
+ * Constructs a {@link JoinOperatorSpec}.
+ *
+ * @param leftInputOpSpec the operator spec for the stream on the left side of the join
+ * @param rightInputOpSpec the operator spec for the stream on the right side of the join
+ * @param joinFn the user-defined join function to get join keys and results
+ * @param ttlMs the ttl in ms for retaining messages in each stream
+ * @param opId the unique ID for this operator
+ */
+ JoinOperatorSpec(OperatorSpec<?, M> leftInputOpSpec, OperatorSpec<?, JM> rightInputOpSpec,
+ JoinFunction<K, M, JM, RM> joinFn, long ttlMs, int opId) {
+ super(OpCode.JOIN, opId);
+ this.leftInputOpSpec = leftInputOpSpec;
+ this.rightInputOpSpec = rightInputOpSpec;
+ this.joinFn = joinFn;
+ this.ttlMs = ttlMs;
+ }
+
+ public OperatorSpec getLeftInputOpSpec() {
+ return leftInputOpSpec;
+ }
+
+ public OperatorSpec getRightInputOpSpec() {
+ return rightInputOpSpec;
+ }
+
+ public JoinFunction<K, M, JM, RM> getJoinFn() {
+ return this.joinFn;
+ }
+
+ public long getTtlMs() {
+ return ttlMs;
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
index 3ea52ca..f64e123 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
@@ -19,19 +19,23 @@
package org.apache.samza.operators.spec;
import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.operators.MessageStreamImpl;
+import java.util.Collection;
+import java.util.LinkedHashSet;
+import java.util.Set;
/**
* A stream operator specification that holds all the information required to transform
- * the input {@link MessageStreamImpl} and produce the output {@link MessageStreamImpl}.
+ * the input {@link org.apache.samza.operators.MessageStreamImpl} and produce the output
+ * {@link org.apache.samza.operators.MessageStreamImpl}.
*
+ * @param <M> the type of input message to the operator
* @param <OM> the type of output message from the operator
*/
@InterfaceStability.Unstable
-public interface OperatorSpec<OM> {
+public abstract class OperatorSpec<M, OM> {
- enum OpCode {
+ public enum OpCode {
INPUT,
MAP,
FLAT_MAP,
@@ -41,39 +45,77 @@ public interface OperatorSpec<OM> {
JOIN,
WINDOW,
MERGE,
- PARTITION_BY
+ PARTITION_BY,
+ OUTPUT
}
+ private final int opId;
+ private final OpCode opCode;
+ private StackTraceElement[] creationStackTrace;
+
/**
- * Get the next {@link MessageStreamImpl} that receives the transformed messages produced by this operator.
- * @return the next {@link MessageStreamImpl}
+ * The set of operators that consume the messages produced from this operator.
+ * <p>
+ * We use a LinkedHashSet since we need deterministic ordering in initializing/closing operators.
*/
- MessageStreamImpl<OM> getNextStream();
+ private final Set<OperatorSpec<OM, ?>> nextOperatorSpecs = new LinkedHashSet<>();
+
+ public OperatorSpec(OpCode opCode, int opId) {
+ this.opCode = opCode;
+ this.opId = opId;
+ this.creationStackTrace = Thread.currentThread().getStackTrace();
+ }
+
+ /**
+ * Register the next operator spec in the chain that this operator should propagate its output to.
+ * @param nextOperatorSpec the next operator in the chain.
+ */
+ public void registerNextOperatorSpec(OperatorSpec<OM, ?> nextOperatorSpec) {
+ nextOperatorSpecs.add(nextOperatorSpec);
+ }
+
+ public Collection<OperatorSpec<OM, ?>> getRegisteredOperatorSpecs() {
+ return nextOperatorSpecs;
+ }
/**
* Get the {@link OpCode} for this operator.
* @return the {@link OpCode} for this operator
*/
- OpCode getOpCode();
+ public final OpCode getOpCode() {
+ return this.opCode;
+ }
/**
* Get the unique ID of this operator in the {@link org.apache.samza.operators.StreamGraph}.
* @return the unique operator ID
*/
- int getOpId();
+ public final int getOpId() {
+ return this.opId;
+ }
/**
- * Return the user source code location that creates the operator
- * @return source location
+ * Get the user source code location that created the operator.
+ * @return source code location for the operator
*/
- String getSourceLocation();
+ public final String getSourceLocation() {
+ // The stack trace for most operators looks like:
+ // [0] Thread.getStackTrace()
+ // [1] OperatorSpec.<init>()
+ // [2] SomeOperatorSpec.<init>()
+ // [3] OperatorSpecs.createSomeOperatorSpec()
+ // [4] MessageStreamImpl.someOperator()
+ // [5] User code that calls [4]
+ // we are interested in [5] here
+ StackTraceElement element = this.creationStackTrace[5];
+ return String.format("%s:%s", element.getFileName(), element.getLineNumber());
+ }
/**
* Get the name for this operator based on its opCode and opId.
* @return the name for this operator
*/
- default String getOpName() {
+ public final String getOpName() {
return String.format("%s-%s", getOpCode().name().toLowerCase(), getOpId());
}
-
}
http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
index 66e2c58..ed5fc8f 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
@@ -20,14 +20,11 @@
package org.apache.samza.operators.spec;
import org.apache.samza.config.Config;
-import org.apache.samza.operators.MessageStreamImpl;
import org.apache.samza.operators.functions.FilterFunction;
import org.apache.samza.operators.functions.FlatMapFunction;
+import org.apache.samza.operators.functions.JoinFunction;
import org.apache.samza.operators.functions.MapFunction;
-import org.apache.samza.operators.functions.PartialJoinFunction;
import org.apache.samza.operators.functions.SinkFunction;
-import org.apache.samza.operators.stream.OutputStreamInternal;
-import org.apache.samza.operators.windows.WindowPane;
import org.apache.samza.operators.windows.internal.WindowInternal;
import org.apache.samza.task.TaskContext;
@@ -46,14 +43,13 @@ public class OperatorSpecs {
* Creates a {@link StreamOperatorSpec} for {@link MapFunction}
*
* @param mapFn the map function
- * @param nextStream the output {@link MessageStreamImpl} to send messages to
* @param opId the unique ID of the operator
* @param <M> type of input message
* @param <OM> type of output message
* @return the {@link StreamOperatorSpec}
*/
public static <M, OM> StreamOperatorSpec<M, OM> createMapOperatorSpec(
- MapFunction<? super M, ? extends OM> mapFn, MessageStreamImpl<OM> nextStream, int opId) {
+ MapFunction<? super M, ? extends OM> mapFn, int opId) {
return new StreamOperatorSpec<>(new FlatMapFunction<M, OM>() {
@Override
public Collection<OM> apply(M message) {
@@ -76,20 +72,19 @@ public class OperatorSpecs {
public void close() {
mapFn.close();
}
- }, nextStream, OperatorSpec.OpCode.MAP, opId);
+ }, OperatorSpec.OpCode.MAP, opId);
}
/**
* Creates a {@link StreamOperatorSpec} for {@link FilterFunction}
*
* @param filterFn the transformation function
- * @param nextStream the output {@link MessageStreamImpl} to send messages to
* @param opId the unique ID of the operator
* @param <M> type of input message
* @return the {@link StreamOperatorSpec}
*/
public static <M> StreamOperatorSpec<M, M> createFilterOperatorSpec(
- FilterFunction<? super M> filterFn, MessageStreamImpl<M> nextStream, int opId) {
+ FilterFunction<? super M> filterFn, int opId) {
return new StreamOperatorSpec<>(new FlatMapFunction<M, M>() {
@Override
public Collection<M> apply(M message) {
@@ -111,23 +106,21 @@ public class OperatorSpecs {
public void close() {
filterFn.close();
}
-
- }, nextStream, OperatorSpec.OpCode.FILTER, opId);
+ }, OperatorSpec.OpCode.FILTER, opId);
}
/**
- * Creates a {@link StreamOperatorSpec}.
+ * Creates a {@link StreamOperatorSpec} for {@link FlatMapFunction}.
*
- * @param transformFn the transformation function
- * @param nextStream the output {@link MessageStreamImpl} to send messages to
+ * @param flatMapFn the transformation function
* @param opId the unique ID of the operator
* @param <M> type of input message
* @param <OM> type of output message
* @return the {@link StreamOperatorSpec}
*/
- public static <M, OM> StreamOperatorSpec<M, OM> createStreamOperatorSpec(
- FlatMapFunction<? super M, ? extends OM> transformFn, MessageStreamImpl<OM> nextStream, int opId) {
- return new StreamOperatorSpec<>((FlatMapFunction<M, OM>) transformFn, nextStream, OperatorSpec.OpCode.FLAT_MAP, opId);
+ public static <M, OM> StreamOperatorSpec<M, OM> createFlatMapOperatorSpec(
+ FlatMapFunction<? super M, ? extends OM> flatMapFn, int opId) {
+ return new StreamOperatorSpec<>((FlatMapFunction<M, OM>) flatMapFn, OperatorSpec.OpCode.FLAT_MAP, opId);
}
/**
@@ -139,91 +132,89 @@ public class OperatorSpecs {
* @return the {@link SinkOperatorSpec} for the sink operator
*/
public static <M> SinkOperatorSpec<M> createSinkOperatorSpec(SinkFunction<? super M> sinkFn, int opId) {
- return new SinkOperatorSpec<>((SinkFunction<M>) sinkFn, OperatorSpec.OpCode.SINK, opId);
+ return new SinkOperatorSpec<>((SinkFunction<M>) sinkFn, opId);
}
/**
- * Creates a {@link SinkOperatorSpec} for the sendTo operator.
+ * Creates an {@link OutputOperatorSpec} for the sendTo operator.
*
- * @param outputStream the {@link OutputStreamInternal} to send messages to
+ * @param outputStream the {@link OutputStreamImpl} to send messages to
* @param opId the unique ID of the operator
* @param <K> the type of key in the outgoing message
* @param <V> the type of message in the outgoing message
- * @param <M> the type of message in the {@link OutputStreamInternal}
- * @return the {@link SinkOperatorSpec} for the sendTo operator
+ * @param <M> the type of message in the {@link OutputStreamImpl}
+ * @return the {@link OutputOperatorSpec} for the sendTo operator
*/
- public static <K, V, M> SinkOperatorSpec<M> createSendToOperatorSpec(
- OutputStreamInternal<K, V, M> outputStream, int opId) {
- return new SinkOperatorSpec<>(outputStream, OperatorSpec.OpCode.SEND_TO, opId);
+ public static <K, V, M> OutputOperatorSpec<M> createSendToOperatorSpec(
+ OutputStreamImpl<K, V, M> outputStream, int opId) {
+ return new OutputOperatorSpec<>(outputStream, OperatorSpec.OpCode.SEND_TO, opId);
}
/**
- * Creates a {@link SinkOperatorSpec} for the partitionBy operator.
+ * Creates an {@link OutputOperatorSpec} for the partitionBy operator.
*
- * @param outputStream the {@link OutputStreamInternal} to send messages to
+ * @param outputStream the {@link OutputStreamImpl} to send messages to
* @param opId the unique ID of the operator
* @param <K> the type of key in the outgoing message
* @param <V> the type of message in the outgoing message
- * @param <M> the type of message in the {@link OutputStreamInternal}
- * @return the {@link SinkOperatorSpec} for the partitionBy operator
+ * @param <M> the type of message in the {@link OutputStreamImpl}
+ * @return the {@link OutputOperatorSpec} for the partitionBy operator
*/
- public static <K, V, M> SinkOperatorSpec<M> createPartitionByOperatorSpec(
- OutputStreamInternal<K, V, M> outputStream, int opId) {
- return new SinkOperatorSpec<>(outputStream, OperatorSpec.OpCode.PARTITION_BY, opId);
+ public static <K, V, M> OutputOperatorSpec<M> createPartitionByOperatorSpec(
+ OutputStreamImpl<K, V, M> outputStream, int opId) {
+ return new OutputOperatorSpec<>(outputStream, OperatorSpec.OpCode.PARTITION_BY, opId);
}
/**
* Creates a {@link WindowOperatorSpec}.
*
* @param window the description of the window.
- * @param nextStream the output {@link MessageStreamImpl} to send messages to
* @param opId the unique ID of the operator
* @param <M> the type of input message
- * @param <WK> the type of key in the {@link WindowPane}
+ * @param <WK> the type of key in the window output
* @param <WV> the type of value in the window
* @return the {@link WindowOperatorSpec}
*/
public static <M, WK, WV> WindowOperatorSpec<M, WK, WV> createWindowOperatorSpec(
- WindowInternal<M, WK, WV> window, MessageStreamImpl<WindowPane<WK, WV>> nextStream, int opId) {
- return new WindowOperatorSpec<>(window, nextStream, opId);
+ WindowInternal<M, WK, WV> window, int opId) {
+ return new WindowOperatorSpec<>(window, opId);
}
/**
- * Creates a {@link PartialJoinOperatorSpec}.
+ * Creates a {@link JoinOperatorSpec}.
*
- * @param thisPartialJoinFn the partial join function for this message stream
- * @param otherPartialJoinFn the partial join function for the other message stream
+ * @param leftInputOpSpec the operator spec for the stream on the left side of the join
+ * @param rightInputOpSpec the operator spec for the stream on the right side of the join
+ * @param joinFn the user-defined join function to get join keys and results
* @param ttlMs the ttl in ms for retaining messages in each stream
- * @param nextStream the output {@link MessageStreamImpl} to send messages to
* @param opId the unique ID of the operator
* @param <K> the type of join key
* @param <M> the type of input message
* @param <JM> the type of message in the other join stream
- * @param <RM> the type of message in the join output
- * @return the {@link PartialJoinOperatorSpec}
+ * @param <RM> the type of join result
+ * @return the {@link JoinOperatorSpec}
*/
- public static <K, M, JM, RM> PartialJoinOperatorSpec<K, M, JM, RM> createPartialJoinOperatorSpec(
- PartialJoinFunction<K, M, JM, RM> thisPartialJoinFn, PartialJoinFunction<K, JM, M, RM> otherPartialJoinFn,
- long ttlMs, MessageStreamImpl<RM> nextStream, int opId) {
- return new PartialJoinOperatorSpec<>(thisPartialJoinFn, otherPartialJoinFn, ttlMs, nextStream, opId);
+ public static <K, M, JM, RM> JoinOperatorSpec<K, M, JM, RM> createJoinOperatorSpec(
+ OperatorSpec<?, M> leftInputOpSpec, OperatorSpec<?, JM> rightInputOpSpec,
+ JoinFunction<K, M, JM, RM> joinFn, long ttlMs, int opId) {
+ return new JoinOperatorSpec<>(leftInputOpSpec, rightInputOpSpec, joinFn, ttlMs, opId);
}
/**
* Creates a {@link StreamOperatorSpec} with a merger function.
*
- * @param nextStream the output {@link MessageStreamImpl} to send messages to
* @param opId the unique ID of the operator
* @param <M> the type of input message
* @return the {@link StreamOperatorSpec} for the merge
*/
- public static <M> StreamOperatorSpec<M, M> createMergeOperatorSpec(MessageStreamImpl<M> nextStream, int opId) {
+ public static <M> StreamOperatorSpec<M, M> createMergeOperatorSpec(int opId) {
return new StreamOperatorSpec<>(message ->
new ArrayList<M>() {
{
this.add(message);
}
},
- nextStream, OperatorSpec.OpCode.MERGE, opId);
+ OperatorSpec.OpCode.MERGE, opId);
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/main/java/org/apache/samza/operators/spec/OutputOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/OutputOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/OutputOperatorSpec.java
new file mode 100644
index 0000000..e6767ec
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/OutputOperatorSpec.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.spec;
+
+
+/**
+ * The spec for an operator that outputs a {@link org.apache.samza.operators.MessageStream} to a
+ * {@link org.apache.samza.system.SystemStream}.
+ * <p>
+ * This is a terminal operator and does not allow further operator chaining.
+ *
+ * @param <M> the type of input message
+ */
+public class OutputOperatorSpec<M> extends OperatorSpec<M, Void> {
+
+ private OutputStreamImpl<?, ?, M> outputStream;
+
+
+ /**
+ * Constructs an {@link OutputOperatorSpec} to send messages to the provided {@code outputStream}
+ *
+ * @param outputStream the {@link OutputStreamImpl} to send messages to
+ * @param opCode the specific {@link OpCode} for this {@link OutputOperatorSpec}.
+ * It could be {@link OpCode#SEND_TO}, or {@link OpCode#PARTITION_BY}
+ * @param opId the unique ID of this {@link OutputOperatorSpec} in the graph
+ */
+ OutputOperatorSpec(OutputStreamImpl<?, ?, M> outputStream, OperatorSpec.OpCode opCode, int opId) {
+ super(opCode, opId);
+ this.outputStream = outputStream;
+ }
+
+ /**
+ * The {@link OutputStreamImpl} that this operator is sending its output to.
+ * @return the {@link OutputStreamImpl} this operator was constructed with.
+ */
+ public OutputStreamImpl<?, ?, M> getOutputStream() {
+ return this.outputStream;
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/main/java/org/apache/samza/operators/spec/OutputStreamImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/OutputStreamImpl.java b/samza-core/src/main/java/org/apache/samza/operators/spec/OutputStreamImpl.java
new file mode 100644
index 0000000..5506378
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/OutputStreamImpl.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.spec;
+
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.system.StreamSpec;
+
+import java.util.function.Function;
+
+public class OutputStreamImpl<K, V, M> implements OutputStream<K, V, M> {
+
+ private final StreamSpec streamSpec;
+ private final Function<M, K> keyExtractor;
+ private final Function<M, V> msgExtractor;
+
+ public OutputStreamImpl(StreamSpec streamSpec,
+ Function<M, K> keyExtractor, Function<M, V> msgExtractor) {
+ this.streamSpec = streamSpec;
+ this.keyExtractor = keyExtractor;
+ this.msgExtractor = msgExtractor;
+ }
+
+ public StreamSpec getStreamSpec() {
+ return streamSpec;
+ }
+
+ public Function<M, K> getKeyExtractor() {
+ return keyExtractor;
+ }
+
+ public Function<M, V> getMsgExtractor() {
+ return msgExtractor;
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java
deleted file mode 100644
index 92b4170..0000000
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.spec;
-
-import org.apache.samza.operators.MessageStreamImpl;
-import org.apache.samza.operators.functions.PartialJoinFunction;
-import org.apache.samza.operators.util.OperatorJsonUtils;
-
-
-/**
- * Spec for the partial join operator that takes messages from one input stream, joins with buffered
- * messages from another stream, and produces join results to an output {@link MessageStreamImpl}.
- *
- * @param <K> the type of join key
- * @param <M> the type of input message
- * @param <JM> the type of message in the other join stream
- * @param <RM> the type of message in the join output stream
- */
-public class PartialJoinOperatorSpec<K, M, JM, RM> implements OperatorSpec<RM> {
-
- private final PartialJoinFunction<K, M, JM, RM> thisPartialJoinFn;
- private final PartialJoinFunction<K, JM, M, RM> otherPartialJoinFn;
- private final long ttlMs;
- private final MessageStreamImpl<RM> nextStream;
- private final int opId;
- private final String sourceLocation;
-
- /**
- * Default constructor for a {@link PartialJoinOperatorSpec}.
- *
- * @param thisPartialJoinFn partial join function that provides state and the join logic for input messages of
- * type {@code M} in this stream
- * @param otherPartialJoinFn partial join function that provides state for input messages of type {@code JM}
- * in the other stream
- * @param ttlMs the ttl in ms for retaining messages in each stream
- * @param nextStream the output {@link MessageStreamImpl} containing the messages produced from this operator
- * @param opId the unique ID for this operator
- */
- PartialJoinOperatorSpec(PartialJoinFunction<K, M, JM, RM> thisPartialJoinFn,
- PartialJoinFunction<K, JM, M, RM> otherPartialJoinFn, long ttlMs,
- MessageStreamImpl<RM> nextStream, int opId) {
- this.thisPartialJoinFn = thisPartialJoinFn;
- this.otherPartialJoinFn = otherPartialJoinFn;
- this.ttlMs = ttlMs;
- this.nextStream = nextStream;
- this.opId = opId;
- this.sourceLocation = OperatorJsonUtils.getSourceLocation();
- }
-
- @Override
- public MessageStreamImpl<RM> getNextStream() {
- return this.nextStream;
- }
-
- public PartialJoinFunction<K, M, JM, RM> getThisPartialJoinFn() {
- return this.thisPartialJoinFn;
- }
-
- public PartialJoinFunction<K, JM, M, RM> getOtherPartialJoinFn() {
- return this.otherPartialJoinFn;
- }
-
- public long getTtlMs() {
- return ttlMs;
- }
-
- @Override
- public OperatorSpec.OpCode getOpCode() {
- return OpCode.JOIN;
- }
-
- @Override
- public int getOpId() {
- return this.opId;
- }
-
- @Override
- public String getSourceLocation() {
- return sourceLocation;
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java
index afdd6b9..2b55d95 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java
@@ -18,29 +18,19 @@
*/
package org.apache.samza.operators.spec;
-import org.apache.samza.operators.MessageStreamImpl;
import org.apache.samza.operators.functions.SinkFunction;
-import org.apache.samza.operators.stream.OutputStreamInternal;
-import org.apache.samza.operators.util.OperatorJsonUtils;
-import org.apache.samza.system.OutgoingMessageEnvelope;
-import org.apache.samza.system.SystemStream;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskCoordinator;
/**
- * The spec for an operator that outputs a {@link MessageStreamImpl} to an external system.
+ * The spec for an operator that outputs a stream to an arbitrary external system.
+ * <p>
* This is a terminal operator and does not allow further operator chaining.
*
* @param <M> the type of input message
*/
-public class SinkOperatorSpec<M> implements OperatorSpec {
+public class SinkOperatorSpec<M> extends OperatorSpec<M, Void> {
private final SinkFunction<M> sinkFn;
- private OutputStreamInternal<?, ?, M> outputStream; // may be null
- private final OperatorSpec.OpCode opCode;
- private final int opId;
- private final String sourceLocation;
/**
* Constructs a {@link SinkOperatorSpec} with a user defined {@link SinkFunction}.
@@ -48,79 +38,14 @@ public class SinkOperatorSpec<M> implements OperatorSpec {
* @param sinkFn a user defined {@link SinkFunction} that will be called with the output message,
* the output {@link org.apache.samza.task.MessageCollector} and the
* {@link org.apache.samza.task.TaskCoordinator}.
- * @param opCode the specific {@link OpCode} for this {@link SinkOperatorSpec}.
- * It could be {@link OpCode#SINK}, {@link OpCode#SEND_TO}, or {@link OpCode#PARTITION_BY}.
* @param opId the unique ID of this {@link OperatorSpec} in the graph
*/
- SinkOperatorSpec(SinkFunction<M> sinkFn, OperatorSpec.OpCode opCode, int opId) {
+ SinkOperatorSpec(SinkFunction<M> sinkFn, int opId) {
+ super(OpCode.SINK, opId);
this.sinkFn = sinkFn;
- this.opCode = opCode;
- this.opId = opId;
- this.sourceLocation = OperatorJsonUtils.getSourceLocation();
- }
-
- /**
- * Constructs a {@link SinkOperatorSpec} to send messages to the provided {@code outStream}
- * @param outputStream the {@link OutputStreamInternal} to send messages to
- * @param opCode the specific {@link OpCode} for this {@link SinkOperatorSpec}.
- * It could be {@link OpCode#SINK}, {@link OpCode#SEND_TO}, or {@link OpCode#PARTITION_BY}
- * @param opId the unique ID of this {@link SinkOperatorSpec} in the graph
- */
- SinkOperatorSpec(OutputStreamInternal<?, ?, M> outputStream, OperatorSpec.OpCode opCode, int opId) {
- this(createSinkFn(outputStream), opCode, opId);
- this.outputStream = outputStream;
- }
-
- /**
- * This is a terminal operator and doesn't allow further operator chaining.
- * @return null
- */
- @Override
- public MessageStreamImpl<M> getNextStream() {
- return null;
- }
-
- /**
- * The {@link OutputStreamInternal} that this operator is sending its output to.
- * @return the {@link OutputStreamInternal} for this operator if any, else null.
- */
- public OutputStreamInternal<?, ?, M> getOutputStream() {
- return this.outputStream;
}
public SinkFunction<M> getSinkFn() {
return this.sinkFn;
}
-
- @Override
- public OperatorSpec.OpCode getOpCode() {
- return this.opCode;
- }
-
- @Override
- public int getOpId() {
- return this.opId;
- }
-
- @Override
- public String getSourceLocation() {
- return sourceLocation;
- }
-
- /**
- * Creates a {@link SinkFunction} to send messages to the provided {@code output}.
- * @param outputStream the {@link OutputStreamInternal} to send messages to
- * @param <M> the type of input message
- * @return a {@link SinkFunction} that sends messages to the provided {@code output}
- */
- private static <M> SinkFunction<M> createSinkFn(OutputStreamInternal<?, ?, M> outputStream) {
- return (M message, MessageCollector mc, TaskCoordinator tc) -> {
- // TODO: SAMZA-1148 - need to find a way to directly pass in the serde class names
- SystemStream systemStream = new SystemStream(outputStream.getStreamSpec().getSystemName(),
- outputStream.getStreamSpec().getPhysicalName());
- Object key = outputStream.getKeyExtractor().apply(message);
- Object msg = outputStream.getMsgExtractor().apply(message);
- mc.send(new OutgoingMessageEnvelope(systemStream, key, msg));
- };
- }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java
index c53efae..1f2f683 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java
@@ -18,63 +18,32 @@
*/
package org.apache.samza.operators.spec;
-import org.apache.samza.operators.MessageStreamImpl;
import org.apache.samza.operators.functions.FlatMapFunction;
-import org.apache.samza.operators.util.OperatorJsonUtils;
/**
- * The spec for a linear stream operator that outputs 0 or more messages for each input message.
+ * The spec for a simple stream operator that outputs 0 or more messages for each input message.
*
* @param <M> the type of input message
* @param <OM> the type of output message
*/
-public class StreamOperatorSpec<M, OM> implements OperatorSpec<OM> {
+public class StreamOperatorSpec<M, OM> extends OperatorSpec<M, OM> {
private final FlatMapFunction<M, OM> transformFn;
- private final MessageStreamImpl<OM> nextStream;
- private final OperatorSpec.OpCode opCode;
- private final int opId;
- private final String sourceLocation;
/**
- * Constructor for a {@link StreamOperatorSpec} that accepts an output {@link MessageStreamImpl}.
+ * Constructor for a {@link StreamOperatorSpec}.
*
* @param transformFn the transformation function
- * @param nextStream the output {@link MessageStreamImpl} containing the messages produced from this operator
* @param opCode the {@link OpCode} for this {@link StreamOperatorSpec}
- * @param opId the unique id for this {@link StreamOperatorSpec} in a {@link org.apache.samza.operators.StreamGraph}
+ * @param opId the unique ID for this {@link StreamOperatorSpec}
*/
- StreamOperatorSpec(FlatMapFunction<M, OM> transformFn, MessageStreamImpl<OM> nextStream,
- OperatorSpec.OpCode opCode, int opId) {
+ StreamOperatorSpec(FlatMapFunction<M, OM> transformFn, OperatorSpec.OpCode opCode, int opId) {
+ super(opCode, opId);
this.transformFn = transformFn;
- this.nextStream = nextStream;
- this.opCode = opCode;
- this.opId = opId;
- this.sourceLocation = OperatorJsonUtils.getSourceLocation();
- }
-
- @Override
- public MessageStreamImpl<OM> getNextStream() {
- return this.nextStream;
}
public FlatMapFunction<M, OM> getTransformFn() {
return this.transformFn;
}
-
- @Override
- public OperatorSpec.OpCode getOpCode() {
- return this.opCode;
- }
-
- @Override
- public int getOpId() {
- return this.opId;
- }
-
- @Override
- public String getSourceLocation() {
- return sourceLocation;
- }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
index 3c2be0a..0937499 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
@@ -19,13 +19,11 @@
package org.apache.samza.operators.spec;
-import org.apache.samza.operators.MessageStreamImpl;
import org.apache.samza.operators.triggers.AnyTrigger;
import org.apache.samza.operators.triggers.RepeatingTrigger;
import org.apache.samza.operators.triggers.TimeBasedTrigger;
import org.apache.samza.operators.triggers.Trigger;
import org.apache.samza.operators.util.MathUtils;
-import org.apache.samza.operators.util.OperatorJsonUtils;
import org.apache.samza.operators.windows.WindowPane;
import org.apache.samza.operators.windows.internal.WindowInternal;
import org.slf4j.Logger;
@@ -37,58 +35,32 @@ import java.util.stream.Collectors;
/**
- * Default window operator spec object
+ * The spec for an operator that groups messages into finite windows for processing
*
* @param <M> the type of input message to the window
* @param <WK> the type of key of the window
* @param <WV> the type of aggregated value in the window output {@link WindowPane}
*/
-public class WindowOperatorSpec<M, WK, WV> implements OperatorSpec<WindowPane<WK, WV>> {
+public class WindowOperatorSpec<M, WK, WV> extends OperatorSpec<M, WindowPane<WK, WV>> {
private static final Logger LOG = LoggerFactory.getLogger(WindowOperatorSpec.class);
private final WindowInternal<M, WK, WV> window;
- private final MessageStreamImpl<WindowPane<WK, WV>> nextStream;
- private final int opId;
- private final String sourceLocation;
/**
* Constructor for {@link WindowOperatorSpec}.
*
* @param window the window function
- * @param nextStream the output {@link MessageStreamImpl} containing the messages produced from this operator
* @param opId auto-generated unique ID of this operator
*/
- WindowOperatorSpec(WindowInternal<M, WK, WV> window, MessageStreamImpl<WindowPane<WK, WV>> nextStream, int opId) {
- this.nextStream = nextStream;
+ WindowOperatorSpec(WindowInternal<M, WK, WV> window, int opId) {
+ super(OpCode.WINDOW, opId);
this.window = window;
- this.opId = opId;
- this.sourceLocation = OperatorJsonUtils.getSourceLocation();
- }
-
- @Override
- public MessageStreamImpl<WindowPane<WK, WV>> getNextStream() {
- return this.nextStream;
}
public WindowInternal<M, WK, WV> getWindow() {
return window;
}
- @Override
- public OpCode getOpCode() {
- return OpCode.WINDOW;
- }
-
- @Override
- public int getOpId() {
- return this.opId;
- }
-
- @Override
- public String getSourceLocation() {
- return sourceLocation;
- }
-
/**
* Get the default triggering interval for this {@link WindowOperatorSpec}
*
http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/main/java/org/apache/samza/operators/stream/InputStreamInternal.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/stream/InputStreamInternal.java b/samza-core/src/main/java/org/apache/samza/operators/stream/InputStreamInternal.java
deleted file mode 100644
index e67b326..0000000
--- a/samza-core/src/main/java/org/apache/samza/operators/stream/InputStreamInternal.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.stream;
-
-import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.operators.MessageStream;
-import org.apache.samza.system.StreamSpec;
-
-import java.util.function.BiFunction;
-
-/**
- * Internal representation of an input stream.
- *
- * @param <M> the type of messages in the input stream
- */
-@InterfaceStability.Unstable
-public interface InputStreamInternal<K, V, M> extends MessageStream<M> {
-
- StreamSpec getStreamSpec();
-
- BiFunction<K, V, M> getMsgBuilder();
-
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/main/java/org/apache/samza/operators/stream/InputStreamInternalImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/stream/InputStreamInternalImpl.java b/samza-core/src/main/java/org/apache/samza/operators/stream/InputStreamInternalImpl.java
deleted file mode 100644
index c4337d0..0000000
--- a/samza-core/src/main/java/org/apache/samza/operators/stream/InputStreamInternalImpl.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.stream;
-
-import org.apache.samza.operators.MessageStreamImpl;
-import org.apache.samza.operators.StreamGraphImpl;
-import org.apache.samza.system.StreamSpec;
-
-import java.util.function.BiFunction;
-
-public class InputStreamInternalImpl<K, V, M> extends MessageStreamImpl<M> implements InputStreamInternal<K, V, M> {
-
- private final StreamSpec streamSpec;
- private final BiFunction<K, V, M> msgBuilder;
-
- public InputStreamInternalImpl(StreamGraphImpl graph, StreamSpec streamSpec, BiFunction<K, V, M> msgBuilder) {
- super(graph);
- this.streamSpec = streamSpec;
- this.msgBuilder = msgBuilder;
- }
-
- public StreamSpec getStreamSpec() {
- return this.streamSpec;
- }
-
- public BiFunction<K, V, M> getMsgBuilder() {
- return this.msgBuilder;
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/main/java/org/apache/samza/operators/stream/IntermediateMessageStreamImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/stream/IntermediateMessageStreamImpl.java b/samza-core/src/main/java/org/apache/samza/operators/stream/IntermediateMessageStreamImpl.java
new file mode 100644
index 0000000..f0bb1dc
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/stream/IntermediateMessageStreamImpl.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.stream;
+
+import org.apache.samza.operators.MessageStreamImpl;
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.StreamGraphImpl;
+import org.apache.samza.operators.spec.InputOperatorSpec;
+import org.apache.samza.operators.spec.OutputStreamImpl;
+import org.apache.samza.system.StreamSpec;
+
+/**
+ * An intermediate stream is both an input and an output stream (e.g. a repartitioned stream).
+ * <p>
+ * This implementation accepts a pair of {@link InputOperatorSpec} and {@link OutputStreamImpl} associated
+ * with the same logical {@code streamId}. It provides access to its {@link OutputStreamImpl} for
+ * {@link MessageStreamImpl#partitionBy} to send messages out to. It's also a {@link MessageStreamImpl} with
+ * {@link InputOperatorSpec} as its operator spec, so that further operations can be chained on the
+ * {@link InputOperatorSpec}.
+ *
+ * @param <K> the type of key in the outgoing/incoming message
+ * @param <V> the type of message in the outgoing/incoming message
+ * @param <M> the type of message in the output {@link MessageStreamImpl}
+ */
+public class IntermediateMessageStreamImpl<K, V, M> extends MessageStreamImpl<M> implements OutputStream<K, V, M> {
+
+ private final OutputStreamImpl<K, V, M> outputStream;
+
+ public IntermediateMessageStreamImpl(StreamGraphImpl graph, InputOperatorSpec<K, V, M> inputOperatorSpec,
+ OutputStreamImpl<K, V, M> outputStream) {
+ super(graph, inputOperatorSpec);
+ this.outputStream = outputStream;
+ }
+
+ public StreamSpec getStreamSpec() {
+ return this.outputStream.getStreamSpec();
+ }
+
+ public OutputStreamImpl<K, V, M> getOutputStream() {
+ return this.outputStream;
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/main/java/org/apache/samza/operators/stream/IntermediateStreamInternalImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/stream/IntermediateStreamInternalImpl.java b/samza-core/src/main/java/org/apache/samza/operators/stream/IntermediateStreamInternalImpl.java
deleted file mode 100644
index 8f45f7a..0000000
--- a/samza-core/src/main/java/org/apache/samza/operators/stream/IntermediateStreamInternalImpl.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.stream;
-
-import org.apache.samza.operators.MessageStreamImpl;
-import org.apache.samza.operators.StreamGraphImpl;
-import org.apache.samza.system.StreamSpec;
-
-import java.util.function.BiFunction;
-import java.util.function.Function;
-
-public class IntermediateStreamInternalImpl<K, V, M> extends MessageStreamImpl<M>
- implements InputStreamInternal<K, V, M>, OutputStreamInternal<K, V, M> {
-
- private final StreamSpec streamSpec;
- private final Function<M, K> keyExtractor;
- private final Function<M, V> msgExtractor;
- private final BiFunction<K, V, M> msgBuilder;
-
- public IntermediateStreamInternalImpl(StreamGraphImpl graph, StreamSpec streamSpec, Function<M, K> keyExtractor,
- Function<M, V> msgExtractor, BiFunction<K, V, M> msgBuilder) {
- super(graph);
- this.streamSpec = streamSpec;
- this.keyExtractor = keyExtractor;
- this.msgExtractor = msgExtractor;
- this.msgBuilder = msgBuilder;
- }
-
- public StreamSpec getStreamSpec() {
- return this.streamSpec;
- }
-
- public Function<M, K> getKeyExtractor() {
- return this.keyExtractor;
- }
-
- public Function<M, V> getMsgExtractor() {
- return this.msgExtractor;
- }
-
- @Override
- public BiFunction<K, V, M> getMsgBuilder() {
- return this.msgBuilder;
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/main/java/org/apache/samza/operators/stream/OutputStreamInternal.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/stream/OutputStreamInternal.java b/samza-core/src/main/java/org/apache/samza/operators/stream/OutputStreamInternal.java
deleted file mode 100644
index 48ce641..0000000
--- a/samza-core/src/main/java/org/apache/samza/operators/stream/OutputStreamInternal.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.operators.stream;
-
-import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.operators.OutputStream;
-import org.apache.samza.system.StreamSpec;
-
-import java.util.function.Function;
-
-
-/**
- * Internal representation of an output stream.
- *
- * @param <M> the type of messages in the output stream
- */
-@InterfaceStability.Unstable
-public interface OutputStreamInternal<K, V, M> extends OutputStream<K, V, M> {
-
- StreamSpec getStreamSpec();
-
- Function<M, K> getKeyExtractor();
-
- Function<M, V> getMsgExtractor();
-
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/main/java/org/apache/samza/operators/stream/OutputStreamInternalImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/stream/OutputStreamInternalImpl.java b/samza-core/src/main/java/org/apache/samza/operators/stream/OutputStreamInternalImpl.java
deleted file mode 100644
index a2d0cca..0000000
--- a/samza-core/src/main/java/org/apache/samza/operators/stream/OutputStreamInternalImpl.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.stream;
-
-import org.apache.samza.operators.MessageStreamImpl;
-import org.apache.samza.operators.StreamGraphImpl;
-import org.apache.samza.system.StreamSpec;
-
-import java.util.function.Function;
-
-public class OutputStreamInternalImpl<K, V, M> extends MessageStreamImpl<M> implements OutputStreamInternal<K, V, M> {
-
- private final StreamSpec streamSpec;
- private final Function<M, K> keyExtractor;
- private final Function<M, V> msgExtractor;
-
- public OutputStreamInternalImpl(StreamGraphImpl graph, StreamSpec streamSpec,
- Function<M, K> keyExtractor, Function<M, V> msgExtractor) {
- super(graph);
- this.streamSpec = streamSpec;
- this.keyExtractor = keyExtractor;
- this.msgExtractor = msgExtractor;
- }
-
- public StreamSpec getStreamSpec() {
- return this.streamSpec;
- }
-
- public Function<M, K> getKeyExtractor() {
- return this.keyExtractor;
- }
-
- public Function<M, V> getMsgExtractor() {
- return this.msgExtractor;
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/main/java/org/apache/samza/operators/util/OperatorJsonUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/util/OperatorJsonUtils.java b/samza-core/src/main/java/org/apache/samza/operators/util/OperatorJsonUtils.java
deleted file mode 100644
index b971607..0000000
--- a/samza-core/src/main/java/org/apache/samza/operators/util/OperatorJsonUtils.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.operators.util;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.stream.Collectors;
-import org.apache.samza.operators.spec.OperatorSpec;
-import org.apache.samza.operators.spec.PartialJoinOperatorSpec;
-import org.apache.samza.operators.spec.SinkOperatorSpec;
-import org.apache.samza.operators.stream.OutputStreamInternal;
-
-public class OperatorJsonUtils {
- private static final String OP_CODE = "opCode";
- private static final String OP_ID = "opId";
- private static final String SOURCE_LOCATION = "sourceLocation";
- private static final String NEXT_OPERATOR_IDS = "nextOperatorIds";
- private static final String OUTPUT_STREAM_ID = "outputStreamId";
- private static final String TTL_MS = "ttlMs";
-
- /**
- * Format the operator properties into a map
- * @param spec a {@link OperatorSpec} instance
- * @return map of the operator properties
- */
- public static Map<String, Object> operatorToMap(OperatorSpec spec) {
- Map<String, Object> map = new HashMap<>();
- map.put(OP_CODE, spec.getOpCode().name());
- map.put(OP_ID, spec.getOpId());
- map.put(SOURCE_LOCATION, spec.getSourceLocation());
-
- if (spec.getNextStream() != null) {
- Collection<OperatorSpec> nextOperators = spec.getNextStream().getRegisteredOperatorSpecs();
- map.put(NEXT_OPERATOR_IDS, nextOperators.stream().map(OperatorSpec::getOpId).collect(Collectors.toSet()));
- } else {
- map.put(NEXT_OPERATOR_IDS, Collections.emptySet());
- }
-
- if (spec instanceof SinkOperatorSpec) {
- OutputStreamInternal outputStream = ((SinkOperatorSpec) spec).getOutputStream();
- if (outputStream != null) {
- map.put(OUTPUT_STREAM_ID, outputStream.getStreamSpec().getId());
- }
- }
-
- if (spec instanceof PartialJoinOperatorSpec) {
- map.put(TTL_MS, ((PartialJoinOperatorSpec) spec).getTtlMs());
- }
-
- return map;
- }
-
- /**
- * Return the location of source code that creates the operator.
- * This function is invoked in the constructor of each operator.
- * @return formatted source location including file and line number
- */
- public static String getSourceLocation() {
- // The stack trace looks like:
- // [0] Thread.getStackTrace()
- // [1] OperatorJsonUtils.getSourceLocation()
- // [2] SomeOperator.<init>()
- // [3] OperatorSpecs.createSomeOperator()
- // [4] MessageStreamImpl.someOperator()
- // [5] User code that calls [2]
- // we are only interested in [5] here
- StackTraceElement location = Thread.currentThread().getStackTrace()[5];
- return String.format("%s:%s", location.getFileName(), location.getLineNumber());
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java b/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
index 50ae775..a77ef3b 100644
--- a/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
+++ b/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
@@ -18,22 +18,19 @@
*/
package org.apache.samza.task;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.samza.application.StreamApplication;
import org.apache.samza.config.Config;
import org.apache.samza.operators.ContextManager;
import org.apache.samza.operators.StreamGraphImpl;
+import org.apache.samza.operators.impl.InputOperatorImpl;
import org.apache.samza.operators.impl.OperatorImplGraph;
-import org.apache.samza.operators.impl.RootOperatorImpl;
-import org.apache.samza.operators.stream.InputStreamInternal;
import org.apache.samza.runtime.ApplicationRunner;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.util.Clock;
import org.apache.samza.util.SystemClock;
-import java.util.HashMap;
-import java.util.Map;
-
/**
* A {@link StreamTask} implementation that brings all the operator API implementation components together and
@@ -47,7 +44,6 @@ public final class StreamOperatorTask implements StreamTask, InitableTask, Windo
private OperatorImplGraph operatorImplGraph;
private ContextManager contextManager;
- private Map<SystemStream, InputStreamInternal> inputSystemStreamToInputStream;
/**
* Constructs an adaptor task to run the user-implemented {@link StreamApplication}.
@@ -70,11 +66,8 @@ public final class StreamOperatorTask implements StreamTask, InitableTask, Windo
* <p>
* Implementation: Initializes the user-implemented {@link StreamApplication}. The {@link StreamApplication} sets
* the input and output streams and the task-wide context manager using the {@link StreamGraphImpl} APIs,
- * and the logical transforms using the {@link org.apache.samza.operators.MessageStream} APIs.
- *<p>
- * It then uses the {@link StreamGraphImpl} to create the {@link OperatorImplGraph} corresponding to the logical
- * DAG. It also saves the mapping between input {@link SystemStream}s and their corresponding
- * {@link InputStreamInternal}s for delivering incoming messages to the appropriate sub-DAG.
+ * and the logical transforms using the {@link org.apache.samza.operators.MessageStream} APIs. It then uses
+ * the {@link StreamGraphImpl} to create the {@link OperatorImplGraph} corresponding to the logical DAG.
*
* @param config allows accessing of fields in the configuration files that this StreamTask is specified in
* @param context allows initializing and accessing contextual data of this StreamTask
@@ -93,20 +86,11 @@ public final class StreamOperatorTask implements StreamTask, InitableTask, Windo
}
// create the operator impl DAG corresponding to the logical operator spec DAG
- OperatorImplGraph operatorImplGraph = new OperatorImplGraph(clock);
- operatorImplGraph.init(streamGraph, config, context);
- this.operatorImplGraph = operatorImplGraph;
-
- // TODO: SAMZA-1118 - Remove mapping after SystemConsumer starts returning logical streamId with incoming messages
- inputSystemStreamToInputStream = new HashMap<>();
- streamGraph.getInputStreams().forEach((streamSpec, inputStream)-> {
- SystemStream systemStream = new SystemStream(streamSpec.getSystemName(), streamSpec.getPhysicalName());
- inputSystemStreamToInputStream.put(systemStream, inputStream);
- });
+ this.operatorImplGraph = new OperatorImplGraph(streamGraph, config, context, clock);
}
/**
- * Passes the incoming message envelopes along to the {@link org.apache.samza.operators.impl.RootOperatorImpl} node
+ * Passes the incoming message envelopes along to the {@link InputOperatorImpl} node
* for the input {@link SystemStream}.
* <p>
* From then on, each {@link org.apache.samza.operators.impl.OperatorImpl} propagates its transformed output to
@@ -119,20 +103,16 @@ public final class StreamOperatorTask implements StreamTask, InitableTask, Windo
@Override
public final void process(IncomingMessageEnvelope ime, MessageCollector collector, TaskCoordinator coordinator) {
SystemStream systemStream = ime.getSystemStreamPartition().getSystemStream();
- InputStreamInternal inputStream = inputSystemStreamToInputStream.get(systemStream);
- RootOperatorImpl rootOperatorImpl = operatorImplGraph.getRootOperator(systemStream);
- if (rootOperatorImpl != null) {
- // TODO: SAMZA-1148 - Cast to appropriate input (key, msg) types based on the serde
- // before applying the msgBuilder.
- Object message = inputStream.getMsgBuilder().apply(ime.getKey(), ime.getMessage());
- rootOperatorImpl.onMessage(message, collector, coordinator);
+ InputOperatorImpl inputOpImpl = operatorImplGraph.getInputOperator(systemStream);
+ if (inputOpImpl != null) {
+ inputOpImpl.onMessage(Pair.of(ime.getKey(), ime.getMessage()), collector, coordinator);
}
}
@Override
public final void window(MessageCollector collector, TaskCoordinator coordinator) {
- operatorImplGraph.getAllRootOperators()
- .forEach(rootOperator -> rootOperator.onTimer(collector, coordinator));
+ operatorImplGraph.getAllInputOperators()
+ .forEach(inputOperator -> inputOperator.onTimer(collector, coordinator));
}
@Override
http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
index daa223a..2c8f682 100644
--- a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
@@ -224,12 +224,12 @@ public class TestExecutionPlanner {
when(runner.getStreamSpec("output2")).thenReturn(output2);
// intermediate streams used in tests
- when(runner.getStreamSpec("test-app-1-partition_by-0"))
- .thenReturn(new StreamSpec("test-app-1-partition_by-0", "test-app-1-partition_by-0", "default-system"));
when(runner.getStreamSpec("test-app-1-partition_by-1"))
.thenReturn(new StreamSpec("test-app-1-partition_by-1", "test-app-1-partition_by-1", "default-system"));
- when(runner.getStreamSpec("test-app-1-partition_by-4"))
- .thenReturn(new StreamSpec("test-app-1-partition_by-4", "test-app-1-partition_by-4", "default-system"));
+ when(runner.getStreamSpec("test-app-1-partition_by-3"))
+ .thenReturn(new StreamSpec("test-app-1-partition_by-3", "test-app-1-partition_by-3", "default-system"));
+ when(runner.getStreamSpec("test-app-1-partition_by-8"))
+ .thenReturn(new StreamSpec("test-app-1-partition_by-8", "test-app-1-partition_by-8", "default-system"));
}
@Test
@@ -272,7 +272,7 @@ public class TestExecutionPlanner {
// the partitions should be the same as input1
jobGraph.getIntermediateStreams().forEach(edge -> {
- assertTrue(edge.getPartitionCount() == 64);
+ assertEquals(64, edge.getPartitionCount());
});
}
http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java b/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
index e53cd42..4bda86b 100644
--- a/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
@@ -39,7 +39,7 @@ import org.codehaus.jackson.map.ObjectMapper;
import org.junit.Test;
import static org.apache.samza.execution.TestExecutionPlanner.createSystemAdmin;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -50,13 +50,16 @@ public class TestJobGraphJsonGenerator {
public void test() throws Exception {
/**
- * the graph looks like the following. number of partitions in parentheses. quotes indicate expected value.
+ * the graph looks like the following.
+ * number in parentheses () indicates number of stream partitions.
+ * number in parentheses in quotes ("") indicates expected partition count.
+ * number in square brackets [] indicates operator ID.
*
- * input1 (64) -> map -> join -> output1 (8)
- * |
- * input2 (16) -> partitionBy ("64") -> filter -|
- * |
- * input3 (32) -> filter -> partitionBy ("64") -> map -> join -> output2 (16)
+ * input3 (32) -> filter [7] -> partitionBy [8] ("64") -> map [10] -> join [14] -> sendTo(output2) [15] (16)
+ * |
+ * input2 (16) -> partitionBy [3] ("64") -> filter [5] -| -> sink [13]
+ * |
+ * input1 (64) -> map [1] -> join [11] -> sendTo(output1) [12] (8)
*
*/
@@ -80,12 +83,10 @@ public class TestJobGraphJsonGenerator {
when(runner.getStreamSpec("output2")).thenReturn(output2);
// intermediate streams used in tests
- when(runner.getStreamSpec("test-app-1-partition_by-0"))
- .thenReturn(new StreamSpec("test-app-1-partition_by-0", "test-app-1-partition_by-0", "default-system"));
- when(runner.getStreamSpec("test-app-1-partition_by-1"))
- .thenReturn(new StreamSpec("test-app-1-partition_by-1", "test-app-1-partition_by-1", "default-system"));
- when(runner.getStreamSpec("test-app-1-partition_by-4"))
- .thenReturn(new StreamSpec("test-app-1-partition_by-4", "test-app-1-partition_by-4", "default-system"));
+ when(runner.getStreamSpec("test-app-1-partition_by-3"))
+ .thenReturn(new StreamSpec("test-app-1-partition_by-3", "test-app-1-partition_by-3", "default-system"));
+ when(runner.getStreamSpec("test-app-1-partition_by-8"))
+ .thenReturn(new StreamSpec("test-app-1-partition_by-8", "test-app-1-partition_by-8", "default-system"));
// set up external partition count
Map<String, Integer> system1Map = new HashMap<>();
@@ -124,10 +125,10 @@ public class TestJobGraphJsonGenerator {
// deserialize
ObjectMapper mapper = new ObjectMapper();
JobGraphJsonGenerator.JobGraphJson nodes = mapper.readValue(json, JobGraphJsonGenerator.JobGraphJson.class);
- assertTrue(nodes.jobs.get(0).operatorGraph.inputStreams.size() == 5);
- assertTrue(nodes.jobs.get(0).operatorGraph.operators.size() == 13);
- assertTrue(nodes.sourceStreams.size() == 3);
- assertTrue(nodes.sinkStreams.size() == 2);
- assertTrue(nodes.intermediateStreams.size() == 2);
+ assertEquals(5, nodes.jobs.get(0).operatorGraph.inputStreams.size());
+ assertEquals(11, nodes.jobs.get(0).operatorGraph.operators.size());
+ assertEquals(3, nodes.sourceStreams.size());
+ assertEquals(2, nodes.sinkStreams.size());
+ assertEquals(2, nodes.intermediateStreams.size());
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java b/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
index 39745bf..0c41fb8 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
@@ -74,18 +74,18 @@ public class TestJoinOperator {
public void testJoinFnInitAndClose() throws Exception {
TestJoinFunction joinFn = new TestJoinFunction();
StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), new TestJoinStreamApplication(joinFn));
- assertEquals(joinFn.getNumInitCalls(), 1);
+ assertEquals(1, joinFn.getNumInitCalls());
MessageCollector messageCollector = mock(MessageCollector.class);
// push messages to first stream
numbers.forEach(n -> sot.process(new FirstStreamIME(n, n), messageCollector, taskCoordinator));
// close should not be called till now
- assertEquals(joinFn.getNumCloseCalls(), 0);
+ assertEquals(0, joinFn.getNumCloseCalls());
sot.close();
// close should be called from sot.close()
- assertEquals(joinFn.getNumCloseCalls(), 1);
+ assertEquals(1, joinFn.getNumCloseCalls());
}
@Test