You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2017/02/16 18:40:52 UTC
[09/14] samza git commit: SAMZA-1073: top-level fluent API
http://git-wip-us.apache.org/repos/asf/samza/blob/c249443b/samza-operator/src/test/java/org/apache/samza/example/TestJoinExample.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/example/TestJoinExample.java b/samza-operator/src/test/java/org/apache/samza/example/TestJoinExample.java
new file mode 100644
index 0000000..fe6e7e7
--- /dev/null
+++ b/samza-operator/src/test/java/org/apache/samza/example/TestJoinExample.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.example;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.StreamSpec;
+import org.apache.samza.operators.data.InputMessageEnvelope;
+import org.apache.samza.operators.data.JsonIncomingSystemMessageEnvelope;
+import org.apache.samza.operators.data.Offset;
+import org.apache.samza.operators.functions.JoinFunction;
+import org.apache.samza.serializers.JsonSerde;
+import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+
+
+/**
+ * Example implementation of a unique-key-based stream-stream join task
+ *
+ */
+public class TestJoinExample extends TestExampleBase {
+
+ TestJoinExample(Set<SystemStreamPartition> inputs) {
+ super(inputs);
+ }
+
+ class MessageType {
+ String joinKey;
+ List<String> joinFields = new ArrayList<>();
+ }
+
+ class JsonMessageEnvelope extends JsonIncomingSystemMessageEnvelope<MessageType> {
+ JsonMessageEnvelope(String key, MessageType data, Offset offset, SystemStreamPartition partition) {
+ super(key, data, offset, partition);
+ }
+ }
+
+ MessageStream<JsonMessageEnvelope> joinOutput = null;
+
+ @Override
+ public void init(StreamGraph graph, Config config) {
+
+ for (SystemStream input : inputs.keySet()) {
+ MessageStream<JsonMessageEnvelope> newSource = graph.<Object, Object, InputMessageEnvelope>createInStream(
+ new StreamSpec() {
+ @Override public SystemStream getSystemStream() {
+ return input;
+ }
+
+ @Override public Properties getProperties() {
+ return null;
+ }
+ }, null, null).map(this::getInputMessage);
+ if (joinOutput == null) {
+ joinOutput = newSource;
+ } else {
+ joinOutput = joinOutput.join(newSource, new MyJoinFunction());
+ }
+ }
+
+ joinOutput.sendTo(graph.createOutStream(new StreamSpec() {
+ @Override public SystemStream getSystemStream() {
+ return null;
+ }
+
+ @Override public Properties getProperties() {
+ return null;
+ }
+ }, new StringSerde("UTF-8"), new JsonSerde<>()));
+
+ }
+
+ private JsonMessageEnvelope getInputMessage(InputMessageEnvelope ism) {
+ return new JsonMessageEnvelope(
+ ((MessageType) ism.getMessage()).joinKey,
+ (MessageType) ism.getMessage(),
+ ism.getOffset(),
+ ism.getSystemStreamPartition());
+ }
+
+ class MyJoinFunction implements JoinFunction<String, JsonMessageEnvelope, JsonMessageEnvelope, JsonMessageEnvelope> {
+ JsonMessageEnvelope myJoinResult(JsonMessageEnvelope m1, JsonMessageEnvelope m2) {
+ MessageType newJoinMsg = new MessageType();
+ newJoinMsg.joinKey = m1.getKey();
+ newJoinMsg.joinFields.addAll(m1.getMessage().joinFields);
+ newJoinMsg.joinFields.addAll(m2.getMessage().joinFields);
+ return new JsonMessageEnvelope(m1.getMessage().joinKey, newJoinMsg, null, null);
+ }
+
+ @Override
+ public JsonMessageEnvelope apply(JsonMessageEnvelope message, JsonMessageEnvelope otherMessage) {
+ return this.myJoinResult(message, otherMessage);
+ }
+
+ @Override
+ public String getFirstKey(JsonMessageEnvelope message) {
+ return message.getKey();
+ }
+
+ @Override
+ public String getSecondKey(JsonMessageEnvelope message) {
+ return message.getKey();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/c249443b/samza-operator/src/test/java/org/apache/samza/example/TestWindowExample.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/example/TestWindowExample.java b/samza-operator/src/test/java/org/apache/samza/example/TestWindowExample.java
new file mode 100644
index 0000000..e08ca20
--- /dev/null
+++ b/samza-operator/src/test/java/org/apache/samza/example/TestWindowExample.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.example;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.StreamSpec;
+import org.apache.samza.operators.data.InputMessageEnvelope;
+import org.apache.samza.operators.data.JsonIncomingSystemMessageEnvelope;
+import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.operators.data.Offset;
+import org.apache.samza.operators.windows.Windows;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
+
+import java.time.Duration;
+import java.util.function.BiFunction;
+import java.util.Properties;
+import java.util.Set;
+
+
+/**
+ * Example implementation of a simple user-defined task with window operators
+ *
+ */
+public class TestWindowExample extends TestExampleBase {
+ class MessageType {
+ String field1;
+ String field2;
+ }
+
+ TestWindowExample(Set<SystemStreamPartition> inputs) {
+ super(inputs);
+ }
+
+ class JsonMessageEnvelope extends JsonIncomingSystemMessageEnvelope<MessageType> {
+
+ JsonMessageEnvelope(String key, MessageType data, Offset offset, SystemStreamPartition partition) {
+ super(key, data, offset, partition);
+ }
+ }
+
+ @Override
+ public void init(StreamGraph graph, Config config) {
+ BiFunction<JsonMessageEnvelope, Integer, Integer> maxAggregator = (m, c) -> c + 1;
+ inputs.keySet().forEach(source -> graph.<Object, Object, InputMessageEnvelope>createInStream(new StreamSpec() {
+ @Override public SystemStream getSystemStream() {
+ return source;
+ }
+
+ @Override public Properties getProperties() {
+ return null;
+ }
+ }, null, null).
+ map(m1 -> new JsonMessageEnvelope(this.myMessageKeyFunction(m1), (MessageType) m1.getMessage(), m1.getOffset(),
+ m1.getSystemStreamPartition())).window(Windows.tumblingWindow(Duration.ofMillis(200), maxAggregator)));
+
+ }
+
+ String myMessageKeyFunction(MessageEnvelope<Object, Object> m) {
+ return m.getKey().toString();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/c249443b/samza-operator/src/test/java/org/apache/samza/operators/BroadcastTask.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/BroadcastTask.java b/samza-operator/src/test/java/org/apache/samza/operators/BroadcastTask.java
deleted file mode 100644
index 663d98c..0000000
--- a/samza-operator/src/test/java/org/apache/samza/operators/BroadcastTask.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.operators;
-
-
-import org.apache.samza.operators.data.IncomingSystemMessageEnvelope;
-import org.apache.samza.operators.data.JsonIncomingSystemMessageEnvelope;
-import org.apache.samza.operators.data.Offset;
-import org.apache.samza.operators.triggers.Triggers;
-import org.apache.samza.operators.windows.Windows;
-import org.apache.samza.system.SystemStreamPartition;
-
-import java.time.Duration;
-import java.util.Map;
-import java.util.function.BiFunction;
-
-
-/**
- * Example implementation of split stream tasks
- *
- */
-public class BroadcastTask implements StreamOperatorTask {
- class MessageType {
- String field1;
- String field2;
- String field3;
- String field4;
- String parKey;
- private long timestamp;
-
- public long getTimestamp() {
- return this.timestamp;
- }
- }
-
- class JsonMessageEnvelope extends JsonIncomingSystemMessageEnvelope<MessageType> {
- JsonMessageEnvelope(String key, MessageType data, Offset offset, SystemStreamPartition partition) {
- super(key, data, offset, partition);
- }
- }
-
- @Override
- public void transform(Map<SystemStreamPartition, MessageStream<IncomingSystemMessageEnvelope>> messageStreams) {
- BiFunction<JsonMessageEnvelope, Integer, Integer> sumAggregator = (m, c) -> c + 1;
- messageStreams.values().forEach(entry -> {
- MessageStream<JsonMessageEnvelope> inputStream = entry.map(this::getInputMessage);
-
- inputStream.filter(this::myFilter1)
- .window(Windows.tumblingWindow(Duration.ofMillis(100), sumAggregator)
- .setLateTrigger(Triggers.any(Triggers.count(30000), Triggers.timeSinceFirstMessage(Duration.ofMillis(10)))));
-
- inputStream.filter(this::myFilter1)
- .window(Windows.tumblingWindow(Duration.ofMillis(100), sumAggregator)
- .setLateTrigger(Triggers.any(Triggers.count(30000), Triggers.timeSinceFirstMessage(Duration.ofMillis(10)))));
-
- inputStream.filter(this::myFilter1)
- .window(Windows.tumblingWindow(Duration.ofMillis(100), sumAggregator)
- .setLateTrigger(Triggers.any(Triggers.count(30000), Triggers.timeSinceFirstMessage(Duration.ofMillis(10)))));
- });
- }
-
- JsonMessageEnvelope getInputMessage(IncomingSystemMessageEnvelope m1) {
- return (JsonMessageEnvelope) m1.getMessage();
- }
-
- boolean myFilter1(JsonMessageEnvelope m1) {
- // Do user defined processing here
- return m1.getMessage().parKey.equals("key1");
- }
-
- boolean myFilter2(JsonMessageEnvelope m1) {
- // Do user defined processing here
- return m1.getMessage().parKey.equals("key2");
- }
-
- boolean myFilter3(JsonMessageEnvelope m1) {
- return m1.getMessage().parKey.equals("key3");
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/c249443b/samza-operator/src/test/java/org/apache/samza/operators/JoinTask.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/JoinTask.java b/samza-operator/src/test/java/org/apache/samza/operators/JoinTask.java
deleted file mode 100644
index 1b10609..0000000
--- a/samza-operator/src/test/java/org/apache/samza/operators/JoinTask.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.operators;
-
-import org.apache.samza.operators.data.IncomingSystemMessageEnvelope;
-import org.apache.samza.operators.data.Offset;
-import org.apache.samza.operators.data.JsonIncomingSystemMessageEnvelope;
-import org.apache.samza.system.SystemStreamPartition;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-
-/**
- * Example implementation of unique key-based stream-stream join tasks
- *
- */
-public class JoinTask implements StreamOperatorTask {
- class MessageType {
- String joinKey;
- List<String> joinFields = new ArrayList<>();
- }
-
- class JsonMessageEnvelope extends JsonIncomingSystemMessageEnvelope<MessageType> {
- JsonMessageEnvelope(String key, MessageType data, Offset offset, SystemStreamPartition partition) {
- super(key, data, offset, partition);
- }
- }
-
- MessageStream<JsonMessageEnvelope> joinOutput = null;
-
- @Override
- public void transform(Map<SystemStreamPartition, MessageStream<IncomingSystemMessageEnvelope>> messageStreams) {
- messageStreams.values().forEach(messageStream -> {
- MessageStream<JsonMessageEnvelope> newSource = messageStream.map(this::getInputMessage);
- if (joinOutput == null) {
- joinOutput = newSource;
- } else {
- joinOutput = joinOutput.join(newSource, (m1, m2) -> this.myJoinResult(m1, m2));
- }
- });
- }
-
- private JsonMessageEnvelope getInputMessage(IncomingSystemMessageEnvelope ism) {
- return new JsonMessageEnvelope(
- ((MessageType) ism.getMessage()).joinKey,
- (MessageType) ism.getMessage(),
- ism.getOffset(),
- ism.getSystemStreamPartition());
- }
-
- JsonMessageEnvelope myJoinResult(JsonMessageEnvelope m1, JsonMessageEnvelope m2) {
- MessageType newJoinMsg = new MessageType();
- newJoinMsg.joinKey = m1.getKey();
- newJoinMsg.joinFields.addAll(m1.getMessage().joinFields);
- newJoinMsg.joinFields.addAll(m2.getMessage().joinFields);
- return new JsonMessageEnvelope(m1.getMessage().joinKey, newJoinMsg, null, null);
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/c249443b/samza-operator/src/test/java/org/apache/samza/operators/TestFluentStreamAdaptorTask.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/TestFluentStreamAdaptorTask.java b/samza-operator/src/test/java/org/apache/samza/operators/TestFluentStreamAdaptorTask.java
deleted file mode 100644
index 61bb32a..0000000
--- a/samza-operator/src/test/java/org/apache/samza/operators/TestFluentStreamAdaptorTask.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators;
-
-import org.apache.samza.Partition;
-import org.apache.samza.config.Config;
-import org.apache.samza.operators.impl.OperatorImpl;
-import org.apache.samza.system.SystemStreamPartition;
-import org.apache.samza.task.TaskContext;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.Mockito;
-
-import java.lang.reflect.Field;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-
-public class TestFluentStreamAdaptorTask {
- Field userTaskField = null;
- Field operatorChainsField = null;
-
- @Before
- public void prep() throws NoSuchFieldException {
- userTaskField = StreamOperatorAdaptorTask.class.getDeclaredField("userTask");
- operatorChainsField = StreamOperatorAdaptorTask.class.getDeclaredField("operatorChains");
- userTaskField.setAccessible(true);
- operatorChainsField.setAccessible(true);
- }
-
- @Test
- public void testConstructor() throws IllegalAccessException {
- StreamOperatorTask userTask = mock(StreamOperatorTask.class);
- StreamOperatorAdaptorTask adaptorTask = new StreamOperatorAdaptorTask(userTask);
- StreamOperatorTask taskMemberVar = (StreamOperatorTask) userTaskField.get(adaptorTask);
- Map<SystemStreamPartition, OperatorImpl> chainsMap = (Map<SystemStreamPartition, OperatorImpl>) operatorChainsField.get(adaptorTask);
- assertEquals(taskMemberVar, userTask);
- assertTrue(chainsMap.isEmpty());
- }
-
- @Test
- public void testInit() throws Exception {
- StreamOperatorTask userTask = mock(StreamOperatorTask.class);
- StreamOperatorAdaptorTask adaptorTask = new StreamOperatorAdaptorTask(userTask);
- Config mockConfig = mock(Config.class);
- TaskContext mockContext = mock(TaskContext.class);
- Set<SystemStreamPartition> testInputs = new HashSet() { {
- this.add(new SystemStreamPartition("test-sys", "test-strm", new Partition(0)));
- this.add(new SystemStreamPartition("test-sys", "test-strm", new Partition(1)));
- } };
- when(mockContext.getSystemStreamPartitions()).thenReturn(testInputs);
- adaptorTask.init(mockConfig, mockContext);
- verify(userTask, times(1)).transform(Mockito.anyMap());
- Map<SystemStreamPartition, OperatorImpl> chainsMap = (Map<SystemStreamPartition, OperatorImpl>) operatorChainsField.get(adaptorTask);
- assertTrue(chainsMap.size() == 2);
- assertTrue(chainsMap.containsKey(testInputs.toArray()[0]));
- assertTrue(chainsMap.containsKey(testInputs.toArray()[1]));
- }
-
- // TODO: window and process methods to be added after implementation of ChainedOperators.create()
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/c249443b/samza-operator/src/test/java/org/apache/samza/operators/TestFluentStreamTasks.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/TestFluentStreamTasks.java b/samza-operator/src/test/java/org/apache/samza/operators/TestFluentStreamTasks.java
deleted file mode 100644
index d804bf8..0000000
--- a/samza-operator/src/test/java/org/apache/samza/operators/TestFluentStreamTasks.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators;
-
-import org.apache.samza.Partition;
-import org.apache.samza.config.Config;
-
-import org.apache.samza.operators.impl.OperatorImpl;
-import org.apache.samza.system.SystemStreamPartition;
-import org.apache.samza.task.TaskContext;
-import org.junit.Test;
-
-import java.lang.reflect.Field;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-
-/**
- * Unit test for {@link StreamOperatorTask}
- */
-public class TestFluentStreamTasks {
-
- private final WindowTask userTask = new WindowTask();
-
- private final BroadcastTask splitTask = new BroadcastTask();
-
- private final JoinTask joinTask = new JoinTask();
-
- private final Set<SystemStreamPartition> inputPartitions = new HashSet<SystemStreamPartition>() { {
- for (int i = 0; i < 4; i++) {
- this.add(new SystemStreamPartition("my-system", "my-topic1", new Partition(i)));
- }
- } };
-
- @Test
- public void testUserTask() throws Exception {
- Config mockConfig = mock(Config.class);
- TaskContext mockContext = mock(TaskContext.class);
- when(mockContext.getSystemStreamPartitions()).thenReturn(this.inputPartitions);
- StreamOperatorAdaptorTask adaptorTask = new StreamOperatorAdaptorTask(this.userTask);
- Field pipelineMapFld = StreamOperatorAdaptorTask.class.getDeclaredField("operatorChains");
- pipelineMapFld.setAccessible(true);
- Map<SystemStreamPartition, OperatorImpl> pipelineMap =
- (Map<SystemStreamPartition, OperatorImpl>) pipelineMapFld.get(adaptorTask);
-
- adaptorTask.init(mockConfig, mockContext);
- assertEquals(pipelineMap.size(), 4);
- this.inputPartitions.forEach(partition -> {
- assertNotNull(pipelineMap.get(partition));
- });
- }
-
- @Test
- public void testSplitTask() throws Exception {
- Config mockConfig = mock(Config.class);
- TaskContext mockContext = mock(TaskContext.class);
- when(mockContext.getSystemStreamPartitions()).thenReturn(this.inputPartitions);
- StreamOperatorAdaptorTask adaptorTask = new StreamOperatorAdaptorTask(this.splitTask);
- Field pipelineMapFld = StreamOperatorAdaptorTask.class.getDeclaredField("operatorChains");
- pipelineMapFld.setAccessible(true);
- Map<SystemStreamPartition, OperatorImpl> pipelineMap =
- (Map<SystemStreamPartition, OperatorImpl>) pipelineMapFld.get(adaptorTask);
-
- adaptorTask.init(mockConfig, mockContext);
- assertEquals(pipelineMap.size(), 4);
- this.inputPartitions.forEach(partition -> {
- assertNotNull(pipelineMap.get(partition));
- });
- }
-
- @Test
- public void testJoinTask() throws Exception {
- Config mockConfig = mock(Config.class);
- TaskContext mockContext = mock(TaskContext.class);
- when(mockContext.getSystemStreamPartitions()).thenReturn(this.inputPartitions);
- StreamOperatorAdaptorTask adaptorTask = new StreamOperatorAdaptorTask(this.joinTask);
- Field pipelineMapFld = StreamOperatorAdaptorTask.class.getDeclaredField("operatorChains");
- pipelineMapFld.setAccessible(true);
- Map<SystemStreamPartition, OperatorImpl> pipelineMap =
- (Map<SystemStreamPartition, OperatorImpl>) pipelineMapFld.get(adaptorTask);
-
- adaptorTask.init(mockConfig, mockContext);
- assertEquals(pipelineMap.size(), 4);
- this.inputPartitions.forEach(partition -> {
- assertNotNull(pipelineMap.get(partition));
- });
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/c249443b/samza-operator/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java b/samza-operator/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
index 5991e2f..160a47a 100644
--- a/samza-operator/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
+++ b/samza-operator/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
@@ -29,6 +29,7 @@ import org.apache.samza.operators.spec.SinkOperatorSpec;
import org.apache.samza.operators.spec.StreamOperatorSpec;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
+import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.TaskCoordinator;
import org.junit.Test;
@@ -46,17 +47,19 @@ import static org.mockito.Mockito.when;
public class TestMessageStreamImpl {
+ private StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+
@Test
public void testMap() {
- MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>();
- MapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> xMap =
- m -> new TestOutputMessageEnvelope(m.getKey(), m.getMessage().getValue().length() + 1);
+ MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph);
+ MapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> xMap = (TestMessageEnvelope m) ->
+ new TestOutputMessageEnvelope(m.getKey(), m.getMessage().getValue().length() + 1);
MessageStream<TestOutputMessageEnvelope> outputStream = inputStream.map(xMap);
Collection<OperatorSpec> subs = inputStream.getRegisteredOperatorSpecs();
assertEquals(subs.size(), 1);
OperatorSpec<TestOutputMessageEnvelope> mapOp = subs.iterator().next();
assertTrue(mapOp instanceof StreamOperatorSpec);
- assertEquals(mapOp.getOutputStream(), outputStream);
+ assertEquals(mapOp.getNextStream(), outputStream);
// assert that the transformation function is what we defined above
TestMessageEnvelope xTestMsg = mock(TestMessageEnvelope.class);
TestMessageEnvelope.MessageType mockInnerTestMessage = mock(TestMessageEnvelope.MessageType.class);
@@ -73,33 +76,33 @@ public class TestMessageStreamImpl {
@Test
public void testFlatMap() {
- MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>();
+ MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph);
Set<TestOutputMessageEnvelope> flatOuts = new HashSet<TestOutputMessageEnvelope>() { {
this.add(mock(TestOutputMessageEnvelope.class));
this.add(mock(TestOutputMessageEnvelope.class));
this.add(mock(TestOutputMessageEnvelope.class));
} };
- FlatMapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> xFlatMap = m -> flatOuts;
+ FlatMapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> xFlatMap = (TestMessageEnvelope message) -> flatOuts;
MessageStream<TestOutputMessageEnvelope> outputStream = inputStream.flatMap(xFlatMap);
Collection<OperatorSpec> subs = inputStream.getRegisteredOperatorSpecs();
assertEquals(subs.size(), 1);
OperatorSpec<TestOutputMessageEnvelope> flatMapOp = subs.iterator().next();
assertTrue(flatMapOp instanceof StreamOperatorSpec);
- assertEquals(flatMapOp.getOutputStream(), outputStream);
+ assertEquals(flatMapOp.getNextStream(), outputStream);
// assert that the transformation function is what we defined above
assertEquals(((StreamOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope>) flatMapOp).getTransformFn(), xFlatMap);
}
@Test
public void testFilter() {
- MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>();
- FilterFunction<TestMessageEnvelope> xFilter = m -> m.getMessage().getEventTime() > 123456L;
+ MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph);
+ FilterFunction<TestMessageEnvelope> xFilter = (TestMessageEnvelope m) -> m.getMessage().getEventTime() > 123456L;
MessageStream<TestMessageEnvelope> outputStream = inputStream.filter(xFilter);
Collection<OperatorSpec> subs = inputStream.getRegisteredOperatorSpecs();
assertEquals(subs.size(), 1);
OperatorSpec<TestMessageEnvelope> filterOp = subs.iterator().next();
assertTrue(filterOp instanceof StreamOperatorSpec);
- assertEquals(filterOp.getOutputStream(), outputStream);
+ assertEquals(filterOp.getNextStream(), outputStream);
// assert that the transformation function is what we defined above
FlatMapFunction<TestMessageEnvelope, TestMessageEnvelope> txfmFn = ((StreamOperatorSpec<TestMessageEnvelope, TestMessageEnvelope>) filterOp).getTransformFn();
TestMessageEnvelope mockMsg = mock(TestMessageEnvelope.class);
@@ -117,8 +120,8 @@ public class TestMessageStreamImpl {
@Test
public void testSink() {
- MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>();
- SinkFunction<TestMessageEnvelope> xSink = (m, mc, tc) -> {
+ MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph);
+ SinkFunction<TestMessageEnvelope> xSink = (TestMessageEnvelope m, MessageCollector mc, TaskCoordinator tc) -> {
mc.send(new OutgoingMessageEnvelope(new SystemStream("test-sys", "test-stream"), m.getMessage()));
tc.commit(TaskCoordinator.RequestScope.CURRENT_TASK);
};
@@ -128,26 +131,42 @@ public class TestMessageStreamImpl {
OperatorSpec<TestMessageEnvelope> sinkOp = subs.iterator().next();
assertTrue(sinkOp instanceof SinkOperatorSpec);
assertEquals(((SinkOperatorSpec) sinkOp).getSinkFn(), xSink);
- assertNull(((SinkOperatorSpec) sinkOp).getOutputStream());
+ assertNull(((SinkOperatorSpec) sinkOp).getNextStream());
}
@Test
public void testJoin() {
- MessageStreamImpl<TestMessageEnvelope> source1 = new MessageStreamImpl<>();
- MessageStreamImpl<TestMessageEnvelope> source2 = new MessageStreamImpl<>();
- JoinFunction<TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope> joiner =
- (m1, m2) -> new TestOutputMessageEnvelope(m1.getKey(), m1.getMessage().getValue().length() + m2.getMessage().getValue().length());
+ MessageStreamImpl<TestMessageEnvelope> source1 = new MessageStreamImpl<>(mockGraph);
+ MessageStreamImpl<TestMessageEnvelope> source2 = new MessageStreamImpl<>(mockGraph);
+ JoinFunction<String, TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope> joiner =
+ new JoinFunction<String, TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope>() {
+ @Override
+ public TestOutputMessageEnvelope apply(TestMessageEnvelope m1, TestMessageEnvelope m2) {
+ return new TestOutputMessageEnvelope(m1.getKey(), m1.getMessage().getValue().length() + m2.getMessage().getValue().length());
+ }
+
+ @Override
+ public String getFirstKey(TestMessageEnvelope message) {
+ return message.getKey();
+ }
+
+ @Override
+ public String getSecondKey(TestMessageEnvelope message) {
+ return message.getKey();
+ }
+ };
+
MessageStream<TestOutputMessageEnvelope> joinOutput = source1.join(source2, joiner);
Collection<OperatorSpec> subs = source1.getRegisteredOperatorSpecs();
assertEquals(subs.size(), 1);
OperatorSpec<TestMessageEnvelope> joinOp1 = subs.iterator().next();
assertTrue(joinOp1 instanceof PartialJoinOperatorSpec);
- assertEquals(((PartialJoinOperatorSpec) joinOp1).getOutputStream(), joinOutput);
+ assertEquals(((PartialJoinOperatorSpec) joinOp1).getNextStream(), joinOutput);
subs = source2.getRegisteredOperatorSpecs();
assertEquals(subs.size(), 1);
OperatorSpec<TestMessageEnvelope> joinOp2 = subs.iterator().next();
assertTrue(joinOp2 instanceof PartialJoinOperatorSpec);
- assertEquals(((PartialJoinOperatorSpec) joinOp2).getOutputStream(), joinOutput);
+ assertEquals(((PartialJoinOperatorSpec) joinOp2).getNextStream(), joinOutput);
TestMessageEnvelope joinMsg1 = new TestMessageEnvelope("test-join-1", "join-msg-001", 11111L);
TestMessageEnvelope joinMsg2 = new TestMessageEnvelope("test-join-2", "join-msg-002", 22222L);
TestOutputMessageEnvelope xOut = (TestOutputMessageEnvelope) ((PartialJoinOperatorSpec) joinOp1).getTransformFn().apply(joinMsg1, joinMsg2);
@@ -160,10 +179,10 @@ public class TestMessageStreamImpl {
@Test
public void testMerge() {
- MessageStream<TestMessageEnvelope> merge1 = new MessageStreamImpl<>();
+ MessageStream<TestMessageEnvelope> merge1 = new MessageStreamImpl<>(mockGraph);
Collection<MessageStream<TestMessageEnvelope>> others = new ArrayList<MessageStream<TestMessageEnvelope>>() { {
- this.add(new MessageStreamImpl<>());
- this.add(new MessageStreamImpl<>());
+ this.add(new MessageStreamImpl<>(mockGraph));
+ this.add(new MessageStreamImpl<>(mockGraph));
} };
MessageStream<TestMessageEnvelope> mergeOutput = merge1.merge(others);
validateMergeOperator(merge1, mergeOutput);
@@ -176,7 +195,7 @@ public class TestMessageStreamImpl {
assertEquals(subs.size(), 1);
OperatorSpec<TestMessageEnvelope> mergeOp = subs.iterator().next();
assertTrue(mergeOp instanceof StreamOperatorSpec);
- assertEquals(((StreamOperatorSpec) mergeOp).getOutputStream(), mergeOutput);
+ assertEquals(((StreamOperatorSpec) mergeOp).getNextStream(), mergeOutput);
TestMessageEnvelope mockMsg = mock(TestMessageEnvelope.class);
Collection<TestMessageEnvelope> outputs = ((StreamOperatorSpec<TestMessageEnvelope, TestMessageEnvelope>) mergeOp).getTransformFn().apply(mockMsg);
assertEquals(outputs.size(), 1);
http://git-wip-us.apache.org/repos/asf/samza/blob/c249443b/samza-operator/src/test/java/org/apache/samza/operators/TestMessageStreamImplUtil.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/TestMessageStreamImplUtil.java b/samza-operator/src/test/java/org/apache/samza/operators/TestMessageStreamImplUtil.java
new file mode 100644
index 0000000..c4e9f51
--- /dev/null
+++ b/samza-operator/src/test/java/org/apache/samza/operators/TestMessageStreamImplUtil.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators;
+
+
+public class TestMessageStreamImplUtil { // Test-only helper used by tests in other packages (e.g. operators.impl, operators.spec) to obtain a MessageStreamImpl.
+ public static <M> MessageStreamImpl<M> getMessageStreamImpl(StreamGraphImpl graph) { // Returns a new MessageStreamImpl<M> built from the given graph; M is the type parameter of the returned stream.
+ return new MessageStreamImpl<M>(graph); // NOTE(review): presumably the constructor is not accessible outside org.apache.samza.operators, hence this wrapper — confirm.
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/c249443b/samza-operator/src/test/java/org/apache/samza/operators/WindowTask.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/WindowTask.java b/samza-operator/src/test/java/org/apache/samza/operators/WindowTask.java
deleted file mode 100644
index e176063..0000000
--- a/samza-operator/src/test/java/org/apache/samza/operators/WindowTask.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.operators;
-
-import org.apache.samza.operators.data.IncomingSystemMessageEnvelope;
-import org.apache.samza.operators.data.JsonIncomingSystemMessageEnvelope;
-import org.apache.samza.operators.data.MessageEnvelope;
-import org.apache.samza.operators.data.Offset;
-import org.apache.samza.operators.windows.Windows;
-import org.apache.samza.system.SystemStreamPartition;
-
-import java.time.Duration;
-import java.util.Map;
-import java.util.function.BiFunction;
-
-
-/**
- * Example implementation of a simple user-defined tasks w/ window operators
- *
- */
-public class WindowTask implements StreamOperatorTask {
- class MessageType {
- String field1;
- String field2;
- }
-
- class JsonMessageEnvelope extends JsonIncomingSystemMessageEnvelope<MessageType> {
-
- JsonMessageEnvelope(String key, MessageType data, Offset offset, SystemStreamPartition partition) {
- super(key, data, offset, partition);
- }
- }
-
- @Override public void transform(Map<SystemStreamPartition, MessageStream<IncomingSystemMessageEnvelope>> messageStreams) {
- BiFunction<JsonMessageEnvelope, Integer, Integer> maxAggregator = (m, c) -> c + 1;
- messageStreams.values().forEach(source ->
- source.map(m1 -> new JsonMessageEnvelope(this.myMessageKeyFunction(m1), (MessageType) m1.getMessage(), m1.getOffset(), m1.getSystemStreamPartition()))
- .window(Windows.tumblingWindow(Duration.ofMillis(200), maxAggregator))
- );
- }
-
- String myMessageKeyFunction(MessageEnvelope<Object, Object> m) {
- return m.getKey().toString();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/c249443b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java
index bae98e3..02637a3 100644
--- a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java
+++ b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java
@@ -18,17 +18,26 @@
*/
package org.apache.samza.operators.impl;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import org.apache.samza.config.Config;
import org.apache.samza.operators.MessageStreamImpl;
+import org.apache.samza.operators.StreamGraphImpl;
import org.apache.samza.operators.TestMessageEnvelope;
+import org.apache.samza.operators.TestMessageStreamImplUtil;
import org.apache.samza.operators.TestOutputMessageEnvelope;
import org.apache.samza.operators.data.MessageEnvelope;
import org.apache.samza.operators.functions.FlatMapFunction;
+import org.apache.samza.operators.functions.JoinFunction;
+import org.apache.samza.operators.functions.PartialJoinFunction;
import org.apache.samza.operators.functions.SinkFunction;
+import org.apache.samza.operators.spec.OperatorSpec;
import org.apache.samza.operators.spec.WindowOperatorSpec;
import org.apache.samza.operators.spec.PartialJoinOperatorSpec;
import org.apache.samza.operators.spec.SinkOperatorSpec;
import org.apache.samza.operators.spec.StreamOperatorSpec;
import org.apache.samza.operators.windows.Windows;
+import org.apache.samza.operators.windows.internal.WindowInternal;
import org.apache.samza.task.TaskContext;
import org.junit.Before;
import org.junit.Test;
@@ -38,7 +47,6 @@ import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Set;
-import java.util.function.BiFunction;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotSame;
@@ -49,25 +57,46 @@ import static org.mockito.Mockito.when;
public class TestOperatorImpls {
Field nextOperatorsField = null;
+ Method createOpMethod = null;
+ Method createOpsMethod = null;
@Before
- public void prep() throws NoSuchFieldException {
+ public void prep() throws NoSuchFieldException, NoSuchMethodException {
nextOperatorsField = OperatorImpl.class.getDeclaredField("nextOperators");
nextOperatorsField.setAccessible(true);
+
+ createOpMethod = OperatorGraph.class.getDeclaredMethod("createOperatorImpl", MessageStreamImpl.class,
+ OperatorSpec.class, Config.class, TaskContext.class);
+ createOpMethod.setAccessible(true);
+
+ createOpsMethod = OperatorGraph.class.getDeclaredMethod("createOperatorImpls", MessageStreamImpl.class, Config.class, TaskContext.class);
+ createOpsMethod.setAccessible(true);
}
-
+
@Test
- public void testCreateOperator() throws NoSuchFieldException, IllegalAccessException {
+ public void testCreateOperator() throws NoSuchFieldException, IllegalAccessException, InvocationTargetException {
// get window operator
WindowOperatorSpec mockWnd = mock(WindowOperatorSpec.class);
- OperatorImpl<TestMessageEnvelope, ? extends MessageEnvelope> opImpl = OperatorImpls.createOperatorImpl(mockWnd);
+ WindowInternal<TestMessageEnvelope, String, Integer> windowInternal = new WindowInternal<>(null, null, null, null);
+ when(mockWnd.getWindow()).thenReturn(windowInternal);
+ MessageStreamImpl<TestMessageEnvelope> mockStream = mock(MessageStreamImpl.class);
+ Config mockConfig = mock(Config.class);
+ TaskContext mockContext = mock(TaskContext.class);
+
+ OperatorGraph opGraph = new OperatorGraph();
+ OperatorImpl<TestMessageEnvelope, ? extends MessageEnvelope> opImpl = (OperatorImpl<TestMessageEnvelope, ? extends MessageEnvelope>)
+ createOpMethod.invoke(opGraph, mockStream, mockWnd, mockConfig, mockContext);
assertTrue(opImpl instanceof WindowOperatorImpl);
+ Field wndInternalField = WindowOperatorImpl.class.getDeclaredField("window");
+ wndInternalField.setAccessible(true);
+ WindowInternal wndInternal = (WindowInternal) wndInternalField.get(opImpl);
+ assertEquals(wndInternal, windowInternal);
// get simple operator
StreamOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope> mockSimpleOp = mock(StreamOperatorSpec.class);
FlatMapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> mockTxfmFn = mock(FlatMapFunction.class);
when(mockSimpleOp.getTransformFn()).thenReturn(mockTxfmFn);
- opImpl = OperatorImpls.createOperatorImpl(mockSimpleOp);
+ opImpl = (OperatorImpl<TestMessageEnvelope, ? extends MessageEnvelope>) createOpMethod.invoke(opGraph, mockStream, mockSimpleOp, mockConfig, mockContext);
assertTrue(opImpl instanceof StreamOperatorImpl);
Field txfmFnField = StreamOperatorImpl.class.getDeclaredField("transformFn");
txfmFnField.setAccessible(true);
@@ -77,7 +106,7 @@ public class TestOperatorImpls {
SinkFunction<TestMessageEnvelope> sinkFn = (m, mc, tc) -> { };
SinkOperatorSpec<TestMessageEnvelope> sinkOp = mock(SinkOperatorSpec.class);
when(sinkOp.getSinkFn()).thenReturn(sinkFn);
- opImpl = OperatorImpls.createOperatorImpl(sinkOp);
+ opImpl = (OperatorImpl<TestMessageEnvelope, ? extends MessageEnvelope>) createOpMethod.invoke(opGraph, mockStream, sinkOp, mockConfig, mockContext);
assertTrue(opImpl instanceof SinkOperatorImpl);
Field sinkFnField = SinkOperatorImpl.class.getDeclaredField("sinkFn");
sinkFnField.setAccessible(true);
@@ -86,28 +115,33 @@ public class TestOperatorImpls {
// get join operator
PartialJoinOperatorSpec<TestMessageEnvelope, String, TestMessageEnvelope, TestOutputMessageEnvelope> joinOp = mock(PartialJoinOperatorSpec.class);
TestOutputMessageEnvelope mockOutput = mock(TestOutputMessageEnvelope.class);
- BiFunction<TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope> joinFn = (m1, m2) -> mockOutput;
+ PartialJoinFunction<String, TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope> joinFn = mock(PartialJoinFunction.class);
when(joinOp.getTransformFn()).thenReturn(joinFn);
- opImpl = OperatorImpls.createOperatorImpl(joinOp);
+ opImpl = (OperatorImpl<TestMessageEnvelope, ? extends MessageEnvelope>) createOpMethod.invoke(opGraph, mockStream, joinOp, mockConfig, mockContext);
assertTrue(opImpl instanceof PartialJoinOperatorImpl);
}
@Test
- public void testEmptyChain() {
+ public void testEmptyChain() throws InvocationTargetException, IllegalAccessException {
// test creation of empty chain
- MessageStreamImpl<TestMessageEnvelope> testStream = new MessageStreamImpl<>();
+ MessageStreamImpl<TestMessageEnvelope> testStream = mock(MessageStreamImpl.class);
TaskContext mockContext = mock(TaskContext.class);
- RootOperatorImpl operatorChain = OperatorImpls.createOperatorImpls(testStream, mockContext);
+ Config mockConfig = mock(Config.class);
+ OperatorGraph opGraph = new OperatorGraph();
+ RootOperatorImpl operatorChain = (RootOperatorImpl) createOpsMethod.invoke(opGraph, testStream, mockConfig, mockContext);
assertTrue(operatorChain != null);
}
@Test
- public void testLinearChain() throws IllegalAccessException {
+ public void testLinearChain() throws IllegalAccessException, InvocationTargetException {
// test creation of linear chain
- MessageStreamImpl<TestMessageEnvelope> testInput = new MessageStreamImpl<>();
+ StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+ MessageStreamImpl<TestMessageEnvelope> testInput = TestMessageStreamImplUtil.<TestMessageEnvelope>getMessageStreamImpl(mockGraph);
TaskContext mockContext = mock(TaskContext.class);
- testInput.map(m -> m).window(Windows.tumblingWindow(Duration.ofMillis(1000)));
- RootOperatorImpl operatorChain = OperatorImpls.createOperatorImpls(testInput, mockContext);
+ Config mockConfig = mock(Config.class);
+ testInput.map(m -> m).window(Windows.keyedSessionWindow(TestMessageEnvelope::getKey, Duration.ofMinutes(10)));
+ OperatorGraph opGraph = new OperatorGraph();
+ RootOperatorImpl operatorChain = (RootOperatorImpl) createOpsMethod.invoke(opGraph, testInput, mockConfig, mockContext);
Set<OperatorImpl> subsSet = (Set<OperatorImpl>) nextOperatorsField.get(operatorChain);
assertEquals(subsSet.size(), 1);
OperatorImpl<TestMessageEnvelope, TestMessageEnvelope> firstOpImpl = subsSet.iterator().next();
@@ -119,13 +153,16 @@ public class TestOperatorImpls {
}
@Test
- public void testBroadcastChain() throws IllegalAccessException {
+ public void testBroadcastChain() throws IllegalAccessException, InvocationTargetException {
// test creation of broadcast chain
- MessageStreamImpl<TestMessageEnvelope> testInput = new MessageStreamImpl<>();
+ StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+ MessageStreamImpl<TestMessageEnvelope> testInput = TestMessageStreamImplUtil.<TestMessageEnvelope>getMessageStreamImpl(mockGraph);
TaskContext mockContext = mock(TaskContext.class);
+ Config mockConfig = mock(Config.class);
testInput.filter(m -> m.getMessage().getEventTime() > 123456L).flatMap(m -> new ArrayList() { { this.add(m); this.add(m); } });
testInput.filter(m -> m.getMessage().getEventTime() < 123456L).map(m -> m);
- RootOperatorImpl operatorChain = OperatorImpls.createOperatorImpls(testInput, mockContext);
+ OperatorGraph opGraph = new OperatorGraph();
+ RootOperatorImpl operatorChain = (RootOperatorImpl) createOpsMethod.invoke(opGraph, testInput, mockConfig, mockContext);
Set<OperatorImpl> subsSet = (Set<OperatorImpl>) nextOperatorsField.get(operatorChain);
assertEquals(subsSet.size(), 2);
Iterator<OperatorImpl> iter = subsSet.iterator();
@@ -146,18 +183,36 @@ public class TestOperatorImpls {
}
@Test
- public void testJoinChain() throws IllegalAccessException {
+ public void testJoinChain() throws IllegalAccessException, InvocationTargetException {
// test creation of join chain
- MessageStreamImpl<TestMessageEnvelope> input1 = new MessageStreamImpl<>();
- MessageStreamImpl<TestMessageEnvelope> input2 = new MessageStreamImpl<>();
+ StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+ MessageStreamImpl<TestMessageEnvelope> input1 = TestMessageStreamImplUtil.getMessageStreamImpl(mockGraph);
+ MessageStreamImpl<TestMessageEnvelope> input2 = TestMessageStreamImplUtil.getMessageStreamImpl(mockGraph);
TaskContext mockContext = mock(TaskContext.class);
+ Config mockConfig = mock(Config.class);
input1
- .join(input2, (m1, m2) ->
- new TestOutputMessageEnvelope(m1.getKey(), m1.getMessage().getValue().length() + m2.getMessage().getValue().length()))
+ .join(input2,
+ new JoinFunction<String, TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope>() {
+ @Override
+ public TestOutputMessageEnvelope apply(TestMessageEnvelope m1, TestMessageEnvelope m2) {
+ return new TestOutputMessageEnvelope(m1.getKey(), m1.getMessage().getValue().length() + m2.getMessage().getValue().length());
+ }
+
+ @Override
+ public String getFirstKey(TestMessageEnvelope message) {
+ return message.getKey();
+ }
+
+ @Override
+ public String getSecondKey(TestMessageEnvelope message) {
+ return message.getKey();
+ }
+ })
.map(m -> m);
+ OperatorGraph opGraph = new OperatorGraph();
// now, we create chained operators from each input sources
- RootOperatorImpl chain1 = OperatorImpls.createOperatorImpls(input1, mockContext);
- RootOperatorImpl chain2 = OperatorImpls.createOperatorImpls(input2, mockContext);
+ RootOperatorImpl chain1 = (RootOperatorImpl) createOpsMethod.invoke(opGraph, input1, mockConfig, mockContext);
+ RootOperatorImpl chain2 = (RootOperatorImpl) createOpsMethod.invoke(opGraph, input2, mockConfig, mockContext);
// check that those two chains will merge at map operator
// first branch of the join
Set<OperatorImpl> subsSet = (Set<OperatorImpl>) nextOperatorsField.get(chain1);
http://git-wip-us.apache.org/repos/asf/samza/blob/c249443b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java
index ba5b6f8..ce9fdd2 100644
--- a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java
+++ b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java
@@ -18,17 +18,16 @@
*/
package org.apache.samza.operators.impl;
+import org.apache.samza.config.Config;
import org.apache.samza.operators.TestOutputMessageEnvelope;
import org.apache.samza.operators.functions.SinkFunction;
import org.apache.samza.operators.spec.SinkOperatorSpec;
import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;
import org.junit.Test;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.*;
public class TestSinkOperatorImpl {
@@ -38,7 +37,9 @@ public class TestSinkOperatorImpl {
SinkOperatorSpec<TestOutputMessageEnvelope> sinkOp = mock(SinkOperatorSpec.class);
SinkFunction<TestOutputMessageEnvelope> sinkFn = mock(SinkFunction.class);
when(sinkOp.getSinkFn()).thenReturn(sinkFn);
- SinkOperatorImpl<TestOutputMessageEnvelope> sinkImpl = new SinkOperatorImpl<>(sinkOp);
+ Config mockConfig = mock(Config.class);
+ TaskContext mockContext = mock(TaskContext.class);
+ SinkOperatorImpl<TestOutputMessageEnvelope> sinkImpl = new SinkOperatorImpl<>(sinkOp, mockConfig, mockContext);
TestOutputMessageEnvelope mockMsg = mock(TestOutputMessageEnvelope.class);
MessageCollector mockCollector = mock(MessageCollector.class);
TaskCoordinator mockCoordinator = mock(TaskCoordinator.class);
http://git-wip-us.apache.org/repos/asf/samza/blob/c249443b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java
index 5a3840c..010a210 100644
--- a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java
+++ b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java
@@ -18,22 +18,20 @@
*/
package org.apache.samza.operators.impl;
+import java.util.ArrayList;
+import java.util.Collection;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.MessageStreamImpl;
import org.apache.samza.operators.TestMessageEnvelope;
import org.apache.samza.operators.TestOutputMessageEnvelope;
import org.apache.samza.operators.functions.FlatMapFunction;
import org.apache.samza.operators.spec.StreamOperatorSpec;
import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;
import org.junit.Test;
-import java.util.ArrayList;
-import java.util.Collection;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.*;
public class TestStreamOperatorImpl {
@@ -43,8 +41,10 @@ public class TestStreamOperatorImpl {
StreamOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope> mockOp = mock(StreamOperatorSpec.class);
FlatMapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> txfmFn = mock(FlatMapFunction.class);
when(mockOp.getTransformFn()).thenReturn(txfmFn);
-
- StreamOperatorImpl<TestMessageEnvelope, TestOutputMessageEnvelope> opImpl = spy(new StreamOperatorImpl<>(mockOp));
+ MessageStreamImpl<TestMessageEnvelope> mockInput = mock(MessageStreamImpl.class);
+ Config mockConfig = mock(Config.class);
+ TaskContext mockContext = mock(TaskContext.class);
+ StreamOperatorImpl<TestMessageEnvelope, TestOutputMessageEnvelope> opImpl = spy(new StreamOperatorImpl<>(mockOp, mockInput, mockConfig, mockContext));
TestMessageEnvelope inMsg = mock(TestMessageEnvelope.class);
TestOutputMessageEnvelope outMsg = mock(TestOutputMessageEnvelope.class);
Collection<TestOutputMessageEnvelope> mockOutputs = new ArrayList() { {
http://git-wip-us.apache.org/repos/asf/samza/blob/c249443b/samza-operator/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java b/samza-operator/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java
index ffe9df1..31257a4 100644
--- a/samza-operator/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java
+++ b/samza-operator/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java
@@ -18,14 +18,18 @@
*/
package org.apache.samza.operators.spec;
+import org.apache.samza.operators.MessageStreamImpl;
+import org.apache.samza.operators.StreamGraphImpl;
import org.apache.samza.operators.TestMessageEnvelope;
+import org.apache.samza.operators.TestMessageStreamImplUtil;
import org.apache.samza.operators.data.MessageEnvelope;
import org.apache.samza.operators.functions.FlatMapFunction;
+import org.apache.samza.operators.functions.PartialJoinFunction;
import org.apache.samza.operators.functions.SinkFunction;
-import org.apache.samza.operators.MessageStreamImpl;
import org.apache.samza.operators.windows.internal.WindowInternal;
-import org.apache.samza.operators.windows.WindowKey;
import org.apache.samza.operators.windows.WindowPane;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskCoordinator;
import org.junit.Test;
import java.util.ArrayList;
@@ -42,19 +46,23 @@ public class TestOperatorSpecs {
@Test
public void testGetStreamOperator() {
FlatMapFunction<MessageEnvelope, TestMessageEnvelope> transformFn = m -> new ArrayList<TestMessageEnvelope>() { {
- this.add(new TestMessageEnvelope(m.getKey().toString(), m.getMessage().toString(), 12345L));
- } };
- StreamOperatorSpec<MessageEnvelope, TestMessageEnvelope> strmOp = OperatorSpecs.createStreamOperatorSpec(transformFn);
+ this.add(new TestMessageEnvelope(m.getKey().toString(), m.getMessage().toString(), 12345L));
+ } };
+ MessageStreamImpl<TestMessageEnvelope> mockOutput = mock(MessageStreamImpl.class);
+ StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+ StreamOperatorSpec<MessageEnvelope, TestMessageEnvelope> strmOp = OperatorSpecs.createStreamOperatorSpec(transformFn, mockGraph, mockOutput);
assertEquals(strmOp.getTransformFn(), transformFn);
- assertTrue(strmOp.getOutputStream() instanceof MessageStreamImpl);
+ assertEquals(strmOp.getNextStream(), mockOutput);
}
@Test
public void testGetSinkOperator() {
- SinkFunction<TestMessageEnvelope> sinkFn = (m, c, t) -> { };
- SinkOperatorSpec<TestMessageEnvelope> sinkOp = OperatorSpecs.createSinkOperatorSpec(sinkFn);
+ SinkFunction<TestMessageEnvelope> sinkFn = (TestMessageEnvelope message, MessageCollector messageCollector,
+ TaskCoordinator taskCoordinator) -> { };
+ StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+ SinkOperatorSpec<TestMessageEnvelope> sinkOp = OperatorSpecs.createSinkOperatorSpec(sinkFn, mockGraph);
assertEquals(sinkOp.getSinkFn(), sinkFn);
- assertTrue(sinkOp.getOutputStream() == null);
+ assertTrue(sinkOp.getNextStream() == null);
}
@Test
@@ -65,8 +73,9 @@ public class TestOperatorSpecs {
//instantiate a window using reflection
WindowInternal window = new WindowInternal(null, aggregator, keyExtractor, null);
- WindowOperatorSpec spec = OperatorSpecs.<TestMessageEnvelope, String, WindowKey<String>, Integer,
- WindowPane<WindowKey<String>, Integer>>createWindowOperatorSpec(window);
+ StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+ MessageStreamImpl<WindowPane<String, Integer>> mockWndOut = mock(MessageStreamImpl.class);
+ WindowOperatorSpec spec = OperatorSpecs.<TestMessageEnvelope, String, Integer>createWindowOperatorSpec(window, mockGraph, mockWndOut);
assertEquals(spec.getWindow(), window);
assertEquals(spec.getWindow().getKeyExtractor(), keyExtractor);
assertEquals(spec.getWindow().getFoldFunction(), aggregator);
@@ -74,13 +83,30 @@ public class TestOperatorSpecs {
@Test
public void testGetPartialJoinOperator() {
- BiFunction<MessageEnvelope<Object, ?>, MessageEnvelope<Object, ?>, TestMessageEnvelope> merger =
- (m1, m2) -> new TestMessageEnvelope(m1.getKey().toString(), m2.getMessage().toString(), System.nanoTime());
- MessageStreamImpl<TestMessageEnvelope> joinOutput = new MessageStreamImpl<>();
+ PartialJoinFunction<Object, MessageEnvelope<Object, ?>, MessageEnvelope<Object, ?>, TestMessageEnvelope> merger =
+ new PartialJoinFunction<Object, MessageEnvelope<Object, ?>, MessageEnvelope<Object, ?>, TestMessageEnvelope>() {
+ @Override
+ public TestMessageEnvelope apply(MessageEnvelope<Object, ?> m1, MessageEnvelope<Object, ?> m2) {
+ return new TestMessageEnvelope(m1.getKey().toString(), m2.getMessage().toString(), System.nanoTime());
+ }
+
+ @Override
+ public Object getKey(MessageEnvelope<Object, ?> message) {
+ return message.getKey();
+ }
+
+ @Override
+ public Object getOtherKey(MessageEnvelope<Object, ?> message) {
+ return message.getKey();
+ }
+ };
+
+ StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+ MessageStreamImpl<TestMessageEnvelope> joinOutput = TestMessageStreamImplUtil.<TestMessageEnvelope>getMessageStreamImpl(mockGraph);
PartialJoinOperatorSpec<MessageEnvelope<Object, ?>, Object, MessageEnvelope<Object, ?>, TestMessageEnvelope> partialJoin =
- OperatorSpecs.createPartialJoinOperatorSpec(merger, joinOutput);
+ OperatorSpecs.createPartialJoinOperatorSpec(merger, mockGraph, joinOutput);
- assertEquals(partialJoin.getOutputStream(), joinOutput);
+ assertEquals(partialJoin.getNextStream(), joinOutput);
MessageEnvelope<Object, Object> m = mock(MessageEnvelope.class);
MessageEnvelope<Object, Object> s = mock(MessageEnvelope.class);
assertEquals(partialJoin.getTransformFn(), merger);
@@ -88,13 +114,14 @@ public class TestOperatorSpecs {
@Test
public void testGetMergeOperator() {
- MessageStreamImpl<TestMessageEnvelope> output = new MessageStreamImpl<>();
- StreamOperatorSpec<TestMessageEnvelope, TestMessageEnvelope> mergeOp = OperatorSpecs.createMergeOperatorSpec(output);
+ StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+ MessageStreamImpl<TestMessageEnvelope> output = TestMessageStreamImplUtil.<TestMessageEnvelope>getMessageStreamImpl(mockGraph);
+ StreamOperatorSpec<TestMessageEnvelope, TestMessageEnvelope> mergeOp = OperatorSpecs.createMergeOperatorSpec(mockGraph, output);
Function<TestMessageEnvelope, Collection<TestMessageEnvelope>> mergeFn = t -> new ArrayList<TestMessageEnvelope>() { {
this.add(t);
} };
TestMessageEnvelope t = mock(TestMessageEnvelope.class);
assertEquals(mergeOp.getTransformFn().apply(t), mergeFn.apply(t));
- assertEquals(mergeOp.getOutputStream(), output);
+ assertEquals(mergeOp.getNextStream(), output);
}
}