You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2017/04/05 23:42:30 UTC
[2/4] samza git commit: SAMZA-1094 SAMZA-1101 SAMZA-1159;
Remove MessageEnvelope from public operator APIs. : Delay the
creation of SinkFunction for output streams. : Move StreamSpec from a public
API to an internal class.
http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/test/java/org/apache/samza/example/TestBroadcastExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/TestBroadcastExample.java b/samza-core/src/test/java/org/apache/samza/example/TestBroadcastExample.java
deleted file mode 100644
index 1c30a21..0000000
--- a/samza-core/src/test/java/org/apache/samza/example/TestBroadcastExample.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.example;
-
-import java.time.Duration;
-import java.util.Set;
-import java.util.function.Supplier;
-
-import org.apache.samza.config.Config;
-import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.StreamGraph;
-import org.apache.samza.operators.data.InputMessageEnvelope;
-import org.apache.samza.operators.data.JsonIncomingSystemMessageEnvelope;
-import org.apache.samza.operators.data.Offset;
-import org.apache.samza.operators.functions.FoldLeftFunction;
-import org.apache.samza.operators.triggers.Triggers;
-import org.apache.samza.operators.windows.Windows;
-import org.apache.samza.system.StreamSpec;
-import org.apache.samza.system.SystemStreamPartition;
-
-
-/**
- * Example implementation of split stream tasks
- *
- */
-public class TestBroadcastExample extends TestExampleBase {
-
- TestBroadcastExample(Set<SystemStreamPartition> inputs) {
- super(inputs);
- }
-
- class MessageType {
- String field1;
- String field2;
- String field3;
- String field4;
- String parKey;
- private long timestamp;
-
- public long getTimestamp() {
- return this.timestamp;
- }
- }
-
- class JsonMessageEnvelope extends JsonIncomingSystemMessageEnvelope<MessageType> {
- JsonMessageEnvelope(String key, MessageType data, Offset offset, SystemStreamPartition partition) {
- super(key, data, offset, partition);
- }
- }
-
- @Override
- public void init(StreamGraph graph, Config config) {
- FoldLeftFunction<JsonMessageEnvelope, Integer> sumAggregator = (m, c) -> c + 1;
- Supplier<Integer> initialValue = () -> 0;
-
- inputs.keySet().forEach(entry -> {
- MessageStream<JsonMessageEnvelope> inputStream = graph.<Object, Object, InputMessageEnvelope>createInStream(
- new StreamSpec(entry.getSystem() + "-" + entry.getStream(), entry.getStream(), entry.getSystem()), null, null).map(this::getInputMessage);
-
- inputStream.filter(this::myFilter1).window(Windows.tumblingWindow(Duration.ofMillis(100), initialValue, sumAggregator)
- .setLateTrigger(Triggers.any(Triggers.count(30000), Triggers.timeSinceFirstMessage(Duration.ofMillis(10)))));
-
- inputStream.filter(this::myFilter2).window(Windows.tumblingWindow(Duration.ofMillis(100), initialValue, sumAggregator)
- .setLateTrigger(Triggers.any(Triggers.count(30000), Triggers.timeSinceFirstMessage(Duration.ofMillis(10)))));
-
- inputStream.filter(this::myFilter3).window(Windows.tumblingWindow(Duration.ofMillis(100), initialValue, sumAggregator)
- .setLateTrigger(Triggers.any(Triggers.count(30000), Triggers.timeSinceFirstMessage(Duration.ofMillis(10)))));
-
- });
- }
-
- JsonMessageEnvelope getInputMessage(InputMessageEnvelope m1) {
- return (JsonMessageEnvelope) m1.getMessage();
- }
-
- boolean myFilter1(JsonMessageEnvelope m1) {
- // Do user defined processing here
- return m1.getMessage().parKey.equals("key1");
- }
-
- boolean myFilter2(JsonMessageEnvelope m1) {
- // Do user defined processing here
- return m1.getMessage().parKey.equals("key2");
- }
-
- boolean myFilter3(JsonMessageEnvelope m1) {
- return m1.getMessage().parKey.equals("key3");
- }
-
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/test/java/org/apache/samza/example/TestExampleBase.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/TestExampleBase.java b/samza-core/src/test/java/org/apache/samza/example/TestExampleBase.java
deleted file mode 100644
index dd661a0..0000000
--- a/samza-core/src/test/java/org/apache/samza/example/TestExampleBase.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.example;
-
-import org.apache.samza.application.StreamApplication;
-import org.apache.samza.system.SystemStream;
-import org.apache.samza.system.SystemStreamPartition;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-/**
- * Base class for test examples
- *
- */
-public abstract class TestExampleBase implements StreamApplication {
-
- protected final Map<SystemStream, Set<SystemStreamPartition>> inputs;
-
- TestExampleBase(Set<SystemStreamPartition> inputs) {
- this.inputs = new HashMap<>();
- for (SystemStreamPartition input : inputs) {
- this.inputs.putIfAbsent(input.getSystemStream(), new HashSet<>());
- this.inputs.get(input.getSystemStream()).add(input);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/test/java/org/apache/samza/example/TestJoinExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/TestJoinExample.java b/samza-core/src/test/java/org/apache/samza/example/TestJoinExample.java
deleted file mode 100644
index 6c9f8c2..0000000
--- a/samza-core/src/test/java/org/apache/samza/example/TestJoinExample.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.example;
-
-import org.apache.samza.config.Config;
-import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.StreamGraph;
-import org.apache.samza.operators.data.InputMessageEnvelope;
-import org.apache.samza.operators.data.JsonIncomingSystemMessageEnvelope;
-import org.apache.samza.operators.data.Offset;
-import org.apache.samza.operators.functions.JoinFunction;
-import org.apache.samza.serializers.JsonSerde;
-import org.apache.samza.serializers.StringSerde;
-import org.apache.samza.system.StreamSpec;
-import org.apache.samza.system.SystemStream;
-import org.apache.samza.system.SystemStreamPartition;
-
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Set;
-
-
-/**
- * Example implementation of unique key-based stream-stream join tasks
- *
- */
-public class TestJoinExample extends TestExampleBase {
-
- TestJoinExample(Set<SystemStreamPartition> inputs) {
- super(inputs);
- }
-
- class MessageType {
- String joinKey;
- List<String> joinFields = new ArrayList<>();
- }
-
- class JsonMessageEnvelope extends JsonIncomingSystemMessageEnvelope<MessageType> {
- JsonMessageEnvelope(String key, MessageType data, Offset offset, SystemStreamPartition partition) {
- super(key, data, offset, partition);
- }
- }
-
- MessageStream<JsonMessageEnvelope> joinOutput = null;
-
- @Override
- public void init(StreamGraph graph, Config config) {
-
- for (SystemStream input : inputs.keySet()) {
- StreamSpec inputStreamSpec = new StreamSpec(input.getSystem() + "-" + input.getStream(), input.getStream(), input.getSystem());
- MessageStream<JsonMessageEnvelope> newSource = graph.<Object, Object, InputMessageEnvelope>createInStream(
- inputStreamSpec, null, null).map(this::getInputMessage);
- if (joinOutput == null) {
- joinOutput = newSource;
- } else {
- joinOutput = joinOutput.join(newSource, new MyJoinFunction(), Duration.ofMinutes(1));
- }
- }
-
- joinOutput.sendTo(graph.createOutStream(
- new StreamSpec("joinOutput", "JoinOutputEvent", "kafka"),
- new StringSerde("UTF-8"), new JsonSerde<>()));
-
- }
-
- private JsonMessageEnvelope getInputMessage(InputMessageEnvelope ism) {
- return new JsonMessageEnvelope(
- ((MessageType) ism.getMessage()).joinKey,
- (MessageType) ism.getMessage(),
- ism.getOffset(),
- ism.getSystemStreamPartition());
- }
-
- class MyJoinFunction implements JoinFunction<String, JsonMessageEnvelope, JsonMessageEnvelope, JsonMessageEnvelope> {
- JsonMessageEnvelope myJoinResult(JsonMessageEnvelope m1, JsonMessageEnvelope m2) {
- MessageType newJoinMsg = new MessageType();
- newJoinMsg.joinKey = m1.getKey();
- newJoinMsg.joinFields.addAll(m1.getMessage().joinFields);
- newJoinMsg.joinFields.addAll(m2.getMessage().joinFields);
- return new JsonMessageEnvelope(m1.getMessage().joinKey, newJoinMsg, null, null);
- }
-
- @Override
- public JsonMessageEnvelope apply(JsonMessageEnvelope message, JsonMessageEnvelope otherMessage) {
- return this.myJoinResult(message, otherMessage);
- }
-
- @Override
- public String getFirstKey(JsonMessageEnvelope message) {
- return message.getKey();
- }
-
- @Override
- public String getSecondKey(JsonMessageEnvelope message) {
- return message.getKey();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/test/java/org/apache/samza/example/TestWindowExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/TestWindowExample.java b/samza-core/src/test/java/org/apache/samza/example/TestWindowExample.java
deleted file mode 100644
index c88df7c..0000000
--- a/samza-core/src/test/java/org/apache/samza/example/TestWindowExample.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.example;
-
-import org.apache.samza.config.Config;
-import org.apache.samza.operators.StreamGraph;
-import org.apache.samza.operators.data.InputMessageEnvelope;
-import org.apache.samza.operators.data.JsonIncomingSystemMessageEnvelope;
-import org.apache.samza.operators.data.MessageEnvelope;
-import org.apache.samza.operators.data.Offset;
-import org.apache.samza.operators.functions.FoldLeftFunction;
-import org.apache.samza.operators.windows.Windows;
-import org.apache.samza.system.StreamSpec;
-import org.apache.samza.system.SystemStreamPartition;
-
-import java.time.Duration;
-import java.util.Set;
-import java.util.function.Supplier;
-
-
-/**
- * Example implementation of a simple user-defined tasks w/ window operators
- *
- */
-public class TestWindowExample extends TestExampleBase {
- class MessageType {
- String field1;
- String field2;
- }
-
- TestWindowExample(Set<SystemStreamPartition> inputs) {
- super(inputs);
- }
-
- class JsonMessageEnvelope extends JsonIncomingSystemMessageEnvelope<MessageType> {
-
- JsonMessageEnvelope(String key, MessageType data, Offset offset, SystemStreamPartition partition) {
- super(key, data, offset, partition);
- }
- }
-
- @Override
- public void init(StreamGraph graph, Config config) {
- FoldLeftFunction<JsonMessageEnvelope, Integer> maxAggregator = (m, c) -> c + 1;
- Supplier<Integer> initialValue = () -> 0;
- inputs.keySet().forEach(source -> graph.<Object, Object, InputMessageEnvelope>createInStream(
- new StreamSpec(source.getSystem() + "-" + source.getStream(), source.getStream(), source.getSystem()), null, null).
- map(m1 -> new JsonMessageEnvelope(this.myMessageKeyFunction(m1), (MessageType) m1.getMessage(), m1.getOffset(),
- m1.getSystemStreamPartition())).window(Windows.tumblingWindow(Duration.ofMillis(200), initialValue, maxAggregator)));
-
- }
-
- String myMessageKeyFunction(MessageEnvelope<Object, Object> m) {
- return m.getKey().toString();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/test/java/org/apache/samza/example/WindowExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/WindowExample.java b/samza-core/src/test/java/org/apache/samza/example/WindowExample.java
new file mode 100644
index 0000000..159dba2
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/example/WindowExample.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.example;
+
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.functions.FoldLeftFunction;
+import org.apache.samza.operators.triggers.Triggers;
+import org.apache.samza.operators.windows.WindowPane;
+import org.apache.samza.operators.windows.Windows;
+import org.apache.samza.runtime.ApplicationRunner;
+import org.apache.samza.util.CommandLine;
+
+import java.time.Duration;
+import java.util.function.Supplier;
+
+
+/**
+ * Example {@link StreamApplication} that counts the messages read from "inputStream" in 10-minute tumbling windows
+ * and sends each resulting window pane to "outputStream".
+ */
+public class WindowExample implements StreamApplication {
+
+ @Override
+ public void init(StreamGraph graph, Config config) {
+ Supplier<Integer> initialValue = () -> 0;
+ FoldLeftFunction<PageViewEvent, Integer> counter = (m, c) -> c == null ? 1 : c + 1; // null-safe: null becomes 1
+ MessageStream<PageViewEvent> inputStream = graph.getInputStream("inputStream", (k, m) -> (PageViewEvent) m);
+ OutputStream<String, Integer, WindowPane<Void, Integer>> outputStream = graph
+ .getOutputStream("outputStream", m -> m.getKey().getPaneId(), m -> m.getMessage());
+
+ // Create a tumbling window that outputs the number of messages collected every 10 minutes.
+ // NOTE(review): the original comment promised *early* results once 30000 messages are collected or no new message
+ // arrives for 1 minute, but the trigger below is registered via setLateTrigger - confirm which one is intended.
+ inputStream
+ .window(Windows.tumblingWindow(Duration.ofMinutes(10), initialValue, counter)
+ .setLateTrigger(Triggers.any(Triggers.count(30000), Triggers.timeSinceLastMessage(Duration.ofMinutes(1)))))
+ .sendTo(outputStream);
+ }
+
+ // Local execution mode: loads the config from the command-line args and runs this example on the local runner.
+ public static void main(String[] args) throws Exception {
+ CommandLine cmdLine = new CommandLine();
+ Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
+ ApplicationRunner localRunner = ApplicationRunner.getLocalRunner(config);
+ localRunner.run(new WindowExample());
+ }
+
+ class PageViewEvent {
+ String key;
+ long timestamp;
+
+ public PageViewEvent(String key, long timestamp) {
+ this.key = key;
+ this.timestamp = timestamp;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
index e524ba1..e661798 100644
--- a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
@@ -19,56 +19,52 @@
package org.apache.samza.execution;
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
import org.apache.samza.Partition;
-import org.apache.samza.application.StreamApplication;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.MapConfig;
-import org.apache.samza.job.ApplicationStatus;
import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.OutputStream;
import org.apache.samza.operators.StreamGraphImpl;
import org.apache.samza.operators.functions.JoinFunction;
-import org.apache.samza.operators.functions.SinkFunction;
-import org.apache.samza.runtime.AbstractApplicationRunner;
import org.apache.samza.runtime.ApplicationRunner;
import org.apache.samza.system.StreamSpec;
import org.apache.samza.system.SystemAdmin;
import org.apache.samza.system.SystemStreamMetadata;
import org.apache.samza.system.SystemStreamPartition;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskCoordinator;
import org.junit.Before;
import org.junit.Test;
-import static org.junit.Assert.*;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
public class TestExecutionPlanner {
- private Config config;
-
private static final String DEFAULT_SYSTEM = "test-system";
private static final int DEFAULT_PARTITIONS = 10;
+ private Map<String, SystemAdmin> systemAdmins;
+ private StreamManager streamManager;
+ private ApplicationRunner runner;
+ private Config config;
+
private StreamSpec input1;
private StreamSpec input2;
private StreamSpec input3;
private StreamSpec output1;
private StreamSpec output2;
- private Map<String, SystemAdmin> systemAdmins;
- private StreamManager streamManager;
-
- private ApplicationRunner runner;
-
private JoinFunction createJoin() {
return new JoinFunction() {
@Override
@@ -88,14 +84,6 @@ public class TestExecutionPlanner {
};
}
- private SinkFunction createSink() {
- return new SinkFunction() {
- @Override
- public void apply(Object message, MessageCollector messageCollector, TaskCoordinator taskCoordinator) {
- }
- };
- }
-
private SystemAdmin createSystemAdmin(Map<String, Integer> streamToPartitions) {
return new SystemAdmin() {
@@ -139,37 +127,43 @@ public class TestExecutionPlanner {
};
}
- private StreamGraph createSimpleGraph() {
+ private StreamGraphImpl createSimpleGraph() {
/**
* a simple graph of partitionBy and map
*
* input1 -> partitionBy -> map -> output1
*
*/
- StreamGraph streamGraph = new StreamGraphImpl(runner, config);
- streamGraph.createInStream(input1, null, null).partitionBy(m -> "yes!!!").map(m -> m).sendTo(streamGraph.createOutStream(output1, null, null));
+ StreamGraphImpl streamGraph = new StreamGraphImpl(runner, config);
+ OutputStream<Object, Object, Object> output1 = streamGraph.getOutputStream("output1", null, null);
+ streamGraph.getInputStream("input1", null)
+ .partitionBy(m -> "yes!!!").map(m -> m)
+ .sendTo(output1);
return streamGraph;
}
- private StreamGraph createStreamGraphWithJoin() {
+ private StreamGraphImpl createStreamGraphWithJoin() {
- /** the graph looks like the following
+ /**
+ * The graph looks like the following. Partition counts are shown in parentheses; quoted counts are expected values.
*
- * input1 -> map -> join -> output1
- * |
- * input2 -> partitionBy -> filter -|
- * |
- * input3 -> filter -> partitionBy -> map -> join -> output2
+ * input1 (64) -> map -> join -> output1 (8)
+ * |
+ * input2 (16) -> partitionBy ("64") -> filter -|
+ * |
+ * input3 (32) -> filter -> partitionBy ("64") -> map -> join -> output2 (16)
*
*/
- StreamGraph streamGraph = new StreamGraphImpl(runner, config);
- MessageStream m1 = streamGraph.createInStream(input1, null, null).map(m -> m);
- MessageStream m2 = streamGraph.createInStream(input2, null, null).partitionBy(m -> "haha").filter(m -> true);
- MessageStream m3 = streamGraph.createInStream(input3, null, null).filter(m -> true).partitionBy(m -> "hehe").map(m -> m);
+ StreamGraphImpl streamGraph = new StreamGraphImpl(runner, config);
+ MessageStream m1 = streamGraph.getInputStream("input1", null).map(m -> m);
+ MessageStream m2 = streamGraph.getInputStream("input2", null).partitionBy(m -> "haha").filter(m -> true);
+ MessageStream m3 = streamGraph.getInputStream("input3", null).filter(m -> true).partitionBy(m -> "hehe").map(m -> m);
+ OutputStream<Object, Object, Object> output1 = streamGraph.getOutputStream("output1", null, null);
+ OutputStream<Object, Object, Object> output2 = streamGraph.getOutputStream("output2", null, null);
- m1.join(m2, createJoin(), Duration.ofHours(1)).sendTo(streamGraph.createOutStream(output1, null, null));
- m3.join(m2, createJoin(), Duration.ofHours(1)).sendTo(streamGraph.createOutStream(output2, null, null));
+ m1.join(m2, createJoin(), Duration.ofHours(2)).sendTo(output1);
+ m3.join(m2, createJoin(), Duration.ofHours(1)).sendTo(output2);
return streamGraph;
}
@@ -205,27 +199,26 @@ public class TestExecutionPlanner {
systemAdmins.put("system2", systemAdmin2);
streamManager = new StreamManager(systemAdmins);
- runner = new AbstractApplicationRunner(config) {
- @Override
- public void run(StreamApplication streamApp) {
- }
-
- @Override
- public void kill(StreamApplication streamApp) {
-
- }
-
- @Override
- public ApplicationStatus status(StreamApplication streamApp) {
- return null;
- }
- };
+ runner = mock(ApplicationRunner.class);
+ when(runner.getStreamSpec("input1")).thenReturn(input1);
+ when(runner.getStreamSpec("input2")).thenReturn(input2);
+ when(runner.getStreamSpec("input3")).thenReturn(input3);
+ when(runner.getStreamSpec("output1")).thenReturn(output1);
+ when(runner.getStreamSpec("output2")).thenReturn(output2);
+
+ // intermediate streams used in tests
+ when(runner.getStreamSpec("test-app-1-partition_by-0"))
+ .thenReturn(new StreamSpec("test-app-1-partition_by-0", "test-app-1-partition_by-0", "default-system"));
+ when(runner.getStreamSpec("test-app-1-partition_by-1"))
+ .thenReturn(new StreamSpec("test-app-1-partition_by-1", "test-app-1-partition_by-1", "default-system"));
+ when(runner.getStreamSpec("test-app-1-partition_by-4"))
+ .thenReturn(new StreamSpec("test-app-1-partition_by-4", "test-app-1-partition_by-4", "default-system"));
}
@Test
public void testCreateProcessorGraph() {
ExecutionPlanner planner = new ExecutionPlanner(config, streamManager);
- StreamGraph streamGraph = createStreamGraphWithJoin();
+ StreamGraphImpl streamGraph = createStreamGraphWithJoin();
JobGraph jobGraph = planner.createJobGraph(streamGraph);
assertTrue(jobGraph.getSources().size() == 3);
@@ -236,7 +229,7 @@ public class TestExecutionPlanner {
@Test
public void testFetchExistingStreamPartitions() {
ExecutionPlanner planner = new ExecutionPlanner(config, streamManager);
- StreamGraph streamGraph = createStreamGraphWithJoin();
+ StreamGraphImpl streamGraph = createStreamGraphWithJoin();
JobGraph jobGraph = planner.createJobGraph(streamGraph);
ExecutionPlanner.updateExistingPartitions(jobGraph, streamManager);
@@ -254,7 +247,7 @@ public class TestExecutionPlanner {
@Test
public void testCalculateJoinInputPartitions() {
ExecutionPlanner planner = new ExecutionPlanner(config, streamManager);
- StreamGraph streamGraph = createStreamGraphWithJoin();
+ StreamGraphImpl streamGraph = createStreamGraphWithJoin();
JobGraph jobGraph = planner.createJobGraph(streamGraph);
ExecutionPlanner.updateExistingPartitions(jobGraph, streamManager);
@@ -273,7 +266,7 @@ public class TestExecutionPlanner {
Config cfg = new MapConfig(map);
ExecutionPlanner planner = new ExecutionPlanner(cfg, streamManager);
- StreamGraph streamGraph = createSimpleGraph();
+ StreamGraphImpl streamGraph = createSimpleGraph();
JobGraph jobGraph = planner.createJobGraph(streamGraph);
planner.calculatePartitions(streamGraph, jobGraph);
@@ -286,7 +279,7 @@ public class TestExecutionPlanner {
@Test
public void testCalculateIntStreamPartitions() {
ExecutionPlanner planner = new ExecutionPlanner(config, streamManager);
- StreamGraph streamGraph = createSimpleGraph();
+ StreamGraphImpl streamGraph = createSimpleGraph();
JobGraph jobGraph = planner.createJobGraph(streamGraph);
planner.calculatePartitions(streamGraph, jobGraph);
http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java b/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
index 4e6c750..acc8588 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
@@ -22,7 +22,6 @@ import com.google.common.collect.ImmutableSet;
import org.apache.samza.Partition;
import org.apache.samza.application.StreamApplication;
import org.apache.samza.config.Config;
-import org.apache.samza.operators.data.MessageEnvelope;
import org.apache.samza.operators.functions.JoinFunction;
import org.apache.samza.runtime.ApplicationRunner;
import org.apache.samza.system.IncomingMessageEnvelope;
@@ -48,7 +47,6 @@ import static org.mockito.Mockito.when;
public class TestJoinOperator {
private final TaskCoordinator taskCoordinator = mock(TaskCoordinator.class);
- private final ApplicationRunner runner = mock(ApplicationRunner.class);
private final Set<Integer> numbers = ImmutableSet.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
@Test
@@ -226,6 +224,10 @@ public class TestJoinOperator {
}
private StreamOperatorTask createStreamOperatorTask() throws Exception {
+ ApplicationRunner runner = mock(ApplicationRunner.class);
+ when(runner.getStreamSpec("instream")).thenReturn(new StreamSpec("instream", "instream", "insystem"));
+ when(runner.getStreamSpec("instream2")).thenReturn(new StreamSpec("instream2", "instream2", "insystem2"));
+
TaskContext taskContext = mock(TaskContext.class);
when(taskContext.getSystemStreamPartitions()).thenReturn(ImmutableSet
.of(new SystemStreamPartition("insystem", "instream", new Partition(0)),
@@ -239,13 +241,12 @@ public class TestJoinOperator {
}
private class TestStreamApplication implements StreamApplication {
- StreamSpec inStreamSpec = new StreamSpec("instream", "instream", "insystem");
- StreamSpec inStreamSpec2 = new StreamSpec("instream2", "instream2", "insystem2");
-
@Override
public void init(StreamGraph graph, Config config) {
- MessageStream<MessageEnvelope<Integer, Integer>> inStream = graph.createInStream(inStreamSpec, null, null);
- MessageStream<MessageEnvelope<Integer, Integer>> inStream2 = graph.createInStream(inStreamSpec2, null, null);
+ MessageStream<FirstStreamIME> inStream =
+ graph.getInputStream("instream", FirstStreamIME::new);
+ MessageStream<SecondStreamIME> inStream2 =
+ graph.getInputStream("instream2", SecondStreamIME::new);
SystemStream outputSystemStream = new SystemStream("outputSystem", "outputStream");
inStream
@@ -256,22 +257,20 @@ public class TestJoinOperator {
}
}
- private class TestJoinFunction
- implements JoinFunction<Integer, MessageEnvelope<Integer, Integer>, MessageEnvelope<Integer, Integer>, Integer> {
+ private class TestJoinFunction implements JoinFunction<Integer, FirstStreamIME, SecondStreamIME, Integer> {
@Override
- public Integer apply(MessageEnvelope<Integer, Integer> message,
- MessageEnvelope<Integer, Integer> otherMessage) {
- return message.getMessage() + otherMessage.getMessage();
+ public Integer apply(FirstStreamIME message, SecondStreamIME otherMessage) {
+ return (Integer) message.getMessage() + (Integer) otherMessage.getMessage();
}
@Override
- public Integer getFirstKey(MessageEnvelope<Integer, Integer> message) {
- return message.getKey();
+ public Integer getFirstKey(FirstStreamIME message) {
+ return (Integer) message.getKey();
}
@Override
- public Integer getSecondKey(MessageEnvelope<Integer, Integer> message) {
- return message.getKey();
+ public Integer getSecondKey(SecondStreamIME message) {
+ return (Integer) message.getKey();
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java b/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
index 8a2dd95..e815b81 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
@@ -18,25 +18,19 @@
*/
package org.apache.samza.operators;
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.function.Function;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.MapConfig;
+import org.apache.samza.operators.data.TestMessageEnvelope;
+import org.apache.samza.operators.data.TestOutputMessageEnvelope;
import org.apache.samza.operators.functions.FilterFunction;
import org.apache.samza.operators.functions.FlatMapFunction;
import org.apache.samza.operators.functions.JoinFunction;
import org.apache.samza.operators.functions.MapFunction;
import org.apache.samza.operators.functions.SinkFunction;
import org.apache.samza.operators.spec.OperatorSpec;
-import org.apache.samza.operators.spec.PartialJoinOperatorSpec;
import org.apache.samza.operators.spec.SinkOperatorSpec;
+import org.apache.samza.operators.spec.PartialJoinOperatorSpec;
import org.apache.samza.operators.spec.StreamOperatorSpec;
import org.apache.samza.runtime.ApplicationRunner;
import org.apache.samza.system.OutgoingMessageEnvelope;
@@ -45,6 +39,15 @@ import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.TaskCoordinator;
import org.junit.Test;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
@@ -138,7 +141,6 @@ public class TestMessageStreamImpl {
OperatorSpec<TestMessageEnvelope> sinkOp = subs.iterator().next();
assertTrue(sinkOp instanceof SinkOperatorSpec);
assertEquals(((SinkOperatorSpec) sinkOp).getSinkFn(), xSink);
- assertNull(((SinkOperatorSpec) sinkOp).getNextStream());
}
@Test
@@ -220,8 +222,8 @@ public class TestMessageStreamImpl {
MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(streamGraph);
Function<TestMessageEnvelope, String> keyExtractorFunc = m -> "222";
inputStream.partitionBy(keyExtractorFunc);
- assertTrue(streamGraph.getInStreams().size() == 1);
- assertTrue(streamGraph.getOutStreams().size() == 1);
+ assertTrue(streamGraph.getInputStreams().size() == 1);
+ assertTrue(streamGraph.getOutputStreams().size() == 1);
Collection<OperatorSpec> subs = inputStream.getRegisteredOperatorSpecs();
assertEquals(subs.size(), 1);
@@ -229,11 +231,7 @@ public class TestMessageStreamImpl {
assertTrue(partitionByOp instanceof SinkOperatorSpec);
assertNull(partitionByOp.getNextStream());
- ((SinkOperatorSpec) partitionByOp).getSinkFn().apply(new TestMessageEnvelope("111", "test", 1000), new MessageCollector() {
- @Override
- public void send(OutgoingMessageEnvelope envelope) {
- assertTrue(envelope.getPartitionKey().equals("222"));
- }
- }, null);
+ ((SinkOperatorSpec) partitionByOp).getSinkFn().apply(new TestMessageEnvelope("111", "test", 1000),
+ envelope -> assertTrue(envelope.getPartitionKey().equals("222")), null);
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java b/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java
new file mode 100644
index 0000000..6603137
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java
@@ -0,0 +1,413 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.operators;
+
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import junit.framework.Assert;
+import org.apache.samza.Partition;
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.triggers.FiringType;
+import org.apache.samza.system.StreamSpec;
+import org.apache.samza.testUtils.TestClock;
+import org.apache.samza.operators.triggers.Trigger;
+import org.apache.samza.operators.triggers.Triggers;
+import org.apache.samza.operators.windows.AccumulationMode;
+import org.apache.samza.operators.windows.WindowPane;
+import org.apache.samza.operators.windows.Windows;
+import org.apache.samza.runtime.ApplicationRunner;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.StreamOperatorTask;
+import org.apache.samza.task.TaskContext;
+import org.apache.samza.task.TaskCoordinator;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.function.Function;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestWindowOperator {
+ private final MessageCollector messageCollector = mock(MessageCollector.class);
+ private final TaskCoordinator taskCoordinator = mock(TaskCoordinator.class);
+ private final List<WindowPane<Integer, Collection<MessageEnvelope<Integer, Integer>>>> windowPanes = new ArrayList<>();
+ private final List<Integer> integers = ImmutableList.of(1, 2, 1, 2, 1, 2, 1, 2, 3);
+ private Config config;
+ private TaskContext taskContext;
+ private ApplicationRunner runner;
+
+ /**
+ * Resets the collected panes and rebuilds the mocks before every test: the task context reports
+ * a single partition (kafka / integers / 0) and the runner resolves "integer-stream" to the
+ * matching stream spec.
+ */
+ @Before
+ public void setup() throws Exception {
+ // panes are collected in a shared field, so start each test from an empty list
+ windowPanes.clear();
+
+ config = mock(Config.class);
+ runner = mock(ApplicationRunner.class);
+ taskContext = mock(TaskContext.class);
+
+ SystemStreamPartition inputPartition = new SystemStreamPartition("kafka", "integers", new Partition(0));
+ StreamSpec inputSpec = new StreamSpec("integer-stream", "integers", "kafka");
+ when(runner.getStreamSpec("integer-stream")).thenReturn(inputSpec);
+ when(taskContext.getSystemStreamPartitions()).thenReturn(ImmutableSet.of(inputPartition));
+ }
+
+ /**
+ * Keyed 1s tumbling window with a repeating count(2) early trigger in DISCARDING mode.
+ * The assertions expect every early pane to hold exactly the 2 messages that caused the firing,
+ * and a final pane with the single message for key 3 once the window closes.
+ */
+ @Test
+ public void testTumblingWindowsDiscardingMode() throws Exception {
+ StreamApplication sgb = new KeyedTumblingWindowStreamApplication(AccumulationMode.DISCARDING,
+ Duration.ofSeconds(1), Triggers.repeat(Triggers.count(2)));
+ TestClock testClock = new TestClock();
+ StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock);
+ task.init(config, taskContext);
+
+ integers.forEach(n -> task.process(new IntegerMessageEnvelope(n, n), messageCollector, taskCoordinator));
+ testClock.advanceTime(Duration.ofSeconds(1));
+
+ task.window(messageCollector, taskCoordinator);
+
+ // assertEquals takes (expected, actual); keeping that order makes failure messages read correctly
+ Assert.assertEquals(5, windowPanes.size());
+ Assert.assertEquals(Integer.valueOf(1), windowPanes.get(0).getKey().getKey());
+ Assert.assertEquals(2, windowPanes.get(0).getMessage().size());
+
+ Assert.assertEquals(Integer.valueOf(2), windowPanes.get(1).getKey().getKey());
+ Assert.assertEquals(2, windowPanes.get(1).getMessage().size());
+
+ Assert.assertEquals(Integer.valueOf(1), windowPanes.get(2).getKey().getKey());
+ Assert.assertEquals(2, windowPanes.get(2).getMessage().size());
+
+ Assert.assertEquals(Integer.valueOf(2), windowPanes.get(3).getKey().getKey());
+ Assert.assertEquals(2, windowPanes.get(3).getMessage().size());
+
+ Assert.assertEquals(Integer.valueOf(3), windowPanes.get(4).getKey().getKey());
+ Assert.assertEquals(1, windowPanes.get(4).getMessage().size());
+ }
+
+ /**
+ * Keyed 1s tumbling window with a repeating count(2) early trigger in ACCUMULATING mode.
+ * The assertions expect the second early pane of each key to contain all 4 messages seen so far
+ * for that key, and 7 panes in total after the window closes.
+ */
+ @Test
+ public void testTumblingWindowsAccumulatingMode() throws Exception {
+ StreamApplication sgb = new KeyedTumblingWindowStreamApplication(AccumulationMode.ACCUMULATING,
+ Duration.ofSeconds(1), Triggers.repeat(Triggers.count(2)));
+ TestClock testClock = new TestClock();
+ StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock);
+ task.init(config, taskContext);
+
+ integers.forEach(n -> task.process(new IntegerMessageEnvelope(n, n), messageCollector, taskCoordinator));
+ testClock.advanceTime(Duration.ofSeconds(1));
+ task.window(messageCollector, taskCoordinator);
+
+ // assertEquals takes (expected, actual); keeping that order makes failure messages read correctly
+ Assert.assertEquals(7, windowPanes.size());
+ Assert.assertEquals(Integer.valueOf(1), windowPanes.get(0).getKey().getKey());
+ Assert.assertEquals(2, windowPanes.get(0).getMessage().size());
+
+ Assert.assertEquals(Integer.valueOf(2), windowPanes.get(1).getKey().getKey());
+ Assert.assertEquals(2, windowPanes.get(1).getMessage().size());
+
+ Assert.assertEquals(Integer.valueOf(1), windowPanes.get(2).getKey().getKey());
+ Assert.assertEquals(4, windowPanes.get(2).getMessage().size());
+
+ Assert.assertEquals(Integer.valueOf(2), windowPanes.get(3).getKey().getKey());
+ Assert.assertEquals(4, windowPanes.get(3).getMessage().size());
+ }
+
+ /**
+ * Keyed session window with a 500ms gap in DISCARDING mode. Messages are sent in three rounds,
+ * each followed by a 1s clock advance and a window() call; the assertions check the number of
+ * panes emitted after each round together with their keys, pane ids and sizes.
+ */
+ @Test
+ public void testSessionWindowsDiscardingMode() throws Exception {
+ StreamApplication sgb = new KeyedSessionWindowStreamApplication(AccumulationMode.DISCARDING, Duration.ofMillis(500));
+ TestClock testClock = new TestClock();
+ StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock);
+ task.init(config, taskContext);
+
+ // round 1: one session for key 1
+ task.process(new IntegerMessageEnvelope(1, 1), messageCollector, taskCoordinator);
+ task.process(new IntegerMessageEnvelope(1, 1), messageCollector, taskCoordinator);
+ testClock.advanceTime(Duration.ofSeconds(1));
+ task.window(messageCollector, taskCoordinator);
+
+ // assertEquals takes (expected, actual); keeping that order makes failure messages read correctly
+ Assert.assertEquals(1, windowPanes.size());
+ Assert.assertEquals("1", windowPanes.get(0).getKey().getPaneId());
+ Assert.assertEquals(Integer.valueOf(1), windowPanes.get(0).getKey().getKey());
+
+ // round 2: sessions for keys 2 and 3
+ task.process(new IntegerMessageEnvelope(2, 2), messageCollector, taskCoordinator);
+ task.process(new IntegerMessageEnvelope(2, 2), messageCollector, taskCoordinator);
+ task.process(new IntegerMessageEnvelope(3, 3), messageCollector, taskCoordinator);
+ task.process(new IntegerMessageEnvelope(3, 3), messageCollector, taskCoordinator);
+
+ testClock.advanceTime(Duration.ofSeconds(1));
+ task.window(messageCollector, taskCoordinator);
+
+ Assert.assertEquals(3, windowPanes.size());
+ Assert.assertEquals("1", windowPanes.get(0).getKey().getPaneId());
+ Assert.assertEquals("1001", windowPanes.get(1).getKey().getPaneId());
+ Assert.assertEquals("1001", windowPanes.get(2).getKey().getPaneId());
+ Assert.assertEquals(2, windowPanes.get(0).getMessage().size());
+ Assert.assertEquals(2, windowPanes.get(1).getMessage().size());
+ Assert.assertEquals(2, windowPanes.get(2).getMessage().size());
+
+ // round 3: a new session for key 2
+ task.process(new IntegerMessageEnvelope(2, 2), messageCollector, taskCoordinator);
+ task.process(new IntegerMessageEnvelope(2, 2), messageCollector, taskCoordinator);
+
+ testClock.advanceTime(Duration.ofSeconds(1));
+ task.window(messageCollector, taskCoordinator);
+ Assert.assertEquals(4, windowPanes.size());
+ Assert.assertEquals(Integer.valueOf(2), windowPanes.get(3).getKey().getKey());
+ Assert.assertEquals("2001", windowPanes.get(3).getKey().getPaneId());
+ Assert.assertEquals(2, windowPanes.get(3).getMessage().size());
+ }
+
+ /**
+ * Keyed session window with a 500ms gap in ACCUMULATING mode. Two messages for key 1 are sent,
+ * the clock advances 1s, four messages for key 2 are sent, the clock advances another 1s and
+ * window() is called once; the assertions expect one pane per key with 2 and 4 messages.
+ */
+ @Test
+ public void testSessionWindowsAccumulatingMode() throws Exception {
+ // The mode under test is ACCUMULATING, as the test name states (this previously passed
+ // DISCARDING, which left the accumulating path for session windows unexercised).
+ StreamApplication sgb = new KeyedSessionWindowStreamApplication(AccumulationMode.ACCUMULATING,
+ Duration.ofMillis(500));
+ TestClock testClock = new TestClock();
+ StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock);
+ task.init(config, taskContext);
+
+ task.process(new IntegerMessageEnvelope(1, 1), messageCollector, taskCoordinator);
+ task.process(new IntegerMessageEnvelope(1, 1), messageCollector, taskCoordinator);
+ testClock.advanceTime(Duration.ofSeconds(1));
+
+ task.process(new IntegerMessageEnvelope(2, 2), messageCollector, taskCoordinator);
+ task.process(new IntegerMessageEnvelope(2, 2), messageCollector, taskCoordinator);
+
+ task.process(new IntegerMessageEnvelope(2, 2), messageCollector, taskCoordinator);
+ task.process(new IntegerMessageEnvelope(2, 2), messageCollector, taskCoordinator);
+
+ testClock.advanceTime(Duration.ofSeconds(1));
+ task.window(messageCollector, taskCoordinator);
+
+ // assertEquals takes (expected, actual); keeping that order makes failure messages read correctly
+ Assert.assertEquals(2, windowPanes.size());
+ Assert.assertEquals(Integer.valueOf(1), windowPanes.get(0).getKey().getKey());
+ Assert.assertEquals(Integer.valueOf(2), windowPanes.get(1).getKey().getKey());
+ Assert.assertEquals(2, windowPanes.get(0).getMessage().size());
+ Assert.assertEquals(4, windowPanes.get(1).getMessage().size());
+ }
+
+ /**
+ * Keyed 1s tumbling window in ACCUMULATING mode with a non-repeating count(2) early trigger.
+ * The assertions expect a single EARLY pane after the second message, no further early panes
+ * for later messages in the same window, and DEFAULT panes when each window closes.
+ */
+ @Test
+ public void testCancellationOfOnceTrigger() throws Exception {
+ StreamApplication sgb = new KeyedTumblingWindowStreamApplication(AccumulationMode.ACCUMULATING,
+ Duration.ofSeconds(1), Triggers.count(2));
+ TestClock testClock = new TestClock();
+ StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock);
+ task.init(config, taskContext);
+
+ task.process(new IntegerMessageEnvelope(1, 1), messageCollector, taskCoordinator);
+ task.process(new IntegerMessageEnvelope(1, 2), messageCollector, taskCoordinator);
+
+ // assertEquals takes (expected, actual); keeping that order makes failure messages read correctly
+ Assert.assertEquals(1, windowPanes.size());
+ Assert.assertEquals("0", windowPanes.get(0).getKey().getPaneId());
+ Assert.assertEquals(Integer.valueOf(1), windowPanes.get(0).getKey().getKey());
+ Assert.assertEquals(FiringType.EARLY, windowPanes.get(0).getFiringType());
+
+ task.process(new IntegerMessageEnvelope(1, 3), messageCollector, taskCoordinator);
+ task.process(new IntegerMessageEnvelope(1, 4), messageCollector, taskCoordinator);
+ task.process(new IntegerMessageEnvelope(1, 5), messageCollector, taskCoordinator);
+
+ // the count trigger is not wrapped in repeat(), so three more messages must not add a pane
+ Assert.assertEquals(1, windowPanes.size());
+
+ testClock.advanceTime(Duration.ofSeconds(1));
+ task.window(messageCollector, taskCoordinator);
+
+ Assert.assertEquals(2, windowPanes.size());
+ Assert.assertEquals("0", windowPanes.get(0).getKey().getPaneId());
+ Assert.assertEquals("0", windowPanes.get(1).getKey().getPaneId());
+ Assert.assertEquals(FiringType.DEFAULT, windowPanes.get(1).getFiringType());
+
+ task.process(new IntegerMessageEnvelope(3, 6), messageCollector, taskCoordinator);
+ testClock.advanceTime(Duration.ofSeconds(1));
+ task.window(messageCollector, taskCoordinator);
+
+ Assert.assertEquals(3, windowPanes.size());
+ Assert.assertEquals(Integer.valueOf(3), windowPanes.get(2).getKey().getKey());
+ Assert.assertEquals("1000", windowPanes.get(2).getKey().getPaneId());
+ Assert.assertEquals(FiringType.DEFAULT, windowPanes.get(2).getFiringType());
+ Assert.assertEquals(1, windowPanes.get(2).getMessage().size());
+ }
+
+ /**
+ * Keyed 1s tumbling window in ACCUMULATING mode whose early trigger is
+ * any(count(2), timeSinceFirstMessage(500ms)). The assertions check that once the count trigger
+ * has fired the time-based trigger adds no pane in the same window, and that in the following
+ * window the time-based trigger produces an EARLY pane before the DEFAULT one.
+ */
+ @Test
+ public void testCancellationOfAnyTrigger() throws Exception {
+ StreamApplication sgb = new KeyedTumblingWindowStreamApplication(AccumulationMode.ACCUMULATING, Duration.ofSeconds(1),
+ Triggers.any(Triggers.count(2), Triggers.timeSinceFirstMessage(Duration.ofMillis(500))));
+ TestClock testClock = new TestClock();
+ StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock);
+ task.init(config, taskContext);
+
+ task.process(new IntegerMessageEnvelope(1, 1), messageCollector, taskCoordinator);
+ task.process(new IntegerMessageEnvelope(1, 2), messageCollector, taskCoordinator);
+ // assert that the count trigger fired
+ // (assertEquals takes (expected, actual); keeping that order makes failure messages read correctly)
+ Assert.assertEquals(1, windowPanes.size());
+
+ // advance the timer to enable the triggering of the inner timeSinceFirstMessage trigger
+ testClock.advanceTime(Duration.ofMillis(500));
+
+ // assert that the triggering of the count trigger cancelled the inner timeSinceFirstMessage trigger
+ Assert.assertEquals(1, windowPanes.size());
+
+ task.process(new IntegerMessageEnvelope(1, 3), messageCollector, taskCoordinator);
+ task.process(new IntegerMessageEnvelope(1, 4), messageCollector, taskCoordinator);
+ task.process(new IntegerMessageEnvelope(1, 5), messageCollector, taskCoordinator);
+
+ // advance timer by 500 more millis to enable the default trigger
+ testClock.advanceTime(Duration.ofMillis(500));
+ task.window(messageCollector, taskCoordinator);
+
+ // assert that the default trigger fired
+ Assert.assertEquals(2, windowPanes.size());
+ Assert.assertEquals(FiringType.DEFAULT, windowPanes.get(1).getFiringType());
+ Assert.assertEquals(Integer.valueOf(1), windowPanes.get(1).getKey().getKey());
+ Assert.assertEquals("0", windowPanes.get(1).getKey().getPaneId());
+ Assert.assertEquals(5, windowPanes.get(1).getMessage().size());
+
+ task.process(new IntegerMessageEnvelope(1, 5), messageCollector, taskCoordinator);
+
+ // advance timer by 500 millis to enable the inner timeSinceFirstMessage trigger
+ testClock.advanceTime(Duration.ofMillis(500));
+ task.window(messageCollector, taskCoordinator);
+
+ Assert.assertEquals(3, windowPanes.size());
+ Assert.assertEquals(FiringType.EARLY, windowPanes.get(2).getFiringType());
+ Assert.assertEquals(Integer.valueOf(1), windowPanes.get(2).getKey().getKey());
+ Assert.assertEquals("1000", windowPanes.get(2).getKey().getPaneId());
+
+ // advance timer by > 500 millis to enable the default trigger
+ testClock.advanceTime(Duration.ofMillis(900));
+ task.window(messageCollector, taskCoordinator);
+ Assert.assertEquals(4, windowPanes.size());
+ Assert.assertEquals(FiringType.DEFAULT, windowPanes.get(3).getFiringType());
+ Assert.assertEquals(Integer.valueOf(1), windowPanes.get(3).getKey().getKey());
+ Assert.assertEquals("1000", windowPanes.get(3).getKey().getPaneId());
+ }
+
+ /**
+ * Keyed 1s tumbling window in ACCUMULATING mode whose early trigger is
+ * repeat(any(count(2), timeSinceFirstMessage(500ms))). The assertions expect a new pane each
+ * time either inner trigger is satisfied again, plus one more pane when the window closes.
+ */
+ @Test
+ public void testCancelationOfRepeatingNestedTriggers() throws Exception {
+ StreamApplication sgb = new KeyedTumblingWindowStreamApplication(AccumulationMode.ACCUMULATING, Duration.ofSeconds(1),
+ Triggers.repeat(Triggers.any(Triggers.count(2), Triggers.timeSinceFirstMessage(Duration.ofMillis(500)))));
+ TestClock testClock = new TestClock();
+ StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock);
+ task.init(config, taskContext);
+
+ task.process(new IntegerMessageEnvelope(1, 1), messageCollector, taskCoordinator);
+ task.process(new IntegerMessageEnvelope(1, 2), messageCollector, taskCoordinator);
+ // assert that the count trigger fired
+ // (assertEquals takes (expected, actual); keeping that order makes failure messages read correctly)
+ Assert.assertEquals(1, windowPanes.size());
+
+ // send one more message, then advance the timer by 500 millis; only one message has arrived
+ // since the last firing, so a second pane here presumably comes from the re-armed
+ // timeSinceFirstMessage trigger rather than from the count trigger
+ task.process(new IntegerMessageEnvelope(1, 3), messageCollector, taskCoordinator);
+ testClock.advanceTime(Duration.ofMillis(500));
+ task.window(messageCollector, taskCoordinator);
+ Assert.assertEquals(2, windowPanes.size());
+
+ // two more messages are expected to produce another pane without any clock advance
+ task.process(new IntegerMessageEnvelope(1, 3), messageCollector, taskCoordinator);
+ task.process(new IntegerMessageEnvelope(1, 4), messageCollector, taskCoordinator);
+ Assert.assertEquals(3, windowPanes.size());
+
+ task.process(new IntegerMessageEnvelope(1, 5), messageCollector, taskCoordinator);
+ // advance timer by 500 more millis to enable the default trigger
+ testClock.advanceTime(Duration.ofMillis(500));
+ task.window(messageCollector, taskCoordinator);
+ // assert that the default trigger fired
+ Assert.assertEquals(4, windowPanes.size());
+ }
+
+ /**
+ * Test application that reads "integer-stream", groups the messages by key into tumbling
+ * windows of the configured duration (with the supplied early trigger and accumulation mode)
+ * and appends every emitted pane to the enclosing test's windowPanes list.
+ */
+ private class KeyedTumblingWindowStreamApplication implements StreamApplication {
+
+ private final AccumulationMode mode;
+ private final Duration duration;
+ private final Trigger<MessageEnvelope<Integer, Integer>> earlyTrigger;
+
+ KeyedTumblingWindowStreamApplication(AccumulationMode mode,
+ Duration timeDuration, Trigger<MessageEnvelope<Integer, Integer>> earlyTrigger) {
+ this.earlyTrigger = earlyTrigger;
+ this.duration = timeDuration;
+ this.mode = mode;
+ }
+
+ @Override
+ public void init(StreamGraph graph, Config config) {
+ // windows are keyed by the envelope key
+ Function<MessageEnvelope<Integer, Integer>, Integer> keyFn = envelope -> envelope.getKey();
+ MessageStream<MessageEnvelope<Integer, Integer>> inStream = graph.getInputStream("integer-stream",
+ (key, msg) -> new MessageEnvelope(key, msg));
+ inStream
+ .map(envelope -> envelope)
+ .window(Windows.keyedTumblingWindow(keyFn, duration).setEarlyTrigger(earlyTrigger)
+ .setAccumulationMode(mode))
+ .map(pane -> {
+ // record the pane so the test can assert on it, then pass it through unchanged
+ windowPanes.add(pane);
+ return pane;
+ });
+ }
+ }
+
+ /**
+ * Test application that reads "integer-stream", groups the messages by key into session
+ * windows with the configured gap duration and accumulation mode, and appends every emitted
+ * pane to the enclosing test's windowPanes list.
+ */
+ private class KeyedSessionWindowStreamApplication implements StreamApplication {
+
+ private final AccumulationMode mode;
+ private final Duration duration;
+
+ KeyedSessionWindowStreamApplication(AccumulationMode mode, Duration duration) {
+ this.duration = duration;
+ this.mode = mode;
+ }
+
+ @Override
+ public void init(StreamGraph graph, Config config) {
+ // sessions are keyed by the envelope key
+ Function<MessageEnvelope<Integer, Integer>, Integer> keyFn = envelope -> envelope.getKey();
+ MessageStream<MessageEnvelope<Integer, Integer>> inStream = graph.getInputStream("integer-stream",
+ (key, msg) -> new MessageEnvelope(key, msg));
+
+ inStream
+ .map(envelope -> envelope)
+ .window(Windows.keyedSessionWindow(keyFn, duration)
+ .setAccumulationMode(mode))
+ .map(pane -> {
+ // record the pane so the test can assert on it, then pass it through unchanged
+ windowPanes.add(pane);
+ return pane;
+ });
+ }
+ }
+
+ /**
+ * Incoming envelope with an integer key and an integer message, always attributed to partition 0
+ * of the kafka "integers" stream with offset "1". Declared static because it uses no state of
+ * the enclosing test instance.
+ */
+ private static class IntegerMessageEnvelope extends IncomingMessageEnvelope {
+ IntegerMessageEnvelope(int key, int msg) {
+ super(new SystemStreamPartition("kafka", "integers", new Partition(0)), "1", key, msg);
+ }
+ }
+
+ /**
+ * Minimal immutable key/value holder used as the message type of the test stream. Declared
+ * static because it uses no state of the enclosing test instance.
+ *
+ * @param <K> type of the key
+ * @param <V> type of the value
+ */
+ private static class MessageEnvelope<K, V> {
+ private final K key;
+ private final V value;
+
+ MessageEnvelope(K key, V value) {
+ this.key = key;
+ this.value = value;
+ }
+
+ /** @return the key this envelope was created with */
+ public K getKey() {
+ return key;
+ }
+
+ /** @return the value this envelope was created with */
+ public V getValue() {
+ return value;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/test/java/org/apache/samza/operators/data/JsonIncomingSystemMessageEnvelope.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/data/JsonIncomingSystemMessageEnvelope.java b/samza-core/src/test/java/org/apache/samza/operators/data/JsonIncomingSystemMessageEnvelope.java
deleted file mode 100644
index 9a425d1..0000000
--- a/samza-core/src/test/java/org/apache/samza/operators/data/JsonIncomingSystemMessageEnvelope.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.data;
-
-import org.apache.samza.system.SystemStreamPartition;
-
-
-/**
- * Example input {@link MessageEnvelope} w/ Json message and string as the key.
- */
-
-public class JsonIncomingSystemMessageEnvelope<T> implements MessageEnvelope<String, T> {
-
- private final String key;
- private final T data;
- private final Offset offset;
- private final SystemStreamPartition partition;
-
- public JsonIncomingSystemMessageEnvelope(String key, T data, Offset offset, SystemStreamPartition partition) {
- this.key = key;
- this.data = data;
- this.offset = offset;
- this.partition = partition;
- }
-
- @Override
- public T getMessage() {
- return this.data;
- }
-
- @Override
- public String getKey() {
- return this.key;
- }
-
- public Offset getOffset() {
- return this.offset;
- }
-
- public SystemStreamPartition getSystemStreamPartition() {
- return this.partition;
- }
-}
-
http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/test/java/org/apache/samza/operators/data/TestMessageEnvelope.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/data/TestMessageEnvelope.java b/samza-core/src/test/java/org/apache/samza/operators/data/TestMessageEnvelope.java
new file mode 100644
index 0000000..2524c28
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/operators/data/TestMessageEnvelope.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.data;
+
+
+/**
+ * Test message consisting of a string key and a {@link MessageType} payload that carries a
+ * string value together with an event time.
+ */
+public class TestMessageEnvelope {
+
+ private final String key;
+ private final MessageType value;
+
+ /**
+ * @param key the message key
+ * @param value the string stored in the payload
+ * @param eventTime the event time stored in the payload
+ */
+ public TestMessageEnvelope(String key, String value, long eventTime) {
+ MessageType payload = new MessageType(value, eventTime);
+ this.value = payload;
+ this.key = key;
+ }
+
+ /** @return the key passed to the constructor */
+ public String getKey() {
+ return key;
+ }
+
+ /** @return the payload built from the constructor's value and event time */
+ public MessageType getMessage() {
+ return value;
+ }
+
+ /**
+ * Payload of a {@link TestMessageEnvelope}: a string value and its event time.
+ */
+ public class MessageType {
+ private final String value;
+ private final long eventTime;
+
+ public MessageType(String value, long eventTime) {
+ this.eventTime = eventTime;
+ this.value = value;
+ }
+
+ /** @return the string value of this payload */
+ public String getValue() {
+ return this.value;
+ }
+
+ /** @return the event time of this payload */
+ public long getEventTime() {
+ return this.eventTime;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/test/java/org/apache/samza/operators/data/TestOutputMessageEnvelope.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/data/TestOutputMessageEnvelope.java b/samza-core/src/test/java/org/apache/samza/operators/data/TestOutputMessageEnvelope.java
new file mode 100644
index 0000000..f9537a3
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/operators/data/TestOutputMessageEnvelope.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.data;
+
+
+/**
+ * Test output message consisting of a string key and an integer value.
+ */
+public class TestOutputMessageEnvelope {
+ private final String key;
+ private final Integer value;
+
+ /**
+ * @param key the message key
+ * @param value the message value
+ */
+ public TestOutputMessageEnvelope(String key, Integer value) {
+ this.value = value;
+ this.key = key;
+ }
+
+ /** @return the key passed to the constructor */
+ public String getKey() {
+ return key;
+ }
+
+ /** @return the value passed to the constructor */
+ public Integer getMessage() {
+ return value;
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
index 5722dbd..f978c3c 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
@@ -18,8 +18,8 @@
*/
package org.apache.samza.operators.impl;
-import org.apache.samza.operators.TestMessageEnvelope;
-import org.apache.samza.operators.TestOutputMessageEnvelope;
+import org.apache.samza.operators.data.TestMessageEnvelope;
+import org.apache.samza.operators.data.TestOutputMessageEnvelope;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.TaskCoordinator;
import org.hamcrest.core.IsEqual;
http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java
index 31f6f4a..267cdfc 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java
@@ -18,24 +18,21 @@
*/
package org.apache.samza.operators.impl;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
import org.apache.samza.config.Config;
import org.apache.samza.operators.MessageStreamImpl;
import org.apache.samza.operators.StreamGraphImpl;
-import org.apache.samza.operators.TestMessageEnvelope;
import org.apache.samza.operators.TestMessageStreamImplUtil;
-import org.apache.samza.operators.TestOutputMessageEnvelope;
-import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.operators.data.TestMessageEnvelope;
+import org.apache.samza.operators.data.TestOutputMessageEnvelope;
import org.apache.samza.operators.functions.FlatMapFunction;
import org.apache.samza.operators.functions.JoinFunction;
import org.apache.samza.operators.functions.PartialJoinFunction;
import org.apache.samza.operators.functions.SinkFunction;
import org.apache.samza.operators.spec.OperatorSpec;
-import org.apache.samza.operators.spec.WindowOperatorSpec;
-import org.apache.samza.operators.spec.PartialJoinOperatorSpec;
import org.apache.samza.operators.spec.SinkOperatorSpec;
+import org.apache.samza.operators.spec.PartialJoinOperatorSpec;
import org.apache.samza.operators.spec.StreamOperatorSpec;
+import org.apache.samza.operators.spec.WindowOperatorSpec;
import org.apache.samza.operators.windows.Windows;
import org.apache.samza.operators.windows.internal.WindowInternal;
import org.apache.samza.operators.windows.internal.WindowType;
@@ -44,6 +41,8 @@ import org.junit.Before;
import org.junit.Test;
import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
@@ -66,11 +65,11 @@ public class TestOperatorImpls {
nextOperatorsField = OperatorImpl.class.getDeclaredField("nextOperators");
nextOperatorsField.setAccessible(true);
- createOpMethod = OperatorGraph.class.getDeclaredMethod("createOperatorImpl", MessageStreamImpl.class,
+ createOpMethod = OperatorImplGraph.class.getDeclaredMethod("createOperatorImpl", MessageStreamImpl.class,
OperatorSpec.class, Config.class, TaskContext.class);
createOpMethod.setAccessible(true);
- createOpsMethod = OperatorGraph.class.getDeclaredMethod("createOperatorImpls", MessageStreamImpl.class, Config.class, TaskContext.class);
+ createOpsMethod = OperatorImplGraph.class.getDeclaredMethod("createOperatorImpls", MessageStreamImpl.class, Config.class, TaskContext.class);
createOpsMethod.setAccessible(true);
}
@@ -84,8 +83,8 @@ public class TestOperatorImpls {
Config mockConfig = mock(Config.class);
TaskContext mockContext = mock(TaskContext.class);
- OperatorGraph opGraph = new OperatorGraph();
- OperatorImpl<TestMessageEnvelope, ? extends MessageEnvelope> opImpl = (OperatorImpl<TestMessageEnvelope, ? extends MessageEnvelope>)
+ OperatorImplGraph opGraph = new OperatorImplGraph();
+ OperatorImpl<TestMessageEnvelope, ?> opImpl = (OperatorImpl<TestMessageEnvelope, ?>)
createOpMethod.invoke(opGraph, mockStream, mockWnd, mockConfig, mockContext);
assertTrue(opImpl instanceof WindowOperatorImpl);
Field wndInternalField = WindowOperatorImpl.class.getDeclaredField("window");
@@ -97,7 +96,7 @@ public class TestOperatorImpls {
StreamOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope> mockSimpleOp = mock(StreamOperatorSpec.class);
FlatMapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> mockTxfmFn = mock(FlatMapFunction.class);
when(mockSimpleOp.getTransformFn()).thenReturn(mockTxfmFn);
- opImpl = (OperatorImpl<TestMessageEnvelope, ? extends MessageEnvelope>) createOpMethod.invoke(opGraph, mockStream, mockSimpleOp, mockConfig, mockContext);
+ opImpl = (OperatorImpl<TestMessageEnvelope, ?>) createOpMethod.invoke(opGraph, mockStream, mockSimpleOp, mockConfig, mockContext);
assertTrue(opImpl instanceof StreamOperatorImpl);
Field txfmFnField = StreamOperatorImpl.class.getDeclaredField("transformFn");
txfmFnField.setAccessible(true);
@@ -107,7 +106,7 @@ public class TestOperatorImpls {
SinkFunction<TestMessageEnvelope> sinkFn = (m, mc, tc) -> { };
SinkOperatorSpec<TestMessageEnvelope> sinkOp = mock(SinkOperatorSpec.class);
when(sinkOp.getSinkFn()).thenReturn(sinkFn);
- opImpl = (OperatorImpl<TestMessageEnvelope, ? extends MessageEnvelope>) createOpMethod.invoke(opGraph, mockStream, sinkOp, mockConfig, mockContext);
+ opImpl = (OperatorImpl<TestMessageEnvelope, ?>) createOpMethod.invoke(opGraph, mockStream, sinkOp, mockConfig, mockContext);
assertTrue(opImpl instanceof SinkOperatorImpl);
Field sinkFnField = SinkOperatorImpl.class.getDeclaredField("sinkFn");
sinkFnField.setAccessible(true);
@@ -116,7 +115,7 @@ public class TestOperatorImpls {
// get join operator
PartialJoinOperatorSpec<String, TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope> joinOp = mock(PartialJoinOperatorSpec.class);
PartialJoinFunction<String, TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope> joinFn = mock(PartialJoinFunction.class);
- opImpl = (OperatorImpl<TestMessageEnvelope, ? extends MessageEnvelope>) createOpMethod.invoke(opGraph, mockStream, joinOp, mockConfig, mockContext);
+ opImpl = (OperatorImpl<TestMessageEnvelope, ?>) createOpMethod.invoke(opGraph, mockStream, joinOp, mockConfig, mockContext);
assertTrue(opImpl instanceof PartialJoinOperatorImpl);
}
@@ -126,7 +125,7 @@ public class TestOperatorImpls {
MessageStreamImpl<TestMessageEnvelope> testStream = mock(MessageStreamImpl.class);
TaskContext mockContext = mock(TaskContext.class);
Config mockConfig = mock(Config.class);
- OperatorGraph opGraph = new OperatorGraph();
+ OperatorImplGraph opGraph = new OperatorImplGraph();
RootOperatorImpl operatorChain = (RootOperatorImpl) createOpsMethod.invoke(opGraph, testStream, mockConfig, mockContext);
assertTrue(operatorChain != null);
}
@@ -139,7 +138,7 @@ public class TestOperatorImpls {
TaskContext mockContext = mock(TaskContext.class);
Config mockConfig = mock(Config.class);
testInput.map(m -> m).window(Windows.keyedSessionWindow(TestMessageEnvelope::getKey, Duration.ofMinutes(10)));
- OperatorGraph opGraph = new OperatorGraph();
+ OperatorImplGraph opGraph = new OperatorImplGraph();
RootOperatorImpl operatorChain = (RootOperatorImpl) createOpsMethod.invoke(opGraph, testInput, mockConfig, mockContext);
Set<OperatorImpl> subsSet = (Set<OperatorImpl>) nextOperatorsField.get(operatorChain);
assertEquals(subsSet.size(), 1);
@@ -160,7 +159,7 @@ public class TestOperatorImpls {
Config mockConfig = mock(Config.class);
testInput.filter(m -> m.getMessage().getEventTime() > 123456L).flatMap(m -> new ArrayList() { { this.add(m); this.add(m); } });
testInput.filter(m -> m.getMessage().getEventTime() < 123456L).map(m -> m);
- OperatorGraph opGraph = new OperatorGraph();
+ OperatorImplGraph opGraph = new OperatorImplGraph();
RootOperatorImpl operatorChain = (RootOperatorImpl) createOpsMethod.invoke(opGraph, testInput, mockConfig, mockContext);
Set<OperatorImpl> subsSet = (Set<OperatorImpl>) nextOperatorsField.get(operatorChain);
assertEquals(subsSet.size(), 2);
@@ -208,7 +207,7 @@ public class TestOperatorImpls {
}
}, Duration.ofMinutes(1))
.map(m -> m);
- OperatorGraph opGraph = new OperatorGraph();
+ OperatorImplGraph opGraph = new OperatorImplGraph();
// now, we create chained operators from each input sources
RootOperatorImpl chain1 = (RootOperatorImpl) createOpsMethod.invoke(opGraph, input1, mockConfig, mockContext);
RootOperatorImpl chain2 = (RootOperatorImpl) createOpsMethod.invoke(opGraph, input2, mockConfig, mockContext);
http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java
index ce9fdd2..abd7740 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java
@@ -19,7 +19,7 @@
package org.apache.samza.operators.impl;
import org.apache.samza.config.Config;
-import org.apache.samza.operators.TestOutputMessageEnvelope;
+import org.apache.samza.operators.data.TestOutputMessageEnvelope;
import org.apache.samza.operators.functions.SinkFunction;
import org.apache.samza.operators.spec.SinkOperatorSpec;
import org.apache.samza.task.MessageCollector;
http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java
index 0a873fd..9dd161a 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java
@@ -22,8 +22,8 @@ import java.util.ArrayList;
import java.util.Collection;
import org.apache.samza.config.Config;
import org.apache.samza.operators.MessageStreamImpl;
-import org.apache.samza.operators.TestMessageEnvelope;
-import org.apache.samza.operators.TestOutputMessageEnvelope;
+import org.apache.samza.operators.data.TestMessageEnvelope;
+import org.apache.samza.operators.data.TestOutputMessageEnvelope;
import org.apache.samza.operators.functions.FlatMapFunction;
import org.apache.samza.operators.spec.StreamOperatorSpec;
import org.apache.samza.task.MessageCollector;
http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java b/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java
index ec1d74c..37e3d1a 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java
@@ -20,16 +20,16 @@ package org.apache.samza.operators.spec;
import org.apache.samza.operators.MessageStreamImpl;
import org.apache.samza.operators.StreamGraphImpl;
-import org.apache.samza.operators.TestMessageEnvelope;
import org.apache.samza.operators.TestMessageStreamImplUtil;
-import org.apache.samza.operators.TestOutputMessageEnvelope;
-import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.operators.data.TestMessageEnvelope;
+import org.apache.samza.operators.data.TestOutputMessageEnvelope;
import org.apache.samza.operators.functions.FlatMapFunction;
import org.apache.samza.operators.functions.FoldLeftFunction;
import org.apache.samza.operators.functions.PartialJoinFunction;
import org.apache.samza.operators.functions.SinkFunction;
-import org.apache.samza.operators.windows.internal.WindowInternal;
+import org.apache.samza.operators.stream.OutputStreamInternalImpl;
import org.apache.samza.operators.windows.WindowPane;
+import org.apache.samza.operators.windows.internal.WindowInternal;
import org.apache.samza.operators.windows.internal.WindowType;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.TaskCoordinator;
@@ -41,58 +41,79 @@ import java.util.function.Function;
import java.util.function.Supplier;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertNotNull;
import static org.mockito.Mockito.mock;
public class TestOperatorSpecs {
@Test
- public void testGetStreamOperator() {
- FlatMapFunction<MessageEnvelope, TestMessageEnvelope> transformFn = m -> new ArrayList<TestMessageEnvelope>() { {
- this.add(new TestMessageEnvelope(m.getKey().toString(), m.getMessage().toString(), 12345L));
+ public void testCreateStreamOperator() {
+ FlatMapFunction<?, TestMessageEnvelope> transformFn = m -> new ArrayList<TestMessageEnvelope>() { {
+ this.add(new TestMessageEnvelope(m.toString(), m.toString(), 12345L));
} };
MessageStreamImpl<TestMessageEnvelope> mockOutput = mock(MessageStreamImpl.class);
- StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
- StreamOperatorSpec<MessageEnvelope, TestMessageEnvelope> strmOp = OperatorSpecs.createStreamOperatorSpec(transformFn, mockGraph, mockOutput);
- assertEquals(strmOp.getTransformFn(), transformFn);
- assertEquals(strmOp.getNextStream(), mockOutput);
+ StreamOperatorSpec<?, TestMessageEnvelope> streamOp =
+ OperatorSpecs.createStreamOperatorSpec(transformFn, mockOutput, 1);
+ assertEquals(streamOp.getTransformFn(), transformFn);
+ assertEquals(streamOp.getNextStream(), mockOutput);
}
@Test
- public void testGetSinkOperator() {
+ public void testCreateSinkOperator() {
SinkFunction<TestMessageEnvelope> sinkFn = (TestMessageEnvelope message, MessageCollector messageCollector,
TaskCoordinator taskCoordinator) -> { };
- StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
- SinkOperatorSpec<TestMessageEnvelope> sinkOp = OperatorSpecs.createSinkOperatorSpec(sinkFn, mockGraph);
+ SinkOperatorSpec<TestMessageEnvelope> sinkOp = OperatorSpecs.createSinkOperatorSpec(sinkFn, 1);
assertEquals(sinkOp.getSinkFn(), sinkFn);
- assertTrue(sinkOp.getNextStream() == null);
+ assertEquals(sinkOp.getOpCode(), OperatorSpec.OpCode.SINK);
+ assertEquals(sinkOp.getNextStream(), null);
+ }
+
+ @Test
+ public void testCreateSendToOperator() {
+ OutputStreamInternalImpl<Object, Object, TestMessageEnvelope> mockOutput = mock(OutputStreamInternalImpl.class);
+ SinkOperatorSpec<TestMessageEnvelope> sinkOp = OperatorSpecs.createSendToOperatorSpec(mockOutput, 1);
+ assertNotNull(sinkOp.getSinkFn());
+ assertEquals(sinkOp.getOpCode(), OperatorSpec.OpCode.SEND_TO);
+ assertEquals(sinkOp.getNextStream(), null);
+ }
+
+
+ @Test
+ public void testCreatePartitionByOperator() {
+ OutputStreamInternalImpl<Object, Object, TestMessageEnvelope> mockOutput = mock(OutputStreamInternalImpl.class);
+ SinkOperatorSpec<TestMessageEnvelope> sinkOp = OperatorSpecs.createPartitionByOperatorSpec(mockOutput, 1);
+ assertNotNull(sinkOp.getSinkFn());
+ assertEquals(sinkOp.getOpCode(), OperatorSpec.OpCode.PARTITION_BY);
+ assertEquals(sinkOp.getNextStream(), null);
}
@Test
- public void testGetWindowOperator() throws Exception {
+ public void testCreateWindowOperator() throws Exception {
Function<TestMessageEnvelope, String> keyExtractor = m -> "globalkey";
FoldLeftFunction<TestMessageEnvelope, Integer> aggregator = (m, c) -> c + 1;
Supplier<Integer> initialValue = () -> 0;
//instantiate a window using reflection
WindowInternal window = new WindowInternal(null, initialValue, aggregator, keyExtractor, null, WindowType.TUMBLING);
- StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
MessageStreamImpl<WindowPane<String, Integer>> mockWndOut = mock(MessageStreamImpl.class);
- WindowOperatorSpec spec = OperatorSpecs.<TestMessageEnvelope, String, Integer>createWindowOperatorSpec(window, mockGraph, mockWndOut);
+ WindowOperatorSpec spec =
+ OperatorSpecs.<TestMessageEnvelope, String, Integer>createWindowOperatorSpec(window, mockWndOut, 1);
assertEquals(spec.getWindow(), window);
assertEquals(spec.getWindow().getKeyExtractor(), keyExtractor);
assertEquals(spec.getWindow().getFoldLeftFunction(), aggregator);
}
@Test
- public void testGetPartialJoinOperator() {
- PartialJoinFunction<String, TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope> thisPartialJoinFn = mock(PartialJoinFunction.class);
- PartialJoinFunction<String, TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope> otherPartialJoinFn = mock(PartialJoinFunction.class);
+ public void testCreatePartialJoinOperator() {
+ PartialJoinFunction<String, TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope> thisPartialJoinFn
+ = mock(PartialJoinFunction.class);
+ PartialJoinFunction<String, TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope> otherPartialJoinFn
+ = mock(PartialJoinFunction.class);
StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
MessageStreamImpl<TestOutputMessageEnvelope> joinOutput = TestMessageStreamImplUtil.getMessageStreamImpl(mockGraph);
- PartialJoinOperatorSpec<String, TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope> partialJoinSpec =
- OperatorSpecs.createPartialJoinOperatorSpec(thisPartialJoinFn, otherPartialJoinFn, 1000 * 60, mockGraph, joinOutput);
+ PartialJoinOperatorSpec<String, TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope> partialJoinSpec
+ = OperatorSpecs.createPartialJoinOperatorSpec(thisPartialJoinFn, otherPartialJoinFn, 1000 * 60, joinOutput, 1);
assertEquals(partialJoinSpec.getNextStream(), joinOutput);
assertEquals(partialJoinSpec.getThisPartialJoinFn(), thisPartialJoinFn);
@@ -100,13 +121,15 @@ public class TestOperatorSpecs {
}
@Test
- public void testGetMergeOperator() {
+ public void testCreateMergeOperator() {
StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
MessageStreamImpl<TestMessageEnvelope> output = TestMessageStreamImplUtil.<TestMessageEnvelope>getMessageStreamImpl(mockGraph);
- StreamOperatorSpec<TestMessageEnvelope, TestMessageEnvelope> mergeOp = OperatorSpecs.createMergeOperatorSpec(mockGraph, output);
- Function<TestMessageEnvelope, Collection<TestMessageEnvelope>> mergeFn = t -> new ArrayList<TestMessageEnvelope>() { {
- this.add(t);
- } };
+ StreamOperatorSpec<TestMessageEnvelope, TestMessageEnvelope> mergeOp =
+ OperatorSpecs.createMergeOperatorSpec(output, 1);
+ Function<TestMessageEnvelope, Collection<TestMessageEnvelope>> mergeFn =
+ t -> new ArrayList<TestMessageEnvelope>() { {
+ this.add(t);
+ } };
TestMessageEnvelope t = mock(TestMessageEnvelope.class);
assertEquals(mergeOp.getTransformFn().apply(t), mergeFn.apply(t));
assertEquals(mergeOp.getNextStream(), output);
http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/test/java/org/apache/samza/operators/triggers/TestClock.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/triggers/TestClock.java b/samza-core/src/test/java/org/apache/samza/operators/triggers/TestClock.java
deleted file mode 100644
index 674a8f1..0000000
--- a/samza-core/src/test/java/org/apache/samza/operators/triggers/TestClock.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.triggers;
-
-import org.apache.samza.util.Clock;
-
-import java.time.Duration;
-
-/**
- * An implementation of {@link Clock} that allows to advance the time by an arbitrary duration.
- * Used for testing.
- */
-public class TestClock implements Clock {
-
- long currentTime = 1;
-
- public void advanceTime(Duration duration) {
- currentTime += duration.toMillis();
- }
-
- public void advanceTime(long millis) {
- currentTime += millis;
- }
-
- @Override
- public long currentTimeMillis() {
- return currentTime;
- }
-}