You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2017/02/09 10:22:44 UTC
[1/4] samza git commit: SAMZA-1073: top-level fluent API `
Repository: samza
Updated Branches:
refs/heads/samza-fluent-api-v1 [created] 373048aa0
http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-operator/src/test/java/org/apache/samza/example/TestFluentStreamTasks.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/example/TestFluentStreamTasks.java b/samza-operator/src/test/java/org/apache/samza/example/TestFluentStreamTasks.java
new file mode 100644
index 0000000..5f659ba
--- /dev/null
+++ b/samza-operator/src/test/java/org/apache/samza/example/TestFluentStreamTasks.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.example;
+
+import java.lang.reflect.Field;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.samza.Partition;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.impl.OperatorGraph;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.task.StreamOperatorTask;
+import org.apache.samza.task.TaskContext;
+import org.junit.Test;
+
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+/**
+ * Unit tests for {@link StreamOperatorTask} driven by the example stream graphs in this package
+ * ({@link WindowGraph}, {@link BroadcastGraph} and {@link JoinGraph}).
+ *
+ * <p>Each test wraps one example graph in a {@link StreamOperatorTask}, initializes it against a
+ * mocked {@link TaskContext} and checks that an operator was registered for every input stream.
+ */
+public class TestFluentStreamTasks {
+
+  /** Name of the private {@link StreamOperatorTask} field holding the operator graph. */
+  private static final String OPERATOR_GRAPH_FIELD = "operatorGraph";
+
+  /** Four input partitions, each on its own stream ("my-topic0" .. "my-topic3") of "my-system". */
+  private final Set<SystemStreamPartition> inputPartitions = createInputPartitions();
+
+  private static Set<SystemStreamPartition> createInputPartitions() {
+    Set<SystemStreamPartition> partitions = new HashSet<>();
+    for (int i = 0; i < 4; i++) {
+      partitions.add(new SystemStreamPartition("my-system", String.format("my-topic%d", i), new Partition(i)));
+    }
+    return partitions;
+  }
+
+  @Test
+  public void testUserTask() throws Exception {
+    assertOperatorsRegisteredForAllInputs(new StreamOperatorTask(new WindowGraph(this.inputPartitions)));
+  }
+
+  @Test
+  public void testSplitTask() throws Exception {
+    assertOperatorsRegisteredForAllInputs(new StreamOperatorTask(new BroadcastGraph(this.inputPartitions)));
+  }
+
+  @Test
+  public void testJoinTask() throws Exception {
+    assertOperatorsRegisteredForAllInputs(new StreamOperatorTask(new JoinGraph(this.inputPartitions)));
+  }
+
+  /**
+   * Initializes {@code adaptorTask} with a mocked config and a task context that reports
+   * {@link #inputPartitions}, then asserts that its operator graph has an entry for the
+   * system stream of every input partition.
+   *
+   * @param adaptorTask the task under test, not yet initialized
+   * @throws Exception if initialization fails or the operator graph field cannot be read
+   */
+  private void assertOperatorsRegisteredForAllInputs(StreamOperatorTask adaptorTask) throws Exception {
+    Config mockConfig = mock(Config.class);
+    TaskContext mockContext = mock(TaskContext.class);
+    when(mockContext.getSystemStreamPartitions()).thenReturn(this.inputPartitions);
+
+    adaptorTask.init(mockConfig, mockContext);
+
+    // Read the field only after init(), so the assertions see the graph init() built even if
+    // init() replaces the field rather than populating an existing instance.
+    Field operatorGraphField = StreamOperatorTask.class.getDeclaredField(OPERATOR_GRAPH_FIELD);
+    operatorGraphField.setAccessible(true);
+    OperatorGraph opGraph = (OperatorGraph) operatorGraphField.get(adaptorTask);
+    assertNotNull("operator graph was not created by init()", opGraph);
+
+    for (SystemStreamPartition partition : this.inputPartitions) {
+      assertNotNull("no operator registered for " + partition.getSystemStream(),
+          opGraph.get(partition.getSystemStream()));
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-operator/src/test/java/org/apache/samza/example/WindowGraph.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/example/WindowGraph.java b/samza-operator/src/test/java/org/apache/samza/example/WindowGraph.java
new file mode 100644
index 0000000..a365411
--- /dev/null
+++ b/samza-operator/src/test/java/org/apache/samza/example/WindowGraph.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.example;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.StreamGraphFactory;
+import org.apache.samza.operators.StreamGraphImpl;
+import org.apache.samza.operators.StreamSpec;
+import org.apache.samza.operators.data.InputMessageEnvelope;
+import org.apache.samza.operators.data.JsonIncomingSystemMessageEnvelope;
+import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.operators.data.Offset;
+import org.apache.samza.operators.windows.Windows;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
+
+import java.time.Duration;
+import java.util.Properties;
+import java.util.Set;
+import java.util.function.BiFunction;
+
+
+/**
+ * Example {@link StreamGraphFactory} that builds a simple windowed pipeline per input stream.
+ *
+ * <p>For every input partition it creates an input stream, maps each incoming envelope to a
+ * {@link JsonMessageEnvelope} and counts messages in 200 ms tumbling windows.
+ */
+public class WindowGraph implements StreamGraphFactory {
+  class MessageType {
+    String field1;
+    String field2;
+  }
+
+  private final Set<SystemStreamPartition> inputs;
+
+  /** Creates a factory whose graph reads from the system stream of each of {@code inputs}. */
+  WindowGraph(Set<SystemStreamPartition> inputs) {
+    this.inputs = inputs;
+  }
+
+  class JsonMessageEnvelope extends JsonIncomingSystemMessageEnvelope<MessageType> {
+
+    JsonMessageEnvelope(String key, MessageType data, Offset offset, SystemStreamPartition partition) {
+      super(key, data, offset, partition);
+    }
+  }
+
+  @Override
+  public StreamGraph create(Config config) {
+    StreamGraphImpl graph = new StreamGraphImpl();
+    // Ignores the message and adds one to the running value, i.e. counts messages per window.
+    BiFunction<JsonMessageEnvelope, Integer, Integer> countAggregator = (m, c) -> c + 1;
+    inputs.forEach(source -> graph.<Object, Object, InputMessageEnvelope>createInStream(
+        streamSpecFor(source.getSystemStream()), null, null)
+        .map(m1 -> new JsonMessageEnvelope(this.myMessageKeyFunction(m1), (MessageType) m1.getMessage(),
+            m1.getOffset(), m1.getSystemStreamPartition()))
+        .window(Windows.tumblingWindow(Duration.ofMillis(200), countAggregator)));
+
+    return graph;
+  }
+
+  /** Builds a {@link StreamSpec} for {@code systemStream} with an empty set of properties. */
+  private static StreamSpec streamSpecFor(SystemStream systemStream) {
+    return new StreamSpec() {
+      @Override public SystemStream getSystemStream() {
+        return systemStream;
+      }
+
+      @Override public Properties getProperties() {
+        // Empty rather than null, so consumers of the spec need no null check.
+        return new Properties();
+      }
+    };
+  }
+
+  /**
+   * Derives the output key from an incoming envelope's key.
+   *
+   * @param m the incoming envelope
+   * @return the key's string form, or {@code null} if the envelope has no key
+   */
+  String myMessageKeyFunction(MessageEnvelope<Object, Object> m) {
+    Object key = m.getKey();
+    return key == null ? null : key.toString();
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-operator/src/test/java/org/apache/samza/operators/BroadcastTask.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/BroadcastTask.java b/samza-operator/src/test/java/org/apache/samza/operators/BroadcastTask.java
deleted file mode 100644
index 663d98c..0000000
--- a/samza-operator/src/test/java/org/apache/samza/operators/BroadcastTask.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.operators;
-
-
-import org.apache.samza.operators.data.IncomingSystemMessageEnvelope;
-import org.apache.samza.operators.data.JsonIncomingSystemMessageEnvelope;
-import org.apache.samza.operators.data.Offset;
-import org.apache.samza.operators.triggers.Triggers;
-import org.apache.samza.operators.windows.Windows;
-import org.apache.samza.system.SystemStreamPartition;
-
-import java.time.Duration;
-import java.util.Map;
-import java.util.function.BiFunction;
-
-
-/**
- * Example implementation of split stream tasks
- *
- */
-public class BroadcastTask implements StreamOperatorTask {
- class MessageType {
- String field1;
- String field2;
- String field3;
- String field4;
- String parKey;
- private long timestamp;
-
- public long getTimestamp() {
- return this.timestamp;
- }
- }
-
- class JsonMessageEnvelope extends JsonIncomingSystemMessageEnvelope<MessageType> {
- JsonMessageEnvelope(String key, MessageType data, Offset offset, SystemStreamPartition partition) {
- super(key, data, offset, partition);
- }
- }
-
- @Override
- public void transform(Map<SystemStreamPartition, MessageStream<IncomingSystemMessageEnvelope>> messageStreams) {
- BiFunction<JsonMessageEnvelope, Integer, Integer> sumAggregator = (m, c) -> c + 1;
- messageStreams.values().forEach(entry -> {
- MessageStream<JsonMessageEnvelope> inputStream = entry.map(this::getInputMessage);
-
- inputStream.filter(this::myFilter1)
- .window(Windows.tumblingWindow(Duration.ofMillis(100), sumAggregator)
- .setLateTrigger(Triggers.any(Triggers.count(30000), Triggers.timeSinceFirstMessage(Duration.ofMillis(10)))));
-
- inputStream.filter(this::myFilter1)
- .window(Windows.tumblingWindow(Duration.ofMillis(100), sumAggregator)
- .setLateTrigger(Triggers.any(Triggers.count(30000), Triggers.timeSinceFirstMessage(Duration.ofMillis(10)))));
-
- inputStream.filter(this::myFilter1)
- .window(Windows.tumblingWindow(Duration.ofMillis(100), sumAggregator)
- .setLateTrigger(Triggers.any(Triggers.count(30000), Triggers.timeSinceFirstMessage(Duration.ofMillis(10)))));
- });
- }
-
- JsonMessageEnvelope getInputMessage(IncomingSystemMessageEnvelope m1) {
- return (JsonMessageEnvelope) m1.getMessage();
- }
-
- boolean myFilter1(JsonMessageEnvelope m1) {
- // Do user defined processing here
- return m1.getMessage().parKey.equals("key1");
- }
-
- boolean myFilter2(JsonMessageEnvelope m1) {
- // Do user defined processing here
- return m1.getMessage().parKey.equals("key2");
- }
-
- boolean myFilter3(JsonMessageEnvelope m1) {
- return m1.getMessage().parKey.equals("key3");
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-operator/src/test/java/org/apache/samza/operators/JoinTask.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/JoinTask.java b/samza-operator/src/test/java/org/apache/samza/operators/JoinTask.java
deleted file mode 100644
index 1b10609..0000000
--- a/samza-operator/src/test/java/org/apache/samza/operators/JoinTask.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.operators;
-
-import org.apache.samza.operators.data.IncomingSystemMessageEnvelope;
-import org.apache.samza.operators.data.Offset;
-import org.apache.samza.operators.data.JsonIncomingSystemMessageEnvelope;
-import org.apache.samza.system.SystemStreamPartition;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-
-/**
- * Example implementation of unique key-based stream-stream join tasks
- *
- */
-public class JoinTask implements StreamOperatorTask {
- class MessageType {
- String joinKey;
- List<String> joinFields = new ArrayList<>();
- }
-
- class JsonMessageEnvelope extends JsonIncomingSystemMessageEnvelope<MessageType> {
- JsonMessageEnvelope(String key, MessageType data, Offset offset, SystemStreamPartition partition) {
- super(key, data, offset, partition);
- }
- }
-
- MessageStream<JsonMessageEnvelope> joinOutput = null;
-
- @Override
- public void transform(Map<SystemStreamPartition, MessageStream<IncomingSystemMessageEnvelope>> messageStreams) {
- messageStreams.values().forEach(messageStream -> {
- MessageStream<JsonMessageEnvelope> newSource = messageStream.map(this::getInputMessage);
- if (joinOutput == null) {
- joinOutput = newSource;
- } else {
- joinOutput = joinOutput.join(newSource, (m1, m2) -> this.myJoinResult(m1, m2));
- }
- });
- }
-
- private JsonMessageEnvelope getInputMessage(IncomingSystemMessageEnvelope ism) {
- return new JsonMessageEnvelope(
- ((MessageType) ism.getMessage()).joinKey,
- (MessageType) ism.getMessage(),
- ism.getOffset(),
- ism.getSystemStreamPartition());
- }
-
- JsonMessageEnvelope myJoinResult(JsonMessageEnvelope m1, JsonMessageEnvelope m2) {
- MessageType newJoinMsg = new MessageType();
- newJoinMsg.joinKey = m1.getKey();
- newJoinMsg.joinFields.addAll(m1.getMessage().joinFields);
- newJoinMsg.joinFields.addAll(m2.getMessage().joinFields);
- return new JsonMessageEnvelope(m1.getMessage().joinKey, newJoinMsg, null, null);
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-operator/src/test/java/org/apache/samza/operators/TestFluentStreamAdaptorTask.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/TestFluentStreamAdaptorTask.java b/samza-operator/src/test/java/org/apache/samza/operators/TestFluentStreamAdaptorTask.java
deleted file mode 100644
index 61bb32a..0000000
--- a/samza-operator/src/test/java/org/apache/samza/operators/TestFluentStreamAdaptorTask.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators;
-
-import org.apache.samza.Partition;
-import org.apache.samza.config.Config;
-import org.apache.samza.operators.impl.OperatorImpl;
-import org.apache.samza.system.SystemStreamPartition;
-import org.apache.samza.task.TaskContext;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.Mockito;
-
-import java.lang.reflect.Field;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-
-public class TestFluentStreamAdaptorTask {
- Field userTaskField = null;
- Field operatorChainsField = null;
-
- @Before
- public void prep() throws NoSuchFieldException {
- userTaskField = StreamOperatorAdaptorTask.class.getDeclaredField("userTask");
- operatorChainsField = StreamOperatorAdaptorTask.class.getDeclaredField("operatorChains");
- userTaskField.setAccessible(true);
- operatorChainsField.setAccessible(true);
- }
-
- @Test
- public void testConstructor() throws IllegalAccessException {
- StreamOperatorTask userTask = mock(StreamOperatorTask.class);
- StreamOperatorAdaptorTask adaptorTask = new StreamOperatorAdaptorTask(userTask);
- StreamOperatorTask taskMemberVar = (StreamOperatorTask) userTaskField.get(adaptorTask);
- Map<SystemStreamPartition, OperatorImpl> chainsMap = (Map<SystemStreamPartition, OperatorImpl>) operatorChainsField.get(adaptorTask);
- assertEquals(taskMemberVar, userTask);
- assertTrue(chainsMap.isEmpty());
- }
-
- @Test
- public void testInit() throws Exception {
- StreamOperatorTask userTask = mock(StreamOperatorTask.class);
- StreamOperatorAdaptorTask adaptorTask = new StreamOperatorAdaptorTask(userTask);
- Config mockConfig = mock(Config.class);
- TaskContext mockContext = mock(TaskContext.class);
- Set<SystemStreamPartition> testInputs = new HashSet() { {
- this.add(new SystemStreamPartition("test-sys", "test-strm", new Partition(0)));
- this.add(new SystemStreamPartition("test-sys", "test-strm", new Partition(1)));
- } };
- when(mockContext.getSystemStreamPartitions()).thenReturn(testInputs);
- adaptorTask.init(mockConfig, mockContext);
- verify(userTask, times(1)).transform(Mockito.anyMap());
- Map<SystemStreamPartition, OperatorImpl> chainsMap = (Map<SystemStreamPartition, OperatorImpl>) operatorChainsField.get(adaptorTask);
- assertTrue(chainsMap.size() == 2);
- assertTrue(chainsMap.containsKey(testInputs.toArray()[0]));
- assertTrue(chainsMap.containsKey(testInputs.toArray()[1]));
- }
-
- // TODO: window and process methods to be added after implementation of ChainedOperators.create()
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-operator/src/test/java/org/apache/samza/operators/TestFluentStreamTasks.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/TestFluentStreamTasks.java b/samza-operator/src/test/java/org/apache/samza/operators/TestFluentStreamTasks.java
deleted file mode 100644
index d804bf8..0000000
--- a/samza-operator/src/test/java/org/apache/samza/operators/TestFluentStreamTasks.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators;
-
-import org.apache.samza.Partition;
-import org.apache.samza.config.Config;
-
-import org.apache.samza.operators.impl.OperatorImpl;
-import org.apache.samza.system.SystemStreamPartition;
-import org.apache.samza.task.TaskContext;
-import org.junit.Test;
-
-import java.lang.reflect.Field;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-
-/**
- * Unit test for {@link StreamOperatorTask}
- */
-public class TestFluentStreamTasks {
-
- private final WindowTask userTask = new WindowTask();
-
- private final BroadcastTask splitTask = new BroadcastTask();
-
- private final JoinTask joinTask = new JoinTask();
-
- private final Set<SystemStreamPartition> inputPartitions = new HashSet<SystemStreamPartition>() { {
- for (int i = 0; i < 4; i++) {
- this.add(new SystemStreamPartition("my-system", "my-topic1", new Partition(i)));
- }
- } };
-
- @Test
- public void testUserTask() throws Exception {
- Config mockConfig = mock(Config.class);
- TaskContext mockContext = mock(TaskContext.class);
- when(mockContext.getSystemStreamPartitions()).thenReturn(this.inputPartitions);
- StreamOperatorAdaptorTask adaptorTask = new StreamOperatorAdaptorTask(this.userTask);
- Field pipelineMapFld = StreamOperatorAdaptorTask.class.getDeclaredField("operatorChains");
- pipelineMapFld.setAccessible(true);
- Map<SystemStreamPartition, OperatorImpl> pipelineMap =
- (Map<SystemStreamPartition, OperatorImpl>) pipelineMapFld.get(adaptorTask);
-
- adaptorTask.init(mockConfig, mockContext);
- assertEquals(pipelineMap.size(), 4);
- this.inputPartitions.forEach(partition -> {
- assertNotNull(pipelineMap.get(partition));
- });
- }
-
- @Test
- public void testSplitTask() throws Exception {
- Config mockConfig = mock(Config.class);
- TaskContext mockContext = mock(TaskContext.class);
- when(mockContext.getSystemStreamPartitions()).thenReturn(this.inputPartitions);
- StreamOperatorAdaptorTask adaptorTask = new StreamOperatorAdaptorTask(this.splitTask);
- Field pipelineMapFld = StreamOperatorAdaptorTask.class.getDeclaredField("operatorChains");
- pipelineMapFld.setAccessible(true);
- Map<SystemStreamPartition, OperatorImpl> pipelineMap =
- (Map<SystemStreamPartition, OperatorImpl>) pipelineMapFld.get(adaptorTask);
-
- adaptorTask.init(mockConfig, mockContext);
- assertEquals(pipelineMap.size(), 4);
- this.inputPartitions.forEach(partition -> {
- assertNotNull(pipelineMap.get(partition));
- });
- }
-
- @Test
- public void testJoinTask() throws Exception {
- Config mockConfig = mock(Config.class);
- TaskContext mockContext = mock(TaskContext.class);
- when(mockContext.getSystemStreamPartitions()).thenReturn(this.inputPartitions);
- StreamOperatorAdaptorTask adaptorTask = new StreamOperatorAdaptorTask(this.joinTask);
- Field pipelineMapFld = StreamOperatorAdaptorTask.class.getDeclaredField("operatorChains");
- pipelineMapFld.setAccessible(true);
- Map<SystemStreamPartition, OperatorImpl> pipelineMap =
- (Map<SystemStreamPartition, OperatorImpl>) pipelineMapFld.get(adaptorTask);
-
- adaptorTask.init(mockConfig, mockContext);
- assertEquals(pipelineMap.size(), 4);
- this.inputPartitions.forEach(partition -> {
- assertNotNull(pipelineMap.get(partition));
- });
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-operator/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java b/samza-operator/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
index 5991e2f..d5607d8 100644
--- a/samza-operator/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
+++ b/samza-operator/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
@@ -21,6 +21,7 @@ package org.apache.samza.operators;
import org.apache.samza.operators.functions.FilterFunction;
import org.apache.samza.operators.functions.FlatMapFunction;
import org.apache.samza.operators.functions.JoinFunction;
+import org.apache.samza.operators.functions.KeyValueJoinFunction;
import org.apache.samza.operators.functions.MapFunction;
import org.apache.samza.operators.functions.SinkFunction;
import org.apache.samza.operators.spec.OperatorSpec;
@@ -29,6 +30,7 @@ import org.apache.samza.operators.spec.SinkOperatorSpec;
import org.apache.samza.operators.spec.StreamOperatorSpec;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
+import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.TaskCoordinator;
import org.junit.Test;
@@ -46,17 +48,19 @@ import static org.mockito.Mockito.when;
public class TestMessageStreamImpl {
+ private StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+
@Test
public void testMap() {
- MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>();
- MapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> xMap =
- m -> new TestOutputMessageEnvelope(m.getKey(), m.getMessage().getValue().length() + 1);
+ MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph);
+ MapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> xMap = (TestMessageEnvelope m) ->
+ new TestOutputMessageEnvelope(m.getKey(), m.getMessage().getValue().length() + 1);
MessageStream<TestOutputMessageEnvelope> outputStream = inputStream.map(xMap);
Collection<OperatorSpec> subs = inputStream.getRegisteredOperatorSpecs();
assertEquals(subs.size(), 1);
OperatorSpec<TestOutputMessageEnvelope> mapOp = subs.iterator().next();
assertTrue(mapOp instanceof StreamOperatorSpec);
- assertEquals(mapOp.getOutputStream(), outputStream);
+ assertEquals(mapOp.getNextStream(), outputStream);
// assert that the transformation function is what we defined above
TestMessageEnvelope xTestMsg = mock(TestMessageEnvelope.class);
TestMessageEnvelope.MessageType mockInnerTestMessage = mock(TestMessageEnvelope.MessageType.class);
@@ -73,33 +77,33 @@ public class TestMessageStreamImpl {
@Test
public void testFlatMap() {
- MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>();
+ MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph);
Set<TestOutputMessageEnvelope> flatOuts = new HashSet<TestOutputMessageEnvelope>() { {
this.add(mock(TestOutputMessageEnvelope.class));
this.add(mock(TestOutputMessageEnvelope.class));
this.add(mock(TestOutputMessageEnvelope.class));
} };
- FlatMapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> xFlatMap = m -> flatOuts;
+ FlatMapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> xFlatMap = (TestMessageEnvelope message) -> flatOuts;
MessageStream<TestOutputMessageEnvelope> outputStream = inputStream.flatMap(xFlatMap);
Collection<OperatorSpec> subs = inputStream.getRegisteredOperatorSpecs();
assertEquals(subs.size(), 1);
OperatorSpec<TestOutputMessageEnvelope> flatMapOp = subs.iterator().next();
assertTrue(flatMapOp instanceof StreamOperatorSpec);
- assertEquals(flatMapOp.getOutputStream(), outputStream);
+ assertEquals(flatMapOp.getNextStream(), outputStream);
// assert that the transformation function is what we defined above
assertEquals(((StreamOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope>) flatMapOp).getTransformFn(), xFlatMap);
}
@Test
public void testFilter() {
- MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>();
- FilterFunction<TestMessageEnvelope> xFilter = m -> m.getMessage().getEventTime() > 123456L;
+ MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph);
+ FilterFunction<TestMessageEnvelope> xFilter = (TestMessageEnvelope m) -> m.getMessage().getEventTime() > 123456L;
MessageStream<TestMessageEnvelope> outputStream = inputStream.filter(xFilter);
Collection<OperatorSpec> subs = inputStream.getRegisteredOperatorSpecs();
assertEquals(subs.size(), 1);
OperatorSpec<TestMessageEnvelope> filterOp = subs.iterator().next();
assertTrue(filterOp instanceof StreamOperatorSpec);
- assertEquals(filterOp.getOutputStream(), outputStream);
+ assertEquals(filterOp.getNextStream(), outputStream);
// assert that the transformation function is what we defined above
FlatMapFunction<TestMessageEnvelope, TestMessageEnvelope> txfmFn = ((StreamOperatorSpec<TestMessageEnvelope, TestMessageEnvelope>) filterOp).getTransformFn();
TestMessageEnvelope mockMsg = mock(TestMessageEnvelope.class);
@@ -117,8 +121,8 @@ public class TestMessageStreamImpl {
@Test
public void testSink() {
- MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>();
- SinkFunction<TestMessageEnvelope> xSink = (m, mc, tc) -> {
+ MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph);
+ SinkFunction<TestMessageEnvelope> xSink = (TestMessageEnvelope m, MessageCollector mc, TaskCoordinator tc) -> {
mc.send(new OutgoingMessageEnvelope(new SystemStream("test-sys", "test-stream"), m.getMessage()));
tc.commit(TaskCoordinator.RequestScope.CURRENT_TASK);
};
@@ -128,26 +132,28 @@ public class TestMessageStreamImpl {
OperatorSpec<TestMessageEnvelope> sinkOp = subs.iterator().next();
assertTrue(sinkOp instanceof SinkOperatorSpec);
assertEquals(((SinkOperatorSpec) sinkOp).getSinkFn(), xSink);
- assertNull(((SinkOperatorSpec) sinkOp).getOutputStream());
+ assertNull(((SinkOperatorSpec) sinkOp).getNextStream());
}
@Test
public void testJoin() {
- MessageStreamImpl<TestMessageEnvelope> source1 = new MessageStreamImpl<>();
- MessageStreamImpl<TestMessageEnvelope> source2 = new MessageStreamImpl<>();
- JoinFunction<TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope> joiner =
- (m1, m2) -> new TestOutputMessageEnvelope(m1.getKey(), m1.getMessage().getValue().length() + m2.getMessage().getValue().length());
+ MessageStreamImpl<TestMessageEnvelope> source1 = new MessageStreamImpl<>(mockGraph);
+ MessageStreamImpl<TestMessageEnvelope> source2 = new MessageStreamImpl<>(mockGraph);
+ JoinFunction<String, TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope> joiner =
+ (KeyValueJoinFunction<String, TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope>)
+ (m1, m2) -> new TestOutputMessageEnvelope(m1.getKey(), m1.getMessage().getValue().length() + m2.getMessage().getValue().length());
+
MessageStream<TestOutputMessageEnvelope> joinOutput = source1.join(source2, joiner);
Collection<OperatorSpec> subs = source1.getRegisteredOperatorSpecs();
assertEquals(subs.size(), 1);
OperatorSpec<TestMessageEnvelope> joinOp1 = subs.iterator().next();
assertTrue(joinOp1 instanceof PartialJoinOperatorSpec);
- assertEquals(((PartialJoinOperatorSpec) joinOp1).getOutputStream(), joinOutput);
+ assertEquals(((PartialJoinOperatorSpec) joinOp1).getNextStream(), joinOutput);
subs = source2.getRegisteredOperatorSpecs();
assertEquals(subs.size(), 1);
OperatorSpec<TestMessageEnvelope> joinOp2 = subs.iterator().next();
assertTrue(joinOp2 instanceof PartialJoinOperatorSpec);
- assertEquals(((PartialJoinOperatorSpec) joinOp2).getOutputStream(), joinOutput);
+ assertEquals(((PartialJoinOperatorSpec) joinOp2).getNextStream(), joinOutput);
TestMessageEnvelope joinMsg1 = new TestMessageEnvelope("test-join-1", "join-msg-001", 11111L);
TestMessageEnvelope joinMsg2 = new TestMessageEnvelope("test-join-2", "join-msg-002", 22222L);
TestOutputMessageEnvelope xOut = (TestOutputMessageEnvelope) ((PartialJoinOperatorSpec) joinOp1).getTransformFn().apply(joinMsg1, joinMsg2);
@@ -160,10 +166,10 @@ public class TestMessageStreamImpl {
@Test
public void testMerge() {
- MessageStream<TestMessageEnvelope> merge1 = new MessageStreamImpl<>();
+ MessageStream<TestMessageEnvelope> merge1 = new MessageStreamImpl<>(mockGraph);
Collection<MessageStream<TestMessageEnvelope>> others = new ArrayList<MessageStream<TestMessageEnvelope>>() { {
- this.add(new MessageStreamImpl<>());
- this.add(new MessageStreamImpl<>());
+ this.add(new MessageStreamImpl<>(mockGraph));
+ this.add(new MessageStreamImpl<>(mockGraph));
} };
MessageStream<TestMessageEnvelope> mergeOutput = merge1.merge(others);
validateMergeOperator(merge1, mergeOutput);
@@ -176,7 +182,7 @@ public class TestMessageStreamImpl {
assertEquals(subs.size(), 1);
OperatorSpec<TestMessageEnvelope> mergeOp = subs.iterator().next();
assertTrue(mergeOp instanceof StreamOperatorSpec);
- assertEquals(((StreamOperatorSpec) mergeOp).getOutputStream(), mergeOutput);
+ assertEquals(((StreamOperatorSpec) mergeOp).getNextStream(), mergeOutput);
TestMessageEnvelope mockMsg = mock(TestMessageEnvelope.class);
Collection<TestMessageEnvelope> outputs = ((StreamOperatorSpec<TestMessageEnvelope, TestMessageEnvelope>) mergeOp).getTransformFn().apply(mockMsg);
assertEquals(outputs.size(), 1);
http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-operator/src/test/java/org/apache/samza/operators/TestMessageStreamImplUtil.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/TestMessageStreamImplUtil.java b/samza-operator/src/test/java/org/apache/samza/operators/TestMessageStreamImplUtil.java
new file mode 100644
index 0000000..c4e9f51
--- /dev/null
+++ b/samza-operator/src/test/java/org/apache/samza/operators/TestMessageStreamImplUtil.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators;
+
+
+public class TestMessageStreamImplUtil {
+ public static <M> MessageStreamImpl<M> getMessageStreamImpl(StreamGraphImpl graph) {
+ return new MessageStreamImpl<M>(graph);
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-operator/src/test/java/org/apache/samza/operators/WindowTask.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/WindowTask.java b/samza-operator/src/test/java/org/apache/samza/operators/WindowTask.java
deleted file mode 100644
index e176063..0000000
--- a/samza-operator/src/test/java/org/apache/samza/operators/WindowTask.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.operators;
-
-import org.apache.samza.operators.data.IncomingSystemMessageEnvelope;
-import org.apache.samza.operators.data.JsonIncomingSystemMessageEnvelope;
-import org.apache.samza.operators.data.MessageEnvelope;
-import org.apache.samza.operators.data.Offset;
-import org.apache.samza.operators.windows.Windows;
-import org.apache.samza.system.SystemStreamPartition;
-
-import java.time.Duration;
-import java.util.Map;
-import java.util.function.BiFunction;
-
-
-/**
- * Example implementation of a simple user-defined tasks w/ window operators
- *
- */
-public class WindowTask implements StreamOperatorTask {
- class MessageType {
- String field1;
- String field2;
- }
-
- class JsonMessageEnvelope extends JsonIncomingSystemMessageEnvelope<MessageType> {
-
- JsonMessageEnvelope(String key, MessageType data, Offset offset, SystemStreamPartition partition) {
- super(key, data, offset, partition);
- }
- }
-
- @Override public void transform(Map<SystemStreamPartition, MessageStream<IncomingSystemMessageEnvelope>> messageStreams) {
- BiFunction<JsonMessageEnvelope, Integer, Integer> maxAggregator = (m, c) -> c + 1;
- messageStreams.values().forEach(source ->
- source.map(m1 -> new JsonMessageEnvelope(this.myMessageKeyFunction(m1), (MessageType) m1.getMessage(), m1.getOffset(), m1.getSystemStreamPartition()))
- .window(Windows.tumblingWindow(Duration.ofMillis(200), maxAggregator))
- );
- }
-
- String myMessageKeyFunction(MessageEnvelope<Object, Object> m) {
- return m.getKey().toString();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java
index bae98e3..ec63d41 100644
--- a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java
+++ b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java
@@ -18,17 +18,26 @@
*/
package org.apache.samza.operators.impl;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import org.apache.samza.config.Config;
import org.apache.samza.operators.MessageStreamImpl;
+import org.apache.samza.operators.StreamGraphImpl;
import org.apache.samza.operators.TestMessageEnvelope;
+import org.apache.samza.operators.TestMessageStreamImplUtil;
import org.apache.samza.operators.TestOutputMessageEnvelope;
import org.apache.samza.operators.data.MessageEnvelope;
import org.apache.samza.operators.functions.FlatMapFunction;
+import org.apache.samza.operators.functions.KeyValueJoinFunction;
+import org.apache.samza.operators.functions.PartialJoinFunction;
import org.apache.samza.operators.functions.SinkFunction;
+import org.apache.samza.operators.spec.OperatorSpec;
import org.apache.samza.operators.spec.WindowOperatorSpec;
import org.apache.samza.operators.spec.PartialJoinOperatorSpec;
import org.apache.samza.operators.spec.SinkOperatorSpec;
import org.apache.samza.operators.spec.StreamOperatorSpec;
import org.apache.samza.operators.windows.Windows;
+import org.apache.samza.operators.windows.internal.WindowInternal;
import org.apache.samza.task.TaskContext;
import org.junit.Before;
import org.junit.Test;
@@ -38,7 +47,6 @@ import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Set;
-import java.util.function.BiFunction;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotSame;
@@ -49,25 +57,46 @@ import static org.mockito.Mockito.when;
public class TestOperatorImpls {
Field nextOperatorsField = null;
+ Method createOpMethod = null;
+ Method createOpsMethod = null;
@Before
- public void prep() throws NoSuchFieldException {
+ public void prep() throws NoSuchFieldException, NoSuchMethodException {
nextOperatorsField = OperatorImpl.class.getDeclaredField("nextOperators");
nextOperatorsField.setAccessible(true);
+
+ createOpMethod = OperatorGraph.class.getDeclaredMethod("createOperatorImpl", MessageStreamImpl.class,
+ OperatorSpec.class, Config.class, TaskContext.class);
+ createOpMethod.setAccessible(true);
+
+ createOpsMethod = OperatorGraph.class.getDeclaredMethod("createOperatorImpls", MessageStreamImpl.class, Config.class, TaskContext.class);
+ createOpsMethod.setAccessible(true);
}
-
+
@Test
- public void testCreateOperator() throws NoSuchFieldException, IllegalAccessException {
+ public void testCreateOperator() throws NoSuchFieldException, IllegalAccessException, InvocationTargetException {
// get window operator
WindowOperatorSpec mockWnd = mock(WindowOperatorSpec.class);
- OperatorImpl<TestMessageEnvelope, ? extends MessageEnvelope> opImpl = OperatorImpls.createOperatorImpl(mockWnd);
+ WindowInternal<TestMessageEnvelope, String, Integer> windowInternal = new WindowInternal<>(null, null, null, null);
+ when(mockWnd.getWindow()).thenReturn(windowInternal);
+ MessageStreamImpl<TestMessageEnvelope> mockStream = mock(MessageStreamImpl.class);
+ Config mockConfig = mock(Config.class);
+ TaskContext mockContext = mock(TaskContext.class);
+
+ OperatorGraph opGraph = new OperatorGraph();
+ OperatorImpl<TestMessageEnvelope, ? extends MessageEnvelope> opImpl = (OperatorImpl<TestMessageEnvelope, ? extends MessageEnvelope>)
+ createOpMethod.invoke(opGraph, mockStream, mockWnd, mockConfig, mockContext);
assertTrue(opImpl instanceof WindowOperatorImpl);
+ Field wndInternalField = WindowOperatorImpl.class.getDeclaredField("window");
+ wndInternalField.setAccessible(true);
+ WindowInternal wndInternal = (WindowInternal) wndInternalField.get(opImpl);
+ assertEquals(wndInternal, windowInternal);
// get simple operator
StreamOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope> mockSimpleOp = mock(StreamOperatorSpec.class);
FlatMapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> mockTxfmFn = mock(FlatMapFunction.class);
when(mockSimpleOp.getTransformFn()).thenReturn(mockTxfmFn);
- opImpl = OperatorImpls.createOperatorImpl(mockSimpleOp);
+ opImpl = (OperatorImpl<TestMessageEnvelope, ? extends MessageEnvelope>) createOpMethod.invoke(opGraph, mockStream, mockSimpleOp, mockConfig, mockContext);
assertTrue(opImpl instanceof StreamOperatorImpl);
Field txfmFnField = StreamOperatorImpl.class.getDeclaredField("transformFn");
txfmFnField.setAccessible(true);
@@ -77,7 +106,7 @@ public class TestOperatorImpls {
SinkFunction<TestMessageEnvelope> sinkFn = (m, mc, tc) -> { };
SinkOperatorSpec<TestMessageEnvelope> sinkOp = mock(SinkOperatorSpec.class);
when(sinkOp.getSinkFn()).thenReturn(sinkFn);
- opImpl = OperatorImpls.createOperatorImpl(sinkOp);
+ opImpl = (OperatorImpl<TestMessageEnvelope, ? extends MessageEnvelope>) createOpMethod.invoke(opGraph, mockStream, sinkOp, mockConfig, mockContext);
assertTrue(opImpl instanceof SinkOperatorImpl);
Field sinkFnField = SinkOperatorImpl.class.getDeclaredField("sinkFn");
sinkFnField.setAccessible(true);
@@ -86,28 +115,33 @@ public class TestOperatorImpls {
// get join operator
PartialJoinOperatorSpec<TestMessageEnvelope, String, TestMessageEnvelope, TestOutputMessageEnvelope> joinOp = mock(PartialJoinOperatorSpec.class);
TestOutputMessageEnvelope mockOutput = mock(TestOutputMessageEnvelope.class);
- BiFunction<TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope> joinFn = (m1, m2) -> mockOutput;
+ PartialJoinFunction<String, TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope> joinFn = mock(PartialJoinFunction.class);
when(joinOp.getTransformFn()).thenReturn(joinFn);
- opImpl = OperatorImpls.createOperatorImpl(joinOp);
+ opImpl = (OperatorImpl<TestMessageEnvelope, ? extends MessageEnvelope>) createOpMethod.invoke(opGraph, mockStream, joinOp, mockConfig, mockContext);
assertTrue(opImpl instanceof PartialJoinOperatorImpl);
}
@Test
- public void testEmptyChain() {
+ public void testEmptyChain() throws InvocationTargetException, IllegalAccessException {
// test creation of empty chain
- MessageStreamImpl<TestMessageEnvelope> testStream = new MessageStreamImpl<>();
+ MessageStreamImpl<TestMessageEnvelope> testStream = mock(MessageStreamImpl.class);
TaskContext mockContext = mock(TaskContext.class);
- RootOperatorImpl operatorChain = OperatorImpls.createOperatorImpls(testStream, mockContext);
+ Config mockConfig = mock(Config.class);
+ OperatorGraph opGraph = new OperatorGraph();
+ RootOperatorImpl operatorChain = (RootOperatorImpl) createOpsMethod.invoke(opGraph, testStream, mockConfig, mockContext);
assertTrue(operatorChain != null);
}
@Test
- public void testLinearChain() throws IllegalAccessException {
+ public void testLinearChain() throws IllegalAccessException, InvocationTargetException {
// test creation of linear chain
- MessageStreamImpl<TestMessageEnvelope> testInput = new MessageStreamImpl<>();
+ StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+ MessageStreamImpl<TestMessageEnvelope> testInput = TestMessageStreamImplUtil.<TestMessageEnvelope>getMessageStreamImpl(mockGraph);
TaskContext mockContext = mock(TaskContext.class);
- testInput.map(m -> m).window(Windows.tumblingWindow(Duration.ofMillis(1000)));
- RootOperatorImpl operatorChain = OperatorImpls.createOperatorImpls(testInput, mockContext);
+ Config mockConfig = mock(Config.class);
+ testInput.map(m -> m).window(Windows.keyedSessionWindow(TestMessageEnvelope::getKey, Duration.ofMinutes(10)));
+ OperatorGraph opGraph = new OperatorGraph();
+ RootOperatorImpl operatorChain = (RootOperatorImpl) createOpsMethod.invoke(opGraph, testInput, mockConfig, mockContext);
Set<OperatorImpl> subsSet = (Set<OperatorImpl>) nextOperatorsField.get(operatorChain);
assertEquals(subsSet.size(), 1);
OperatorImpl<TestMessageEnvelope, TestMessageEnvelope> firstOpImpl = subsSet.iterator().next();
@@ -119,13 +153,16 @@ public class TestOperatorImpls {
}
@Test
- public void testBroadcastChain() throws IllegalAccessException {
+ public void testBroadcastChain() throws IllegalAccessException, InvocationTargetException {
// test creation of broadcast chain
- MessageStreamImpl<TestMessageEnvelope> testInput = new MessageStreamImpl<>();
+ StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+ MessageStreamImpl<TestMessageEnvelope> testInput = TestMessageStreamImplUtil.<TestMessageEnvelope>getMessageStreamImpl(mockGraph);
TaskContext mockContext = mock(TaskContext.class);
+ Config mockConfig = mock(Config.class);
testInput.filter(m -> m.getMessage().getEventTime() > 123456L).flatMap(m -> new ArrayList() { { this.add(m); this.add(m); } });
testInput.filter(m -> m.getMessage().getEventTime() < 123456L).map(m -> m);
- RootOperatorImpl operatorChain = OperatorImpls.createOperatorImpls(testInput, mockContext);
+ OperatorGraph opGraph = new OperatorGraph();
+ RootOperatorImpl operatorChain = (RootOperatorImpl) createOpsMethod.invoke(opGraph, testInput, mockConfig, mockContext);
Set<OperatorImpl> subsSet = (Set<OperatorImpl>) nextOperatorsField.get(operatorChain);
assertEquals(subsSet.size(), 2);
Iterator<OperatorImpl> iter = subsSet.iterator();
@@ -146,18 +183,23 @@ public class TestOperatorImpls {
}
@Test
- public void testJoinChain() throws IllegalAccessException {
+ public void testJoinChain() throws IllegalAccessException, InvocationTargetException {
// test creation of join chain
- MessageStreamImpl<TestMessageEnvelope> input1 = new MessageStreamImpl<>();
- MessageStreamImpl<TestMessageEnvelope> input2 = new MessageStreamImpl<>();
+ StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+ MessageStreamImpl<TestMessageEnvelope> input1 = TestMessageStreamImplUtil.getMessageStreamImpl(mockGraph);
+ MessageStreamImpl<TestMessageEnvelope> input2 = TestMessageStreamImplUtil.getMessageStreamImpl(mockGraph);
TaskContext mockContext = mock(TaskContext.class);
+ Config mockConfig = mock(Config.class);
input1
- .join(input2, (m1, m2) ->
- new TestOutputMessageEnvelope(m1.getKey(), m1.getMessage().getValue().length() + m2.getMessage().getValue().length()))
+ .join(input2,
+ (KeyValueJoinFunction<String, TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope>) (m1, m2) ->
+ new TestOutputMessageEnvelope(m1.getKey(), m1.getMessage().getValue().length() + m2.getMessage().getValue().length())
+ )
.map(m -> m);
+ OperatorGraph opGraph = new OperatorGraph();
// now, we create chained operators from each input sources
- RootOperatorImpl chain1 = OperatorImpls.createOperatorImpls(input1, mockContext);
- RootOperatorImpl chain2 = OperatorImpls.createOperatorImpls(input2, mockContext);
+ RootOperatorImpl chain1 = (RootOperatorImpl) createOpsMethod.invoke(opGraph, input1, mockConfig, mockContext);
+ RootOperatorImpl chain2 = (RootOperatorImpl) createOpsMethod.invoke(opGraph, input2, mockConfig, mockContext);
// check that those two chains will merge at map operator
// first branch of the join
Set<OperatorImpl> subsSet = (Set<OperatorImpl>) nextOperatorsField.get(chain1);
http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java
index ba5b6f8..ce9fdd2 100644
--- a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java
+++ b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java
@@ -18,17 +18,16 @@
*/
package org.apache.samza.operators.impl;
+import org.apache.samza.config.Config;
import org.apache.samza.operators.TestOutputMessageEnvelope;
import org.apache.samza.operators.functions.SinkFunction;
import org.apache.samza.operators.spec.SinkOperatorSpec;
import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;
import org.junit.Test;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.*;
public class TestSinkOperatorImpl {
@@ -38,7 +37,9 @@ public class TestSinkOperatorImpl {
SinkOperatorSpec<TestOutputMessageEnvelope> sinkOp = mock(SinkOperatorSpec.class);
SinkFunction<TestOutputMessageEnvelope> sinkFn = mock(SinkFunction.class);
when(sinkOp.getSinkFn()).thenReturn(sinkFn);
- SinkOperatorImpl<TestOutputMessageEnvelope> sinkImpl = new SinkOperatorImpl<>(sinkOp);
+ Config mockConfig = mock(Config.class);
+ TaskContext mockContext = mock(TaskContext.class);
+ SinkOperatorImpl<TestOutputMessageEnvelope> sinkImpl = new SinkOperatorImpl<>(sinkOp, mockConfig, mockContext);
TestOutputMessageEnvelope mockMsg = mock(TestOutputMessageEnvelope.class);
MessageCollector mockCollector = mock(MessageCollector.class);
TaskCoordinator mockCoordinator = mock(TaskCoordinator.class);
http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-operator/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java
index 5a3840c..010a210 100644
--- a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java
+++ b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java
@@ -18,22 +18,20 @@
*/
package org.apache.samza.operators.impl;
+import java.util.ArrayList;
+import java.util.Collection;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.MessageStreamImpl;
import org.apache.samza.operators.TestMessageEnvelope;
import org.apache.samza.operators.TestOutputMessageEnvelope;
import org.apache.samza.operators.functions.FlatMapFunction;
import org.apache.samza.operators.spec.StreamOperatorSpec;
import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;
import org.junit.Test;
-import java.util.ArrayList;
-import java.util.Collection;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.*;
public class TestStreamOperatorImpl {
@@ -43,8 +41,10 @@ public class TestStreamOperatorImpl {
StreamOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope> mockOp = mock(StreamOperatorSpec.class);
FlatMapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> txfmFn = mock(FlatMapFunction.class);
when(mockOp.getTransformFn()).thenReturn(txfmFn);
-
- StreamOperatorImpl<TestMessageEnvelope, TestOutputMessageEnvelope> opImpl = spy(new StreamOperatorImpl<>(mockOp));
+ MessageStreamImpl<TestMessageEnvelope> mockInput = mock(MessageStreamImpl.class);
+ Config mockConfig = mock(Config.class);
+ TaskContext mockContext = mock(TaskContext.class);
+ StreamOperatorImpl<TestMessageEnvelope, TestOutputMessageEnvelope> opImpl = spy(new StreamOperatorImpl<>(mockOp, mockInput, mockConfig, mockContext));
TestMessageEnvelope inMsg = mock(TestMessageEnvelope.class);
TestOutputMessageEnvelope outMsg = mock(TestOutputMessageEnvelope.class);
Collection<TestOutputMessageEnvelope> mockOutputs = new ArrayList() { {
http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-operator/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java b/samza-operator/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java
index ffe9df1..31257a4 100644
--- a/samza-operator/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java
+++ b/samza-operator/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java
@@ -18,14 +18,18 @@
*/
package org.apache.samza.operators.spec;
+import org.apache.samza.operators.MessageStreamImpl;
+import org.apache.samza.operators.StreamGraphImpl;
import org.apache.samza.operators.TestMessageEnvelope;
+import org.apache.samza.operators.TestMessageStreamImplUtil;
import org.apache.samza.operators.data.MessageEnvelope;
import org.apache.samza.operators.functions.FlatMapFunction;
+import org.apache.samza.operators.functions.PartialJoinFunction;
import org.apache.samza.operators.functions.SinkFunction;
-import org.apache.samza.operators.MessageStreamImpl;
import org.apache.samza.operators.windows.internal.WindowInternal;
-import org.apache.samza.operators.windows.WindowKey;
import org.apache.samza.operators.windows.WindowPane;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskCoordinator;
import org.junit.Test;
import java.util.ArrayList;
@@ -42,19 +46,23 @@ public class TestOperatorSpecs {
@Test
public void testGetStreamOperator() {
FlatMapFunction<MessageEnvelope, TestMessageEnvelope> transformFn = m -> new ArrayList<TestMessageEnvelope>() { {
- this.add(new TestMessageEnvelope(m.getKey().toString(), m.getMessage().toString(), 12345L));
- } };
- StreamOperatorSpec<MessageEnvelope, TestMessageEnvelope> strmOp = OperatorSpecs.createStreamOperatorSpec(transformFn);
+ this.add(new TestMessageEnvelope(m.getKey().toString(), m.getMessage().toString(), 12345L));
+ } };
+ MessageStreamImpl<TestMessageEnvelope> mockOutput = mock(MessageStreamImpl.class);
+ StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+ StreamOperatorSpec<MessageEnvelope, TestMessageEnvelope> strmOp = OperatorSpecs.createStreamOperatorSpec(transformFn, mockGraph, mockOutput);
assertEquals(strmOp.getTransformFn(), transformFn);
- assertTrue(strmOp.getOutputStream() instanceof MessageStreamImpl);
+ assertEquals(strmOp.getNextStream(), mockOutput);
}
@Test
public void testGetSinkOperator() {
- SinkFunction<TestMessageEnvelope> sinkFn = (m, c, t) -> { };
- SinkOperatorSpec<TestMessageEnvelope> sinkOp = OperatorSpecs.createSinkOperatorSpec(sinkFn);
+ SinkFunction<TestMessageEnvelope> sinkFn = (TestMessageEnvelope message, MessageCollector messageCollector,
+ TaskCoordinator taskCoordinator) -> { };
+ StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+ SinkOperatorSpec<TestMessageEnvelope> sinkOp = OperatorSpecs.createSinkOperatorSpec(sinkFn, mockGraph);
assertEquals(sinkOp.getSinkFn(), sinkFn);
- assertTrue(sinkOp.getOutputStream() == null);
+ assertTrue(sinkOp.getNextStream() == null);
}
@Test
@@ -65,8 +73,9 @@ public class TestOperatorSpecs {
//instantiate a window using reflection
WindowInternal window = new WindowInternal(null, aggregator, keyExtractor, null);
- WindowOperatorSpec spec = OperatorSpecs.<TestMessageEnvelope, String, WindowKey<String>, Integer,
- WindowPane<WindowKey<String>, Integer>>createWindowOperatorSpec(window);
+ StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+ MessageStreamImpl<WindowPane<String, Integer>> mockWndOut = mock(MessageStreamImpl.class);
+ WindowOperatorSpec spec = OperatorSpecs.<TestMessageEnvelope, String, Integer>createWindowOperatorSpec(window, mockGraph, mockWndOut);
assertEquals(spec.getWindow(), window);
assertEquals(spec.getWindow().getKeyExtractor(), keyExtractor);
assertEquals(spec.getWindow().getFoldFunction(), aggregator);
@@ -74,13 +83,30 @@ public class TestOperatorSpecs {
@Test
public void testGetPartialJoinOperator() {
- BiFunction<MessageEnvelope<Object, ?>, MessageEnvelope<Object, ?>, TestMessageEnvelope> merger =
- (m1, m2) -> new TestMessageEnvelope(m1.getKey().toString(), m2.getMessage().toString(), System.nanoTime());
- MessageStreamImpl<TestMessageEnvelope> joinOutput = new MessageStreamImpl<>();
+ PartialJoinFunction<Object, MessageEnvelope<Object, ?>, MessageEnvelope<Object, ?>, TestMessageEnvelope> merger =
+ new PartialJoinFunction<Object, MessageEnvelope<Object, ?>, MessageEnvelope<Object, ?>, TestMessageEnvelope>() {
+ @Override
+ public TestMessageEnvelope apply(MessageEnvelope<Object, ?> m1, MessageEnvelope<Object, ?> m2) {
+ return new TestMessageEnvelope(m1.getKey().toString(), m2.getMessage().toString(), System.nanoTime());
+ }
+
+ @Override
+ public Object getKey(MessageEnvelope<Object, ?> message) {
+ return message.getKey();
+ }
+
+ @Override
+ public Object getOtherKey(MessageEnvelope<Object, ?> message) {
+ return message.getKey();
+ }
+ };
+
+ StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+ MessageStreamImpl<TestMessageEnvelope> joinOutput = TestMessageStreamImplUtil.<TestMessageEnvelope>getMessageStreamImpl(mockGraph);
PartialJoinOperatorSpec<MessageEnvelope<Object, ?>, Object, MessageEnvelope<Object, ?>, TestMessageEnvelope> partialJoin =
- OperatorSpecs.createPartialJoinOperatorSpec(merger, joinOutput);
+ OperatorSpecs.createPartialJoinOperatorSpec(merger, mockGraph, joinOutput);
- assertEquals(partialJoin.getOutputStream(), joinOutput);
+ assertEquals(partialJoin.getNextStream(), joinOutput);
MessageEnvelope<Object, Object> m = mock(MessageEnvelope.class);
MessageEnvelope<Object, Object> s = mock(MessageEnvelope.class);
assertEquals(partialJoin.getTransformFn(), merger);
@@ -88,13 +114,14 @@ public class TestOperatorSpecs {
@Test
public void testGetMergeOperator() {
- MessageStreamImpl<TestMessageEnvelope> output = new MessageStreamImpl<>();
- StreamOperatorSpec<TestMessageEnvelope, TestMessageEnvelope> mergeOp = OperatorSpecs.createMergeOperatorSpec(output);
+ StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+ MessageStreamImpl<TestMessageEnvelope> output = TestMessageStreamImplUtil.<TestMessageEnvelope>getMessageStreamImpl(mockGraph);
+ StreamOperatorSpec<TestMessageEnvelope, TestMessageEnvelope> mergeOp = OperatorSpecs.createMergeOperatorSpec(mockGraph, output);
Function<TestMessageEnvelope, Collection<TestMessageEnvelope>> mergeFn = t -> new ArrayList<TestMessageEnvelope>() { {
this.add(t);
} };
TestMessageEnvelope t = mock(TestMessageEnvelope.class);
assertEquals(mergeOp.getTransformFn().apply(t), mergeFn.apply(t));
- assertEquals(mergeOp.getOutputStream(), output);
+ assertEquals(mergeOp.getNextStream(), output);
}
}
[3/4] samza git commit: SAMZA-1073: top-level fluent API `
Posted by ni...@apache.org.
http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-api/src/main/java/org/apache/samza/operators/windows/Windows.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/windows/Windows.java b/samza-api/src/main/java/org/apache/samza/operators/windows/Windows.java
index f7e1f36..73fb5c8 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/windows/Windows.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/windows/Windows.java
@@ -20,7 +20,6 @@
package org.apache.samza.operators.windows;
import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.operators.data.MessageEnvelope;
import org.apache.samza.operators.triggers.TimeTrigger;
import org.apache.samza.operators.triggers.Trigger;
import org.apache.samza.operators.triggers.Triggers;
@@ -34,11 +33,11 @@ import java.util.function.Function;
/**
* APIs for creating different types of {@link Window}s.
*
- * Groups the incoming {@link MessageEnvelope}s in the {@link org.apache.samza.operators.MessageStream} into finite windows for processing.
+ * Groups the incoming messages in the {@link org.apache.samza.operators.MessageStream} into finite windows for processing.
*
* <p> Each window is uniquely identified by its {@link WindowKey}. A window can have one or more associated {@link Trigger}s
* that determine when results from the {@link Window} are emitted. Each emitted result contains one or more
- * {@link MessageEnvelope}s in the window and is called a {@link WindowPane}.
+ * messages in the window and is called a {@link WindowPane}.
*
* <p> A window can have early triggers that allow emitting {@link WindowPane}s speculatively before all data for the window
* has arrived or late triggers that allow handling of late data arrivals.
@@ -74,14 +73,14 @@ import java.util.function.Function;
* <li>
* Session Windows: A session window groups a {@link org.apache.samza.operators.MessageStream} into sessions.
* A <i>session</i> captures some period of activity over a {@link org.apache.samza.operators.MessageStream}.
- * The boundary for a session is defined by a {@code sessionGap}. All {@link MessageEnvelope}s that that arrive within
+ * The boundary for a session is defined by a {@code sessionGap}. All messages that arrive within
* the gap are grouped into the same session.
* <li>
* Global Windows: A global window defines a single infinite window over the entire {@link org.apache.samza.operators.MessageStream}.
* An early trigger must be specified when defining a global window.
* </ul>
*
- * <p> A {@link Window} is defined as "keyed" when the incoming {@link MessageEnvelope}s are first grouped based on their key
+ * <p> A {@link Window} is defined as "keyed" when the incoming messages are first grouped based on their key
* and triggers are fired and window panes are emitted per-key. It is possible to construct "keyed" variants of all the above window
* types.
*
@@ -92,7 +91,7 @@ public final class Windows {
private Windows() { }
/**
- * Creates a {@link Window} that groups incoming {@link MessageEnvelope}s into fixed-size, non-overlapping processing
+ * Creates a {@link Window} that groups incoming messages into fixed-size, non-overlapping processing
* time based windows based on the provided keyFn and applies the provided fold function to them.
*
* <p>The below example computes the maximum value per-key over fixed size 10 second windows.
@@ -101,29 +100,29 @@ public final class Windows {
* MessageStream<UserClick> stream = ...;
* Function<UserClick, String> keyFn = ...;
* BiFunction<UserClick, Integer, Integer> maxAggregator = (m, c)-> Math.max(parseInt(m), c);
- * MessageStream<WindowOutput<WindowKey<String>, Integer>> windowedStream = stream.window(
- * Windows.keyedTumblingWindow(keyFn, Duration.ofSeconds(10), maxAggregator));
+ * MessageStream<WindowPane<String, Integer>> windowedStream = stream.window(
+ * Windows.keyedTumblingWindow(keyFn, Duration.ofSeconds(10), maxAggregator));
* }
* </pre>
*
- * @param keyFn the function to extract the window key from a {@link MessageEnvelope}
+ * @param keyFn the function to extract the window key from a message
* @param interval the duration in processing time
- * @param foldFn the function to aggregate {@link MessageEnvelope}s in the {@link WindowPane}
- * @param <M> the type of the input {@link MessageEnvelope}
+ * @param foldFn the function to aggregate messages in the {@link WindowPane}
+ * @param <M> the type of the input message
* @param <WV> the type of the {@link WindowPane} output value
* @param <K> the type of the key in the {@link Window}
* @return the created {@link Window} function.
*/
- public static <M extends MessageEnvelope, K, WV> Window<M, K, WV, WindowPane<K, WV>>
+ public static <M, K, WV> Window<M, K, WV>
keyedTumblingWindow(Function<M, K> keyFn, Duration interval, BiFunction<M, WV, WV> foldFn) {
- Trigger defaultTrigger = new TimeTrigger(interval);
+ Trigger<M> defaultTrigger = new TimeTrigger<>(interval);
return new WindowInternal<M, K, WV>(defaultTrigger, foldFn, keyFn, null);
}
/**
- * Creates a {@link Window} that groups incoming {@link MessageEnvelope}s into fixed-size, non-overlapping
+ * Creates a {@link Window} that groups incoming messages into fixed-size, non-overlapping
* processing time based windows using the provided keyFn.
*
* <p>The below example groups the stream into fixed-size 10 second windows for each key.
@@ -131,19 +130,18 @@ public final class Windows {
* <pre> {@code
* MessageStream<UserClick> stream = ...;
* Function<UserClick, String> keyFn = ...;
- * MessageStream<WindowOutput<WindowKey<String>, Collection<UserClick>>> windowedStream = stream.window(
- * Windows.keyedTumblingWindow(keyFn, Duration.ofSeconds(10)));
+ * MessageStream<WindowPane<String, Collection<UserClick>>> windowedStream = stream.window(
+ * Windows.keyedTumblingWindow(keyFn, Duration.ofSeconds(10)));
* }
* </pre>
*
- * @param keyFn function to extract key from the {@link MessageEnvelope}
+ * @param keyFn function to extract key from the message
* @param interval the duration in processing time
- * @param <M> the type of the input {@link MessageEnvelope}
+ * @param <M> the type of the input message
* @param <K> the type of the key in the {@link Window}
* @return the created {@link Window} function
*/
- public static <M extends MessageEnvelope, K> Window<M, K, Collection<M>, WindowPane<K, Collection<M>>>
- keyedTumblingWindow(Function<M, K> keyFn, Duration interval) {
+ public static <M, K> Window<M, K, Collection<M>> keyedTumblingWindow(Function<M, K> keyFn, Duration interval) {
BiFunction<M, Collection<M>, Collection<M>> aggregator = (m, c) -> {
c.add(m);
return c;
@@ -160,25 +158,25 @@ public final class Windows {
* <pre> {@code
* MessageStream<String> stream = ...;
* BiFunction<String, Integer, Integer> maxAggregator = (m, c)-> Math.max(parseInt(m), c);
- * MessageStream<WindowOutput<WindowKey, Integer>> windowedStream = stream.window(
- * Windows.tumblingWindow(Duration.ofSeconds(10), maxAggregator));
+ * MessageStream<WindowPane<Void, Integer>> windowedStream = stream.window(
+ * Windows.tumblingWindow(Duration.ofSeconds(10), maxAggregator));
* }
* </pre>
*
* @param duration the duration in processing time
- * @param foldFn to aggregate {@link MessageEnvelope}s in the {@link WindowPane}
- * @param <M> the type of the input {@link MessageEnvelope}
+ * @param foldFn to aggregate messages in the {@link WindowPane}
+ * @param <M> the type of the input message
* @param <WV> the type of the {@link WindowPane} output value
* @return the created {@link Window} function
*/
- public static <M extends MessageEnvelope, WV> Window<M, Void, WV, WindowPane<Void, WV>>
+ public static <M, WV> Window<M, Void, WV>
tumblingWindow(Duration duration, BiFunction<M, WV, WV> foldFn) {
- Trigger defaultTrigger = Triggers.repeat(new TimeTrigger(duration));
- return new WindowInternal<M, Void, WV>(defaultTrigger, foldFn, null, null);
+ Trigger<M> defaultTrigger = Triggers.repeat(new TimeTrigger<>(duration));
+ return new WindowInternal<>(defaultTrigger, foldFn, null, null);
}
/**
- * Creates a {@link Window} that groups incoming {@link MessageEnvelope}s into fixed-size, non-overlapping
+ * Creates a {@link Window} that groups incoming messages into fixed-size, non-overlapping
* processing time based windows.
*
* <p>The below example groups the stream into fixed-size 10 second windows and computes a windowed-percentile.
@@ -187,16 +185,16 @@ public final class Windows {
* MessageStream<Long> stream = ...;
* Function<Collection<Long, Long>> percentile99 = ..
*
- * MessageStream<WindowOutput<WindowKey, Collection<Long>>> windowedStream = integerStream.window(Windows.tumblingWindow(Duration.ofSeconds(10)));
+ * MessageStream<WindowPane<Void, Collection<Long>>> windowedStream = stream.window(Windows.tumblingWindow(Duration.ofSeconds(10)));
* MessageStream<Long> windowedPercentiles = windowed.map(windowedOutput -> percentile99(windowedOutput.getMessage());
* }
* </pre>
*
* @param duration the duration in processing time
- * @param <M> the type of the input {@link MessageEnvelope}
+ * @param <M> the type of the input message
* @return the created {@link Window} function
*/
- public static <M extends MessageEnvelope> Window<M, Void, Collection<M>, WindowPane<Void, Collection<M>>> tumblingWindow(Duration duration) {
+ public static <M> Window<M, Void, Collection<M>> tumblingWindow(Duration duration) {
BiFunction<M, Collection<M>, Collection<M>> aggregator = (m, c) -> {
c.add(m);
return c;
@@ -205,11 +203,11 @@ public final class Windows {
}
/**
- * Creates a {@link Window} that groups incoming {@link MessageEnvelope}s into sessions per-key based on the provided {@code sessionGap}
+ * Creates a {@link Window} that groups incoming messages into sessions per-key based on the provided {@code sessionGap}
* and applies the provided fold function to them.
*
* <p>A <i>session</i> captures some period of activity over a {@link org.apache.samza.operators.MessageStream}.
- * A session is considered complete when no new messages arrive within the {@code sessionGap}. All {@link MessageEnvelope}s that arrive within
+ * A session is considered complete when no new messages arrive within the {@code sessionGap}. All messages that arrive within
* the gap are grouped into the same session.
*
* <p>The below example computes the maximum value per-key over a session window of gap 10 seconds.
@@ -218,29 +216,29 @@ public final class Windows {
* MessageStream<UserClick> stream = ...;
* BiFunction<UserClick, Integer, Integer> maxAggregator = (m, c)-> Math.max(parseInt(m), c);
* Function<UserClick, String> userIdExtractor = m -> m.getUserId()..;
- * MessageStream<WindowOutput<WindowKey<String>, Integer>> windowedStream = stream.window(
- * Windows.keyedSessionWindow(userIdExtractor, Duration.minute(1), maxAggregator));
+ * MessageStream<WindowPane<String, Integer>> windowedStream = stream.window(
+ * Windows.keyedSessionWindow(userIdExtractor, Duration.ofSeconds(10), maxAggregator));
* }
* </pre>
*
- * @param keyFn the function to extract the window key from a {@link MessageEnvelope}
+ * @param keyFn the function to extract the window key from a message
* @param sessionGap the timeout gap for defining the session
- * @param foldFn the function to aggregate {@link MessageEnvelope}s in the {@link WindowPane}
- * @param <M> the type of the input {@link MessageEnvelope}
+ * @param foldFn the function to aggregate messages in the {@link WindowPane}
+ * @param <M> the type of the input message
* @param <K> the type of the key in the {@link Window}
* @param <WV> the type of the output value in the {@link WindowPane}
* @return the created {@link Window} function
*/
- public static <M extends MessageEnvelope, K, WV> Window<M, K, WV, WindowPane<K, WV>> keyedSessionWindow(Function<M, K> keyFn, Duration sessionGap, BiFunction<M, WV, WV> foldFn) {
- Trigger defaultTrigger = Triggers.timeSinceLastMessage(sessionGap);
- return new WindowInternal<M, K, WV>(defaultTrigger, foldFn, keyFn, null);
+ public static <M, K, WV> Window<M, K, WV> keyedSessionWindow(Function<M, K> keyFn, Duration sessionGap, BiFunction<M, WV, WV> foldFn) {
+ Trigger<M> defaultTrigger = Triggers.timeSinceLastMessage(sessionGap);
+ return new WindowInternal<>(defaultTrigger, foldFn, keyFn, null);
}
/**
- * Creates a {@link Window} that groups incoming {@link MessageEnvelope}s into sessions per-key based on the provided {@code sessionGap}.
+ * Creates a {@link Window} that groups incoming messages into sessions per-key based on the provided {@code sessionGap}.
*
* <p>A <i>session</i> captures some period of activity over a {@link org.apache.samza.operators.MessageStream}. The
- * boundary for the session is defined by a {@code sessionGap}. All {@link MessageEnvelope}s that that arrive within
+ * boundary for the session is defined by a {@code sessionGap}. All messages that arrive within
* the gap are grouped into the same session.
*
* <p>The below example groups the stream into per-key session windows of gap 10 seconds.
@@ -249,18 +247,18 @@ public final class Windows {
* MessageStream<UserClick> stream = ...;
* BiFunction<UserClick, Integer, Integer> maxAggregator = (m, c)-> Math.max(parseIntField(m), c);
* Function<UserClick, String> userIdExtractor = m -> m.getUserId()..;
- * MessageStream<WindowOutput<WindowKey<String>, Collection<M>>> windowedStream = stream.window(
- * Windows.keyedSessionWindow(userIdExtractor, Duration.ofSeconds(10)));
+ * MessageStream<WindowPane<String, Collection<UserClick>>> windowedStream = stream.window(
+ * Windows.keyedSessionWindow(userIdExtractor, Duration.ofSeconds(10)));
* }
* </pre>
*
- * @param keyFn the function to extract the window key from a {@link MessageEnvelope}
+ * @param keyFn the function to extract the window key from a message
* @param sessionGap the timeout gap for defining the session
- * @param <M> the type of the input {@link MessageEnvelope}
+ * @param <M> the type of the input message
* @param <K> the type of the key in the {@link Window}
* @return the created {@link Window} function
*/
- public static <M extends MessageEnvelope, K> Window<M, K, Collection<M>, WindowPane<K, Collection<M>>> keyedSessionWindow(Function<M, K> keyFn, Duration sessionGap) {
+ public static <M, K> Window<M, K, Collection<M>> keyedSessionWindow(Function<M, K> keyFn, Duration sessionGap) {
BiFunction<M, Collection<M>, Collection<M>> aggregator = (m, c) -> {
c.add(m);
@@ -271,7 +269,7 @@ public final class Windows {
/**
- * Creates a {@link Window} that groups incoming {@link MessageEnvelope}s into a single global window. This window does not have a
+ * Creates a {@link Window} that groups incoming messages into a single global window. This window does not have a
* default trigger. The triggering behavior must be specified by setting an early trigger.
*
* <p>The below example computes the maximum value over a count based window. The window emits {@link WindowPane}s when
@@ -280,36 +278,36 @@ public final class Windows {
* <pre> {@code
* MessageStream<Long> stream = ...;
* BiFunction<Long, Long, Long> maxAggregator = (m, c)-> Math.max(m, c);
- * MessageStream<WindowOutput<WindowKey, Long>> windowedStream = stream.window(Windows.globalWindow(maxAggregator)
+ * MessageStream<WindowPane<Void, Long>> windowedStream = stream.window(Windows.globalWindow(maxAggregator)
* .setEarlyTriggers(Triggers.repeat(Triggers.any(Triggers.count(50), Triggers.timeSinceFirstMessage(Duration.ofSeconds(10))))))
* }
* </pre>
*
- * @param foldFn the function to aggregate {@link MessageEnvelope}s in the {@link WindowPane}
- * @param <M> the type of {@link MessageEnvelope}
+ * @param foldFn the function to aggregate messages in the {@link WindowPane}
+ * @param <M> the type of message
* @param <WV> type of the output value in the {@link WindowPane}
* @return the created {@link Window} function.
*/
- public static <M extends MessageEnvelope, WV> Window<M, Void, WV, WindowPane<Void, WV>> globalWindow(BiFunction<M, WV, WV> foldFn) {
- return new WindowInternal<M, Void, WV>(null, foldFn, null, null);
+ public static <M, WV> Window<M, Void, WV> globalWindow(BiFunction<M, WV, WV> foldFn) {
+ return new WindowInternal<>(null, foldFn, null, null);
}
/**
- * Creates a {@link Window} that groups incoming {@link MessageEnvelope}s into a single global window. This window does not have a
+ * Creates a {@link Window} that groups incoming messages into a single global window. This window does not have a
* default trigger. The triggering behavior must be specified by setting an early trigger.
*
* The below example groups the stream into count based windows that trigger every 50 messages or every 10 minutes.
* <pre> {@code
* MessageStream<Long> stream = ...;
- * MessageStream<WindowOutput<WindowKey, Collection<Long>>> windowedStream = stream.window(Windows.globalWindow()
+ * MessageStream<WindowPane<Void, Collection<Long>>> windowedStream = stream.window(Windows.globalWindow()
* .setEarlyTrigger(Triggers.repeat(Triggers.any(Triggers.count(50), Triggers.timeSinceFirstMessage(Duration.ofSeconds(10))))))
* }
* </pre>
*
- * @param <M> the type of {@link MessageEnvelope}
+ * @param <M> the type of message
* @return the created {@link Window} function.
*/
- public static <M extends MessageEnvelope> Window<M, Void, Collection<M>, WindowPane<Void, Collection<M>>> globalWindow() {
+ public static <M> Window<M, Void, Collection<M>> globalWindow() {
BiFunction<M, Collection<M>, Collection<M>> aggregator = (m, c) -> {
c.add(m);
return c;
@@ -318,7 +316,7 @@ public final class Windows {
}
/**
- * Returns a global {@link Window} that groups incoming {@link MessageEnvelope}s using the provided keyFn.
+ * Returns a global {@link Window} that groups incoming messages using the provided keyFn.
* The window does not have a default trigger. The triggering behavior must be specified by setting an early
* trigger.
*
@@ -329,24 +327,24 @@ public final class Windows {
* MessageStream<UserClick> stream = ...;
* BiFunction<UserClick, Long, Long> maxAggregator = (m, c)-> Math.max(parseLongField(m), c);
* Function<UserClick, String> keyFn = ...;
- * MessageStream<WindowOutput<WindowKey<String>, Long>> windowedStream = stream.window(Windows.keyedGlobalWindow(keyFn, maxAggregator)
+ * MessageStream<WindowPane<String, Long>> windowedStream = stream.window(Windows.keyedGlobalWindow(keyFn, maxAggregator)
* .setEarlyTrigger(Triggers.repeat(Triggers.any(Triggers.count(50), Triggers.timeSinceFirstMessage(Duration.minutes(10))))))
* }
* </pre>
*
- * @param keyFn the function to extract the window key from a {@link MessageEnvelope}
- * @param foldFn the function to aggregate {@link MessageEnvelope}s in the {@link WindowPane}
- * @param <M> the type of {@link MessageEnvelope}
+ * @param keyFn the function to extract the window key from a message
+ * @param foldFn the function to aggregate messages in the {@link WindowPane}
+ * @param <M> the type of message
* @param <K> type of the key in the {@link Window}
* @param <WV> the type of the output value in the {@link WindowPane}
* @return the created {@link Window} function
*/
- public static <M extends MessageEnvelope, K, WV> Window<M, K, WV, WindowPane<K, WV>> keyedGlobalWindow(Function<M, K> keyFn, BiFunction<M, WV, WV> foldFn) {
+ public static <M, K, WV> Window<M, K, WV> keyedGlobalWindow(Function<M, K> keyFn, BiFunction<M, WV, WV> foldFn) {
return new WindowInternal<M, K, WV>(null, foldFn, keyFn, null);
}
/**
- * Returns a global {@link Window} that groups incoming {@link MessageEnvelope}s using the provided keyFn.
+ * Returns a global {@link Window} that groups incoming messages using the provided keyFn.
* The window does not have a default trigger. The triggering behavior must be specified by setting an early trigger.
*
* <p> The below example groups the stream per-key into count based windows. The window triggers every 50 messages or
@@ -355,17 +353,17 @@ public final class Windows {
* <pre> {@code
* MessageStream<UserClick> stream = ...;
* Function<UserClick, String> keyFn = ...;
- * MessageStream<WindowOutput<WindowKey<String>, Collection<UserClick>>> windowedStream = stream.window(Windows.keyedGlobalWindow(keyFn)
+ * MessageStream<WindowPane<String, Collection<UserClick>>> windowedStream = stream.window(Windows.keyedGlobalWindow(keyFn)
* .setEarlyTrigger(Triggers.repeat(Triggers.any(Triggers.count(50), Triggers.timeSinceFirstMessage(Duration.minutes(10))))))
* }
* </pre>
*
- * @param keyFn the function to extract the window key from a {@link MessageEnvelope}
- * @param <M> the type of {@link MessageEnvelope}
+ * @param keyFn the function to extract the window key from a message
+ * @param <M> the type of message
* @param <K> the type of the key in the {@link Window}
* @return the created {@link Window} function
*/
- public static <M extends MessageEnvelope, K> Window<M, K, Collection<M>, WindowPane<K, Collection<M>>> keyedGlobalWindow(Function<M, K> keyFn) {
+ public static <M, K> Window<M, K, Collection<M>> keyedGlobalWindow(Function<M, K> keyFn) {
BiFunction<M, Collection<M>, Collection<M>> aggregator = (m, c) -> {
c.add(m);
return c;
http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-api/src/main/java/org/apache/samza/operators/windows/internal/WindowInternal.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/windows/internal/WindowInternal.java b/samza-api/src/main/java/org/apache/samza/operators/windows/internal/WindowInternal.java
index 8825867..9479eea 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/windows/internal/WindowInternal.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/windows/internal/WindowInternal.java
@@ -18,11 +18,9 @@
*/
package org.apache.samza.operators.windows.internal;
import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.operators.data.MessageEnvelope;
import org.apache.samza.operators.triggers.Trigger;
import org.apache.samza.operators.windows.AccumulationMode;
import org.apache.samza.operators.windows.Window;
-import org.apache.samza.operators.windows.WindowPane;
import java.util.function.BiFunction;
import java.util.function.Function;
@@ -32,9 +30,13 @@ import java.util.function.Function;
* and whether to accumulate or discard previously emitted panes.
*
* Note: This class is meant to be used internally by Samza, and is not to be instantiated by programmers.
+ *
+ * @param <M> the type of input message
+ * @param <K> the type of key for the window
+ * @param <WV> the type of aggregated value in the window output
*/
@InterfaceStability.Unstable
-public final class WindowInternal<M extends MessageEnvelope, K, WV> implements Window<M, K, WV, WindowPane<K, WV>> {
+public final class WindowInternal<M, K, WV> implements Window<M, K, WV> {
private final Trigger defaultTrigger;
@@ -67,19 +69,19 @@ public final class WindowInternal<M extends MessageEnvelope, K, WV> implements W
}
@Override
- public Window<M, K, WV, WindowPane<K, WV>> setEarlyTrigger(Trigger trigger) {
+ public Window<M, K, WV> setEarlyTrigger(Trigger trigger) {
this.earlyTrigger = trigger;
return this;
}
@Override
- public Window<M, K, WV, WindowPane<K, WV>> setLateTrigger(Trigger trigger) {
+ public Window<M, K, WV> setLateTrigger(Trigger trigger) {
this.lateTrigger = trigger;
return this;
}
@Override
- public Window<M, K, WV, WindowPane<K, WV>> setAccumulationMode(AccumulationMode mode) {
+ public Window<M, K, WV> setAccumulationMode(AccumulationMode mode) {
this.mode = mode;
return this;
}
http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-api/src/main/java/org/apache/samza/system/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/ExecutionEnvironment.java b/samza-api/src/main/java/org/apache/samza/system/ExecutionEnvironment.java
new file mode 100644
index 0000000..d0c5985
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/system/ExecutionEnvironment.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.system;
+
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.config.ConfigException;
+import org.apache.samza.operators.StreamGraphFactory;
+import org.apache.samza.config.Config;
+
+
+/**
+ * Interface to be implemented by physical execution engine to deploy the config and jobs to run the {@link org.apache.samza.operators.StreamGraph}
+ */
+@InterfaceStability.Unstable
+public interface ExecutionEnvironment {
+
+ String ENVIRONMENT_CONFIG = "job.execution.environment.class";
+ String DEFAULT_ENVIRONMENT_CLASS = "org.apache.samza.system.StandaloneExecutionEnvironment";
+
+ /**
+ * Static method to load the local standalone environment
+ *
+ * @param config configuration passed in to initialize the Samza standalone process
+ * @return the standalone {@link ExecutionEnvironment} to run the user-defined stream applications
+ */
+ static ExecutionEnvironment getLocalEnvironment(Config config) {
+ return null;
+ }
+
+ /**
+ * Static method to load the non-standalone environment.
+ *
+ * @param config configuration passed in to initialize the Samza processes
+ * @return the configure-driven {@link ExecutionEnvironment} to run the user-defined stream applications
+ */
+ static ExecutionEnvironment fromConfig(Config config) {
+ try {
+ if (ExecutionEnvironment.class.isAssignableFrom(Class.forName(config.get(ENVIRONMENT_CONFIG, DEFAULT_ENVIRONMENT_CLASS)))) {
+ return (ExecutionEnvironment) Class.forName(config.get(ENVIRONMENT_CONFIG, DEFAULT_ENVIRONMENT_CLASS)).newInstance();
+ }
+ } catch (Exception e) {
+ throw new ConfigException(String.format("Problem in loading ExecutionEnvironment class %s", config.get(ENVIRONMENT_CONFIG)), e);
+ }
+ throw new ConfigException(String.format(
+ "Class %s does not implement interface ExecutionEnvironment properly",
+ config.get(ENVIRONMENT_CONFIG)));
+ }
+
+ /**
+ * Method to be invoked to deploy and run the actual Samza jobs to execute {@link org.apache.samza.operators.StreamGraph}
+ *
+ * @param graphFactory the user-defined {@link StreamGraphFactory} object
+ * @param config the {@link Config} object for this job
+ */
+ void run(StreamGraphFactory graphFactory, Config config);
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-api/src/main/java/org/apache/samza/task/TaskContext.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/task/TaskContext.java b/samza-api/src/main/java/org/apache/samza/task/TaskContext.java
index a85e0b4..5779071 100644
--- a/samza-api/src/main/java/org/apache/samza/task/TaskContext.java
+++ b/samza-api/src/main/java/org/apache/samza/task/TaskContext.java
@@ -54,4 +54,14 @@ public interface TaskContext {
*
*/
void setStartingOffset(SystemStreamPartition ssp, String offset);
+
+  /**
+   * Returns the user-defined task context for this task, if an implementation provides one.
+   *
+   * @param <T> the type of user-defined task context
+   * @return user-defined task context object; the default implementation always returns {@code null}
+   */
+  default <T extends TaskContext> T getUserDefinedContext() {
+    return null;
+  }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-api/src/test/java/org/apache/samza/operators/data/TestIncomingSystemMessage.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/operators/data/TestIncomingSystemMessage.java b/samza-api/src/test/java/org/apache/samza/operators/data/TestIncomingSystemMessage.java
index 7bd62a7..e3a1290 100644
--- a/samza-api/src/test/java/org/apache/samza/operators/data/TestIncomingSystemMessage.java
+++ b/samza-api/src/test/java/org/apache/samza/operators/data/TestIncomingSystemMessage.java
@@ -33,7 +33,7 @@ public class TestIncomingSystemMessage {
@Test
public void testConstructor() {
IncomingMessageEnvelope ime = mock(IncomingMessageEnvelope.class);
- IncomingSystemMessageEnvelope ism = new IncomingSystemMessageEnvelope(ime);
+ InputMessageEnvelope ism = new InputMessageEnvelope(ime);
Object mockKey = mock(Object.class);
Object mockValue = mock(Object.class);
http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindowOutput.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindowOutput.java b/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindowOutput.java
deleted file mode 100644
index 9679e1d..0000000
--- a/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindowOutput.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.windows;
-
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-
-
-public class TestWindowOutput {
- @Test
- public void testConstructor() {
- WindowPane<String, Integer> wndOutput = WindowPane.of(new WindowKey("testMsg", null), 10);
- assertEquals(wndOutput.getKey().getKey(), "testMsg");
- assertEquals(wndOutput.getMessage(), Integer.valueOf(10));
- assertFalse(wndOutput.isDelete());
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindowPane.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindowPane.java b/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindowPane.java
new file mode 100644
index 0000000..809c5b4
--- /dev/null
+++ b/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindowPane.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.windows;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/** Tests for {@link WindowPane} instances created through {@link WindowPane#of}. */
+public class TestWindowPane {
+  @Test
+  public void testConstructor() {
+    WindowPane<String, Integer> wndOutput = WindowPane.of(new WindowKey("testMsg", null), 10);
+    assertEquals("testMsg", wndOutput.getKey().getKey()); // JUnit order is (expected, actual)
+    assertEquals(Integer.valueOf(10), wndOutput.getMessage());
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-operator/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/MessageStreamImpl.java b/samza-operator/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
index 286893c..d85d488 100644
--- a/samza-operator/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
+++ b/samza-operator/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
@@ -19,107 +19,161 @@
package org.apache.samza.operators;
-import org.apache.samza.operators.data.MessageEnvelope;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.function.Function;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.samza.config.Config;
import org.apache.samza.operators.functions.FilterFunction;
import org.apache.samza.operators.functions.FlatMapFunction;
import org.apache.samza.operators.functions.JoinFunction;
import org.apache.samza.operators.functions.MapFunction;
+import org.apache.samza.operators.functions.PartialJoinFunction;
import org.apache.samza.operators.functions.SinkFunction;
import org.apache.samza.operators.spec.OperatorSpec;
import org.apache.samza.operators.spec.OperatorSpecs;
import org.apache.samza.operators.windows.Window;
import org.apache.samza.operators.windows.WindowPane;
import org.apache.samza.operators.windows.internal.WindowInternal;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.function.BiFunction;
+import org.apache.samza.task.TaskContext;
/**
* The implementation for input/output {@link MessageStream}s to/from the operators.
* Users use the {@link MessageStream} API methods to describe and chain the operators specs.
*
- * @param <M> type of {@link MessageEnvelope}s in this {@link MessageStream}
+ * @param <M> type of messages in this {@link MessageStream}
*/
-public class MessageStreamImpl<M extends MessageEnvelope> implements MessageStream<M> {
+public class MessageStreamImpl<M> implements MessageStream<M> {
+ /**
+ * The {@link StreamGraphImpl} object that contains this {@link MessageStreamImpl}
+ */
+ private final StreamGraphImpl graph;
/**
- * The set of operators that consume the {@link MessageEnvelope}s in this {@link MessageStream}
+ * The set of operators that consume the messages in this {@link MessageStream}
*/
private final Set<OperatorSpec> registeredOperatorSpecs = new HashSet<>();
- @Override
- public <OM extends MessageEnvelope> MessageStream<OM> map(MapFunction<M, OM> mapFn) {
- OperatorSpec<OM> op = OperatorSpecs.<M, OM>createStreamOperatorSpec(m -> new ArrayList<OM>() { {
- OM r = mapFn.apply(m);
- if (r != null) {
- this.add(r);
- }
- } });
+ /**
+ * Default constructor
+ *
+ * @param graph the {@link StreamGraphImpl} object that this stream belongs to
+ */
+ MessageStreamImpl(StreamGraphImpl graph) {
+ this.graph = graph;
+ }
+
+ @Override public <TM> MessageStream<TM> map(MapFunction<M, TM> mapFn) {
+ OperatorSpec<TM> op = OperatorSpecs.<M, TM>createMapOperatorSpec(mapFn, this.graph, new MessageStreamImpl<>(this.graph));
this.registeredOperatorSpecs.add(op);
- return op.getOutputStream();
+ return op.getNextStream();
}
- @Override
- public <OM extends MessageEnvelope> MessageStream<OM> flatMap(FlatMapFunction<M, OM> flatMapFn) {
- OperatorSpec<OM> op = OperatorSpecs.createStreamOperatorSpec(flatMapFn);
+ @Override public MessageStream<M> filter(FilterFunction<M> filterFn) {
+ OperatorSpec<M> op = OperatorSpecs.<M>createFilterOperatorSpec(filterFn, this.graph, new MessageStreamImpl<>(this.graph));
this.registeredOperatorSpecs.add(op);
- return op.getOutputStream();
+ return op.getNextStream();
}
@Override
- public MessageStream<M> filter(FilterFunction<M> filterFn) {
- OperatorSpec<M> op = OperatorSpecs.<M, M>createStreamOperatorSpec(t -> new ArrayList<M>() { {
- if (filterFn.apply(t)) {
- this.add(t);
- }
- } });
+ public <TM> MessageStream<TM> flatMap(FlatMapFunction<M, TM> flatMapFn) {
+ OperatorSpec<TM> op = OperatorSpecs.createStreamOperatorSpec(flatMapFn, this.graph, new MessageStreamImpl<>(this.graph));
this.registeredOperatorSpecs.add(op);
- return op.getOutputStream();
+ return op.getNextStream();
}
@Override
public void sink(SinkFunction<M> sinkFn) {
- this.registeredOperatorSpecs.add(OperatorSpecs.createSinkOperatorSpec(sinkFn));
+ this.registeredOperatorSpecs.add(OperatorSpecs.createSinkOperatorSpec(sinkFn, this.graph));
}
- @Override
- public <K, WV, WM extends WindowPane<K, WV>> MessageStream<WM> window(
- Window<M, K, WV, WM> window) {
- OperatorSpec<WM> wndOp = OperatorSpecs.createWindowOperatorSpec((WindowInternal<MessageEnvelope, K, WV>) window);
- this.registeredOperatorSpecs.add(wndOp);
- return wndOp.getOutputStream();
+ @Override public void sendTo(OutputStream<M> stream) {
+ this.registeredOperatorSpecs.add(OperatorSpecs.createSendToOperatorSpec(stream.getSinkFunction(), this.graph, stream));
+ }
+
+ @Override public MessageStream<M> sendThrough(OutputStream<M> stream) {
+ this.sendTo(stream);
+ return this.graph.getIntStream(stream);
}
@Override
- public <K, JM extends MessageEnvelope<K, ?>, RM extends MessageEnvelope> MessageStream<RM> join(
- MessageStream<JM> otherStream, JoinFunction<M, JM, RM> joinFn) {
- MessageStreamImpl<RM> outputStream = new MessageStreamImpl<>();
+ public <K, WV> MessageStream<WindowPane<K, WV>> window(Window<M, K, WV> window) {
+ OperatorSpec<WindowPane<K, WV>> wndOp = OperatorSpecs.createWindowOperatorSpec((WindowInternal<M, K, WV>) window,
+ this.graph, new MessageStreamImpl<>(this.graph));
+ this.registeredOperatorSpecs.add(wndOp);
+ return wndOp.getNextStream();
+ }
- BiFunction<M, JM, RM> parJoin1 = joinFn::apply;
- BiFunction<JM, M, RM> parJoin2 = (m, t1) -> joinFn.apply(t1, m);
+ @Override public <K, OM, RM> MessageStream<RM> join(MessageStream<OM> otherStream, JoinFunction<K, M, OM, RM> joinFn) {
+ MessageStreamImpl<RM> outputStream = new MessageStreamImpl<>(this.graph);
+
+ PartialJoinFunction<K, M, OM, RM> parJoin1 = new PartialJoinFunction<K, M, OM, RM>() {
+ @Override
+ public RM apply(M m1, OM om) {
+ return joinFn.apply(m1, om);
+ }
+
+ @Override
+ public K getKey(M message) {
+ return joinFn.getFirstKey(message);
+ }
+
+ @Override
+ public K getOtherKey(OM message) {
+ return joinFn.getSecondKey(message);
+ }
+
+ @Override
+ public void init(Config config, TaskContext context) {
+ joinFn.init(config, context);
+ }
+ };
+
+ PartialJoinFunction<K, OM, M, RM> parJoin2 = new PartialJoinFunction<K, OM, M, RM>() {
+ @Override
+ public RM apply(OM m1, M m) {
+ return joinFn.apply(m, m1);
+ }
+
+ @Override
+ public K getKey(OM message) {
+ return joinFn.getSecondKey(message);
+ }
+
+ @Override
+ public K getOtherKey(M message) {
+ return joinFn.getFirstKey(message);
+ }
+ };
// TODO: need to add default store functions for the two partial join functions
- ((MessageStreamImpl<JM>) otherStream).registeredOperatorSpecs.add(OperatorSpecs.createPartialJoinOperatorSpec(parJoin2, outputStream));
- this.registeredOperatorSpecs.add(OperatorSpecs.createPartialJoinOperatorSpec(parJoin1, outputStream));
+ ((MessageStreamImpl<OM>) otherStream).registeredOperatorSpecs.add(
+ OperatorSpecs.<OM, K, M, RM>createPartialJoinOperatorSpec(parJoin2, this.graph, outputStream));
+ this.registeredOperatorSpecs.add(OperatorSpecs.<M, K, OM, RM>createPartialJoinOperatorSpec(parJoin1, this.graph, outputStream));
return outputStream;
}
@Override
public MessageStream<M> merge(Collection<MessageStream<M>> otherStreams) {
- MessageStreamImpl<M> outputStream = new MessageStreamImpl<>();
+ MessageStreamImpl<M> outputStream = new MessageStreamImpl<>(this.graph);
otherStreams.add(this);
- otherStreams.forEach(other ->
- ((MessageStreamImpl<M>) other).registeredOperatorSpecs.add(OperatorSpecs.createMergeOperatorSpec(outputStream)));
+ otherStreams.forEach(other -> ((MessageStreamImpl<M>) other).registeredOperatorSpecs.
+ add(OperatorSpecs.createMergeOperatorSpec(this.graph, outputStream)));
return outputStream;
}
+ @Override
+ public <K> MessageStream<M> partitionBy(Function<M, K> parKeyExtractor) {
+ MessageStreamImpl<M> intStream = this.graph.createIntStream(parKeyExtractor);
+ OutputStream<M> outputStream = this.graph.getOutputStream(intStream);
+ this.registeredOperatorSpecs.add(OperatorSpecs.createPartitionOperatorSpec(outputStream.getSinkFunction(),
+ this.graph, outputStream));
+ return intStream;
+ }
/**
* Gets the operator specs registered to consume the output of this {@link MessageStream}. This is an internal API and
* should not be exposed to users.
@@ -129,4 +183,5 @@ public class MessageStreamImpl<M extends MessageEnvelope> implements MessageStre
public Collection<OperatorSpec> getRegisteredOperatorSpecs() {
return Collections.unmodifiableSet(this.registeredOperatorSpecs);
}
+
}
http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-operator/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/StreamGraphImpl.java b/samza-operator/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
new file mode 100644
index 0000000..dca3469
--- /dev/null
+++ b/samza-operator/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators;
+
+import java.util.Properties;
+import java.util.function.Function;
+import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.operators.functions.SinkFunction;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskCoordinator;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * The implementation of {@link StreamGraph} interface. This class provides implementation of methods to allow users to
+ * create system input/output/intermediate streams.
+ */
+public class StreamGraphImpl implements StreamGraph {
+
+  /**
+   * Unique identifier for each {@link org.apache.samza.operators.spec.OperatorSpec} added to transform the {@link MessageEnvelope}
+   * in the input {@link MessageStream}s.
+   */
+  private int opId = 0;
+
+  private class InputStreamImpl<K, V, M extends MessageEnvelope<K, V>> extends MessageStreamImpl<M> {
+    final StreamSpec spec;
+    final Serde<K> keySerde;
+    final Serde<V> msgSerde;
+
+    InputStreamImpl(StreamGraphImpl graph, StreamSpec streamSpec, Serde<K> keySerde, Serde<V> msgSerde) {
+      super(graph);
+      this.spec = streamSpec;
+      this.keySerde = keySerde;
+      this.msgSerde = msgSerde;
+    }
+
+    StreamSpec getSpec() {
+      return this.spec;
+    }
+
+  }
+
+  private class OutputStreamImpl<K, V, M extends MessageEnvelope<K, V>> implements OutputStream<M> {
+    final StreamSpec spec;
+    final Serde<K> keySerde;
+    final Serde<V> msgSerde;
+
+    OutputStreamImpl(StreamGraphImpl graph, StreamSpec streamSpec, Serde<K> keySerde, Serde<V> msgSerde) {
+      this.spec = streamSpec;
+      this.keySerde = keySerde;
+      this.msgSerde = msgSerde;
+    }
+
+    StreamSpec getSpec() {
+      return this.spec;
+    }
+
+    @Override
+    public SinkFunction<M> getSinkFunction() {
+      return (M message, MessageCollector mc, TaskCoordinator tc) -> {
+        // TODO: need to find a way to directly pass in the serde class names
+        // mc.send(new OutgoingMessageEnvelope(this.spec.getSystemStream(), this.keySerde.getClass().getName(), this.msgSerde.getClass().getName(),
+        //    message.getKey(), message.getKey(), message.getMessage()));
+        mc.send(new OutgoingMessageEnvelope(this.spec.getSystemStream(), message.getKey(), message.getMessage()));
+      };
+    }
+  }
+
+  private class IntermediateStreamImpl<PK, K, V, M extends MessageEnvelope<K, V>> extends InputStreamImpl<K, V, M> implements OutputStream<M> {
+    final Function<M, PK> parKeyFn;
+
+    /**
+     * Default constructor
+     *
+     * @param graph the {@link StreamGraphImpl} object that this stream belongs to
+     */
+    IntermediateStreamImpl(StreamGraphImpl graph, StreamSpec streamSpec, Serde<K> keySerde, Serde<V> msgSerde) {
+      this(graph, streamSpec, keySerde, msgSerde, null);
+    }
+
+    IntermediateStreamImpl(StreamGraphImpl graph, StreamSpec streamSpec, Serde<K> keySerde, Serde<V> msgSerde, Function<M, PK> parKeyFn) {
+      super(graph, streamSpec, keySerde, msgSerde);
+      this.parKeyFn = parKeyFn;
+    }
+
+    @Override
+    public SinkFunction<M> getSinkFunction() {
+      return (M message, MessageCollector mc, TaskCoordinator tc) -> {
+        // TODO: need to find a way to directly pass in the serde class names
+        // mc.send(new OutgoingMessageEnvelope(this.spec.getSystemStream(), this.keySerde.getClass().getName(), this.msgSerde.getClass().getName(),
+        //    message.getKey(), message.getKey(), message.getMessage()));
+        if (this.parKeyFn == null) {
+          mc.send(new OutgoingMessageEnvelope(this.spec.getSystemStream(), message.getKey(), message.getMessage()));
+        } else {
+          // apply partition key function
+          mc.send(new OutgoingMessageEnvelope(this.spec.getSystemStream(), this.parKeyFn.apply(message), message.getKey(), message.getMessage()));
+        }
+      };
+    }
+  }
+
+  /**
+   * Maps keeping all {@link SystemStream}s that are input and output of operators in {@link StreamGraphImpl}
+   */
+  private final Map<SystemStream, MessageStream> inStreams = new HashMap<>();
+  private final Map<SystemStream, OutputStream> outStreams = new HashMap<>();
+
+  private ContextManager contextManager = new ContextManager() { };
+
+  @Override
+  public <K, V, M extends MessageEnvelope<K, V>> MessageStream<M> createInStream(StreamSpec streamSpec, Serde<K> keySerde, Serde<V> msgSerde) {
+    if (!this.inStreams.containsKey(streamSpec.getSystemStream())) {
+      this.inStreams.putIfAbsent(streamSpec.getSystemStream(), new InputStreamImpl<K, V, M>(this, streamSpec, keySerde, msgSerde));
+    }
+    return this.inStreams.get(streamSpec.getSystemStream());
+  }
+
+  /**
+   * Creates (or returns the previously created) {@link OutputStream} that sends messages to the {@link SystemStream}
+   * defined by {@code streamSpec}.
+   *
+   * @param streamSpec the {@link StreamSpec} object defining the {@link SystemStream} as the output
+   * @param keySerde the {@link Serde} for the message key (stored, but not yet applied when sending)
+   * @param msgSerde the {@link Serde} for the message body (stored, but not yet applied when sending)
+   * @param <K> the type of the message key
+   * @param <V> the type of the message body
+   * @param <M> the type of {@link MessageEnvelope}s in the output {@link SystemStream}
+   * @return the {@link OutputStream} registered for the {@link SystemStream}; if one already exists it is returned
+   *         unchanged and the given serdes are ignored
+   */
+  @Override
+  public <K, V, M extends MessageEnvelope<K, V>> OutputStream<M> createOutStream(StreamSpec streamSpec, Serde<K> keySerde, Serde<V> msgSerde) {
+    if (!this.outStreams.containsKey(streamSpec.getSystemStream())) {
+      this.outStreams.putIfAbsent(streamSpec.getSystemStream(), new OutputStreamImpl<K, V, M>(this, streamSpec, keySerde, msgSerde));
+    }
+    return this.outStreams.get(streamSpec.getSystemStream());
+  }
+
+  /**
+   * Creates (or returns the previously created) intermediate stream for the {@link SystemStream} defined by
+   * {@code streamSpec}. The same object is registered both as an input stream and as an output stream, so it can be
+   * written to as an {@link OutputStream} and consumed as a {@link MessageStream}.
+   *
+   * @param streamSpec the {@link StreamSpec} object defining the {@link SystemStream} as an intermediate {@link SystemStream}
+   * @param keySerde the {@link Serde} for the message key (stored, but not yet applied when sending)
+   * @param msgSerde the {@link Serde} for the message body (stored, but not yet applied when sending)
+   * @param <K> the type of the message key
+   * @param <V> the type of the message body
+   * @param <M> the type of {@link MessageEnvelope}s in the intermediate {@link SystemStream}
+   * @return the {@link OutputStream} view of the intermediate stream
+   * @throws ClassCastException if the {@link SystemStream} was already registered as a plain input stream ({@code createInStream})
+   */
+  @Override
+  public <K, V, M extends MessageEnvelope<K, V>> OutputStream<M> createIntStream(StreamSpec streamSpec, Serde<K> keySerde, Serde<V> msgSerde) {
+    if (!this.inStreams.containsKey(streamSpec.getSystemStream())) {
+      this.inStreams.putIfAbsent(streamSpec.getSystemStream(), new IntermediateStreamImpl<K, K, V, M>(this, streamSpec, keySerde, msgSerde));
+    }
+    IntermediateStreamImpl<K, K, V, M> intStream = (IntermediateStreamImpl<K, K, V, M>) this.inStreams.get(streamSpec.getSystemStream());
+    if (!this.outStreams.containsKey(streamSpec.getSystemStream())) {
+      this.outStreams.putIfAbsent(streamSpec.getSystemStream(), intStream);
+    }
+    return intStream;
+  }
+
+  @Override public Map<StreamSpec, MessageStream> getInStreams() {
+    Map<StreamSpec, MessageStream> inStreamMap = new HashMap<>();
+    this.inStreams.forEach((ss, entry) -> inStreamMap.put(((InputStreamImpl) entry).getSpec(), entry));
+    return Collections.unmodifiableMap(inStreamMap);
+  }
+
+  @Override public Map<StreamSpec, OutputStream> getOutStreams() {
+    Map<StreamSpec, OutputStream> outStreamMap = new HashMap<>();
+    this.outStreams.forEach((ss, entry) -> outStreamMap.put(getOutStreamSpec(entry), entry));
+    return Collections.unmodifiableMap(outStreamMap);
+  }
+
+  /**
+   * Returns the {@link StreamSpec} of a value in {@code outStreams}. Besides {@code OutputStreamImpl}s, that map holds
+   * the {@code IntermediateStreamImpl}s registered by {@code createIntStream}, which extend {@code InputStreamImpl}
+   * instead; casting every entry to {@code OutputStreamImpl} would throw a {@link ClassCastException} for those.
+   *
+   * @param stream an {@link OutputStream} stored in {@code outStreams}
+   * @return the {@link StreamSpec} the stream was created with
+   */
+  private StreamSpec getOutStreamSpec(OutputStream stream) {
+    if (stream instanceof OutputStreamImpl) {
+      return ((OutputStreamImpl) stream).getSpec();
+    }
+    return ((InputStreamImpl) stream).getSpec();
+  }
+
+  @Override
+  public StreamGraph withContextManager(ContextManager manager) {
+    this.contextManager = manager;
+    return this;
+  }
+
+  public int getNextOpId() {
+    return this.opId++;
+  }
+
+  public ContextManager getContextManager() {
+    return this.contextManager;
+  }
+
+  /**
+   * Helper method to get the input stream via {@link SystemStream}
+   *
+   * @param systemStream the {@link SystemStream}
+   * @return a {@link MessageStreamImpl} object corresponding to the {@code systemStream}, or {@code null} if none
+   */
+  public MessageStreamImpl getInputStream(SystemStream systemStream) {
+    if (this.inStreams.containsKey(systemStream)) {
+      return (MessageStreamImpl) this.inStreams.get(systemStream);
+    }
+    return null;
+  }
+
+  <M> OutputStream<M> getOutputStream(MessageStreamImpl<M> intStream) {
+    if (this.outStreams.containsValue(intStream)) {
+      return (OutputStream<M>) intStream;
+    }
+    return null;
+  }
+
+  <M> MessageStream<M> getIntStream(OutputStream<M> outStream) {
+    if (this.inStreams.containsValue(outStream)) {
+      return (MessageStream<M>) outStream;
+    }
+    return null;
+  }
+
+  /**
+   * Method to create intermediate topics for {@link MessageStreamImpl#partitionBy(Function)} method.
+   *
+   * @param parKeyFn the function to extract the partition key from the input message
+   * @param <PK> the type of partition key
+   * @param <M> the type of input message
+   * @return the re-partitioned intermediate stream; the same object is also registered as the {@link OutputStream}
+   *         that the partition operator sends messages to
+   */
+  <PK, M> MessageStreamImpl<M> createIntStream(Function<M, PK> parKeyFn) {
+    // TODO: placeholder to auto-generate intermediate streams via {@link StreamSpec}
+    // TODO: should auto-generate intermediate stream name here
+    // Resolve the stream name once: opId keeps changing as operators are added, and the spec, the map keys below and
+    // the sink function must all keep referring to the same SystemStream.
+    // NOTE(review): two calls that observe the same opId share one intermediate stream (and the first call's parKeyFn);
+    // this relies on an operator id being consumed between partitionBy calls -- confirm.
+    final SystemStream intSystemStream = new SystemStream("intermediate", String.format("par-%d", this.opId));
+    StreamSpec streamSpec = new StreamSpec() {
+      @Override
+      public SystemStream getSystemStream() {
+        return intSystemStream;
+      }
+
+      @Override
+      public Properties getProperties() {
+        return null;
+      }
+    };
+
+    if (!this.inStreams.containsKey(intSystemStream)) {
+      this.inStreams.putIfAbsent(intSystemStream, new IntermediateStreamImpl(this, streamSpec, null, null, parKeyFn));
+    }
+    IntermediateStreamImpl intStream = (IntermediateStreamImpl) this.inStreams.get(intSystemStream);
+    if (!this.outStreams.containsKey(intSystemStream)) {
+      this.outStreams.putIfAbsent(intSystemStream, intStream);
+    }
+    return intStream;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-operator/src/main/java/org/apache/samza/operators/StreamOperatorAdaptorTask.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/StreamOperatorAdaptorTask.java b/samza-operator/src/main/java/org/apache/samza/operators/StreamOperatorAdaptorTask.java
deleted file mode 100644
index 152cd92..0000000
--- a/samza-operator/src/main/java/org/apache/samza/operators/StreamOperatorAdaptorTask.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators;
-
-import org.apache.samza.config.Config;
-import org.apache.samza.operators.data.IncomingSystemMessageEnvelope;
-import org.apache.samza.operators.data.MessageEnvelope;
-import org.apache.samza.operators.impl.OperatorImpl;
-import org.apache.samza.operators.impl.OperatorImpls;
-import org.apache.samza.system.IncomingMessageEnvelope;
-import org.apache.samza.system.SystemStreamPartition;
-import org.apache.samza.task.InitableTask;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.StreamTask;
-import org.apache.samza.task.TaskContext;
-import org.apache.samza.task.TaskCoordinator;
-import org.apache.samza.task.WindowableTask;
-
-import java.util.HashMap;
-import java.util.Map;
-
-
-/**
- * An {@link StreamTask} implementation that receives {@link IncomingSystemMessageEnvelope}s and propagates them
- * through the user's stream transformations defined in {@link StreamOperatorTask#transform(Map)} using the
- * {@link MessageStream} APIs.
- * <p>
- * This class brings all the operator API implementation components together and feeds the
- * {@link IncomingSystemMessageEnvelope}s into the transformation chains.
- * <p>
- * It accepts an instance of the user implemented {@link StreamOperatorTask}. When its own {@link #init(Config, TaskContext)}
- * method is called during startup, it creates a {@link MessageStreamImpl} corresponding to each of its input
- * {@link SystemStreamPartition}s and then calls the user's {@link StreamOperatorTask#transform(Map)} method.
- * <p>
- * When users invoke the methods on the {@link MessageStream} API to describe their stream transformations in the
- * {@link StreamOperatorTask#transform(Map)} method, the underlying {@link MessageStreamImpl} creates the
- * corresponding {@link org.apache.samza.operators.spec.OperatorSpec} to record information about the desired
- * transformation, and returns the output {@link MessageStream} to allow further transform chaining.
- * <p>
- * Once the user's transformation DAGs have been described for all {@link MessageStream}s (i.e., when the
- * {@link StreamOperatorTask#transform(Map)} call returns), it calls
- * {@link OperatorImpls#createOperatorImpls(MessageStreamImpl, TaskContext)} for each of the input
- * {@link MessageStreamImpl}. This instantiates the {@link org.apache.samza.operators.impl.OperatorImpl} DAG
- * corresponding to the aforementioned {@link org.apache.samza.operators.spec.OperatorSpec} DAG and returns the
- * root node of the DAG, which this class saves.
- * <p>
- * Now that it has the root for the DAG corresponding to each {@link SystemStreamPartition}, it can pass the message
- * envelopes received in {@link StreamTask#process(IncomingMessageEnvelope, MessageCollector, TaskCoordinator)} along
- * to the appropriate root nodes. From then on, each {@link org.apache.samza.operators.impl.OperatorImpl} propagates
- * its transformed output to the next set of {@link org.apache.samza.operators.impl.OperatorImpl}s.
- */
-public final class StreamOperatorAdaptorTask implements StreamTask, InitableTask, WindowableTask {
-
- /**
- * A mapping from each {@link SystemStreamPartition} to the root node of its operator chain DAG.
- */
- private final Map<SystemStreamPartition, OperatorImpl<IncomingSystemMessageEnvelope, ? extends MessageEnvelope>> operatorChains = new HashMap<>();
-
- private final StreamOperatorTask userTask;
-
- public StreamOperatorAdaptorTask(StreamOperatorTask userTask) {
- this.userTask = userTask;
- }
-
- @Override
- public final void init(Config config, TaskContext context) throws Exception {
- if (this.userTask instanceof InitableTask) {
- ((InitableTask) this.userTask).init(config, context);
- }
- Map<SystemStreamPartition, MessageStream<IncomingSystemMessageEnvelope>> messageStreams = new HashMap<>();
- context.getSystemStreamPartitions().forEach(ssp -> messageStreams.put(ssp, new MessageStreamImpl<>()));
- this.userTask.transform(messageStreams);
- messageStreams.forEach((ssp, ms) ->
- operatorChains.put(ssp, OperatorImpls.createOperatorImpls((MessageStreamImpl<IncomingSystemMessageEnvelope>) ms, context)));
- }
-
- @Override
- public final void process(IncomingMessageEnvelope ime, MessageCollector collector, TaskCoordinator coordinator) {
- this.operatorChains.get(ime.getSystemStreamPartition())
- .onNext(new IncomingSystemMessageEnvelope(ime), collector, coordinator);
- }
-
- @Override
- public final void window(MessageCollector collector, TaskCoordinator coordinator) throws Exception {
- if (this.userTask instanceof WindowableTask) {
- ((WindowableTask) this.userTask).window(collector, coordinator);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-operator/src/main/java/org/apache/samza/operators/functions/PartialJoinFunction.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/functions/PartialJoinFunction.java b/samza-operator/src/main/java/org/apache/samza/operators/functions/PartialJoinFunction.java
new file mode 100644
index 0000000..3583b92
--- /dev/null
+++ b/samza-operator/src/main/java/org/apache/samza/operators/functions/PartialJoinFunction.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.functions;
+
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.config.Config;
+import org.apache.samza.task.TaskContext;
+
+
+/**
+ * One side of a two-way join that takes input messages from two {@link org.apache.samza.operators.MessageStream}s and
+ * merges them into a single joined message in the join output.
+ * <p>
+ * A two-way join is expressed as a pair of these functions, one per input stream. For the function attached to a
+ * given stream, {@code M} is the type of that stream's messages and {@code OM} is the type of the other stream's
+ * messages; both sides extract a join key of type {@code K}.
+ *
+ * @param <K> the type of the join key
+ * @param <M> the type of messages in the stream this function is attached to
+ * @param <OM> the type of messages in the other stream
+ * @param <RM> the type of the joined output message
+ */
+@InterfaceStability.Unstable
+public interface PartialJoinFunction<K, M, OM, RM> extends InitFunction {
+
+  /**
+   * Joins a message from this function's stream with a message from the other stream.
+   *
+   * @param m1 message from the stream this function is attached to
+   * @param om message from the other stream
+   * @return the joined message in the output stream
+   */
+  RM apply(M m1, OM om);
+
+  /**
+   * Extracts the join key from a message of the stream this function is attached to.
+   *
+   * @param message the input message from this function's stream
+   * @return the join key in the {@code message}
+   */
+  K getKey(M message);
+
+  /**
+   * Extracts the join key from a message of the other stream.
+   *
+   * @param message the input message from the other stream
+   * @return the join key in the {@code message}
+   */
+  K getOtherKey(OM message);
+
+  /**
+   * Init method to initialize the context for this {@link PartialJoinFunction}. The default implementation is NO-OP.
+   *
+   * @param config the {@link Config} object for this task
+   * @param context the {@link TaskContext} object for this task
+   */
+  default void init(Config config, TaskContext context) { }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorGraph.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorGraph.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorGraph.java
new file mode 100644
index 0000000..66336f8
--- /dev/null
+++ b/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorGraph.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.MessageStreamImpl;
+import org.apache.samza.operators.spec.OperatorSpec;
+import org.apache.samza.operators.spec.PartialJoinOperatorSpec;
+import org.apache.samza.operators.spec.SinkOperatorSpec;
+import org.apache.samza.operators.spec.StreamOperatorSpec;
+import org.apache.samza.operators.spec.WindowOperatorSpec;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.task.TaskContext;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+
+/**
+ * Builds and holds, for a single {@link org.apache.samza.container.TaskInstance}, the DAG of {@link OperatorImpl}s
+ * that mirrors the {@link OperatorSpec}s registered on each input {@link MessageStreamImpl}.
+ */
+public class OperatorGraph {
+
+  /**
+   * Every {@link OperatorImpl} created so far, keyed by the {@link OperatorSpec} it was created from. A spec is
+   * instantiated at most once, so a spec reachable along several paths (e.g. below a stream shared by several specs,
+   * such as a join or merge output) maps to one shared {@link OperatorImpl}.
+   */
+  private final Map<OperatorSpec, OperatorImpl> operators = new HashMap<>();
+
+  /**
+   * The root {@link OperatorImpl} of the DAG built for each input {@link SystemStream}.
+   */
+  private final Map<SystemStream, RootOperatorImpl> operatorGraph = new HashMap<>();
+
+  /**
+   * Instantiates the {@link OperatorImpl} DAG for every input stream of the
+   * {@link org.apache.samza.operators.StreamGraph} and records its root under the stream's {@link SystemStream}.
+   *
+   * @param inputStreams the input {@link MessageStreamImpl}s, keyed by their {@link SystemStream}
+   * @param config the {@link Config} required to instantiate operators
+   * @param context the {@link TaskContext} required to instantiate operators
+   */
+  public void init(Map<SystemStream, MessageStreamImpl> inputStreams, Config config, TaskContext context) {
+    for (Map.Entry<SystemStream, MessageStreamImpl> input : inputStreams.entrySet()) {
+      RootOperatorImpl root = this.createOperatorImpls(input.getValue(), config, context);
+      this.operatorGraph.put(input.getKey(), root);
+    }
+  }
+
+  /**
+   * Returns the root of the {@link OperatorImpl} DAG that processes messages from the given input stream.
+   *
+   * @param ss input {@link SystemStream}
+   * @param <M> the type of input message
+   * @return the {@link OperatorImpl} that starts processing the input message, or {@code null} if {@code ss} was not
+   *         among the input streams passed to {@link #init}
+   */
+  public <M> OperatorImpl<M, M> get(SystemStream ss) {
+    return this.operatorGraph.get(ss);
+  }
+
+  /**
+   * Creates a {@link RootOperatorImpl} for {@code source} and attaches to it the {@link OperatorImpl}s of all
+   * {@link OperatorSpec}s registered on {@code source}, recursively building the rest of the DAG.
+   *
+   * @param source the input {@link MessageStreamImpl} to instantiate {@link OperatorImpl}s for
+   * @param <M> the type of messages in the {@code source} {@link MessageStreamImpl}
+   * @param config the {@link Config} required to instantiate operators
+   * @param context the {@link TaskContext} required to instantiate operators
+   * @return root node for the {@link OperatorImpl} DAG
+   */
+  private <M> RootOperatorImpl<M> createOperatorImpls(MessageStreamImpl<M> source, Config config,
+      TaskContext context) {
+    // A stream may feed several operators, so a dedicated root acts as the single entry point of its DAG.
+    RootOperatorImpl<M> root = new RootOperatorImpl<>();
+    for (OperatorSpec spec : source.getRegisteredOperatorSpecs()) {
+      // source is passed along so that stateful operators can initialize their stores
+      root.registerNextOperator(this.createAndRegisterOperatorImpl(spec, source, config, context));
+    }
+    return root;
+  }
+
+  /**
+   * Returns the {@link OperatorImpl} for {@code operatorSpec}, creating it on first use. When it is created, the spec is
+   * initialized and the {@link OperatorImpl}s for the specs registered on its next stream are created (recursively) and
+   * linked to it. Later calls with the same spec return the existing instance without traversing further.
+   *
+   * @param operatorSpec the operatorSpec registered with the {@code source}
+   * @param source the source {@link MessageStreamImpl}
+   * @param <M> type of input message
+   * @param config the {@link Config} required to instantiate operators
+   * @param context the {@link TaskContext} required to instantiate operators
+   * @return the operator implementation for the operatorSpec
+   */
+  private <M> OperatorImpl<M, ?> createAndRegisterOperatorImpl(OperatorSpec operatorSpec,
+      MessageStreamImpl<M> source, Config config, TaskContext context) {
+    OperatorImpl<M, ?> existing = operators.get(operatorSpec);
+    if (existing != null) {
+      // already instantiated and linked; no need to traverse the DAG below it again
+      return existing;
+    }
+
+    OperatorImpl<M, ?> created = createOperatorImpl(source, operatorSpec, config, context);
+    // register before recursing, so that a spec reached again further down resolves to this same instance
+    operators.put(operatorSpec, created);
+    operatorSpec.init(config, context);
+
+    MessageStreamImpl downstream = operatorSpec.getNextStream();
+    if (downstream != null) {
+      Collection<OperatorSpec> downstreamSpecs = downstream.getRegisteredOperatorSpecs();
+      for (OperatorSpec downstreamSpec : downstreamSpecs) {
+        OperatorImpl next = this.createAndRegisterOperatorImpl(downstreamSpec, downstream, config, context);
+        created.registerNextOperator(next);
+      }
+    }
+    return created;
+  }
+
+  /**
+   * Creates a new {@link OperatorImpl} instance for the provided {@link OperatorSpec}.
+   *
+   * @param source the source {@link MessageStreamImpl}
+   * @param <M> type of input message
+   * @param operatorSpec the immutable {@link OperatorSpec} definition.
+   * @param config the {@link Config} required to instantiate operators
+   * @param context the {@link TaskContext} required to instantiate operators
+   * @return the {@link OperatorImpl} implementation instance
+   * @throws IllegalArgumentException if the type of {@code operatorSpec} is not supported
+   */
+  private static <M> OperatorImpl<M, ?> createOperatorImpl(MessageStreamImpl<M> source, OperatorSpec operatorSpec,
+      Config config, TaskContext context) {
+    if (operatorSpec instanceof StreamOperatorSpec) {
+      return new StreamOperatorImpl<>((StreamOperatorSpec<M, ?>) operatorSpec, source, config, context);
+    }
+    if (operatorSpec instanceof SinkOperatorSpec) {
+      return new SinkOperatorImpl<>((SinkOperatorSpec<M>) operatorSpec, config, context);
+    }
+    if (operatorSpec instanceof WindowOperatorSpec) {
+      return new WindowOperatorImpl<>((WindowOperatorSpec<M, ?, ?>) operatorSpec, source, config, context);
+    }
+    if (operatorSpec instanceof PartialJoinOperatorSpec) {
+      return new PartialJoinOperatorImpl<>((PartialJoinOperatorSpec) operatorSpec, source, config, context);
+    }
+    throw new IllegalArgumentException(
+        String.format("Unsupported OperatorSpec: %s", operatorSpec.getClass().getName()));
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
index c77914e..abb1fa9 100644
--- a/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
+++ b/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
@@ -18,10 +18,7 @@
*/
package org.apache.samza.operators.impl;
-import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.data.MessageEnvelope;
import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;
import java.util.HashSet;
@@ -31,32 +28,24 @@ import java.util.Set;
/**
* Abstract base class for all stream operator implementations.
*/
-public abstract class OperatorImpl<M extends MessageEnvelope, RM extends MessageEnvelope> {
+public abstract class OperatorImpl<M, RM> {
- private final Set<OperatorImpl<RM, ? extends MessageEnvelope>> nextOperators = new HashSet<>();
+ private final Set<OperatorImpl<RM, ?>> nextOperators = new HashSet<>();
/**
* Register the next operator in the chain that this operator should propagate its output to.
+ * Registered operators are held in a {@link java.util.HashSet}: an operator equal to one already registered
+ * is not added again, and {@link #propagateResult} visits them in no guaranteed order.
* @param nextOperator the next operator in the chain.
*/
- void registerNextOperator(OperatorImpl<RM, ? extends MessageEnvelope> nextOperator) {
+ void registerNextOperator(OperatorImpl<RM, ?> nextOperator) {
nextOperators.add(nextOperator);
}
/**
- * Initialize the initial state for stateful operators.
- *
- * @param source the source that this {@link OperatorImpl} operator is registered with
- * @param context the task context to initialize the operator implementation
- */
- public void init(MessageStream<M> source, TaskContext context) {}
-
- /**
* Perform the transformation required for this operator and call the downstream operators.
*
* Must call {@link #propagateResult} to propage the output to registered downstream operators correctly.
*
- * @param message the input {@link MessageEnvelope}
+ * @param message the input message
* @param collector the {@link MessageCollector} in the context
* @param coordinator the {@link TaskCoordinator} in the context
*/
@@ -67,11 +56,12 @@ public abstract class OperatorImpl<M extends MessageEnvelope, RM extends Message
*
* This method <b>must</b> be called from {@link #onNext} to propagate the operator output correctly.
*
- * @param outputMessage output {@link MessageEnvelope}
+ * @param outputMessage output message
* @param collector the {@link MessageCollector} in the context
* @param coordinator the {@link TaskCoordinator} in the context
*/
void propagateResult(RM outputMessage, MessageCollector collector, TaskCoordinator coordinator) {
nextOperators.forEach(sub -> sub.onNext(outputMessage, collector, coordinator));
}
+
}
http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpls.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpls.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpls.java
deleted file mode 100644
index 02095cb..0000000
--- a/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpls.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.impl;
-
-
-import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.MessageStreamImpl;
-import org.apache.samza.operators.data.MessageEnvelope;
-import org.apache.samza.operators.spec.OperatorSpec;
-import org.apache.samza.operators.spec.PartialJoinOperatorSpec;
-import org.apache.samza.operators.spec.SinkOperatorSpec;
-import org.apache.samza.operators.spec.StreamOperatorSpec;
-import org.apache.samza.operators.spec.WindowOperatorSpec;
-import org.apache.samza.operators.windows.WindowPane;
-import org.apache.samza.task.TaskContext;
-
-import java.util.Collection;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-
-/**
- * Instantiates the DAG of {@link OperatorImpl}s corresponding to the {@link OperatorSpec}s for a
- * {@link MessageStreamImpl}
- */
-public class OperatorImpls {
-
- /**
- * Holds the mapping between the {@link OperatorSpec} and {@link OperatorImpl}s instances.
- */
- private static final Map<OperatorSpec, OperatorImpl> OPERATOR_IMPLS = new ConcurrentHashMap<>();
-
- /**
- * Traverses the DAG of {@link OperatorSpec}s starting from the provided {@link MessageStreamImpl},
- * creates the corresponding DAG of {@link OperatorImpl}s, and returns its root {@link RootOperatorImpl} node.
- *
- * @param source the input {@link MessageStreamImpl} to instantiate {@link OperatorImpl}s for
- * @param <M> the type of {@link MessageEnvelope}s in the {@code source} {@link MessageStream}
- * @param context the {@link TaskContext} required to instantiate operators
- * @return root node for the {@link OperatorImpl} DAG
- */
- public static <M extends MessageEnvelope> RootOperatorImpl createOperatorImpls(MessageStreamImpl<M> source, TaskContext context) {
- // since the source message stream might have multiple operator specs registered on it,
- // create a new root node as a single point of entry for the DAG.
- RootOperatorImpl<M> rootOperator = new RootOperatorImpl<>();
- // create the pipeline/topology starting from the source
- source.getRegisteredOperatorSpecs().forEach(registeredOperator -> {
- // pass in the source and context s.t. stateful stream operators can initialize their stores
- OperatorImpl<M, ? extends MessageEnvelope> operatorImpl =
- createAndRegisterOperatorImpl(registeredOperator, source, context);
- rootOperator.registerNextOperator(operatorImpl);
- });
- return rootOperator;
- }
-
- /**
- * Helper method to recursively traverse the {@link OperatorSpec} DAG and instantiate and link the corresponding
- * {@link OperatorImpl}s.
- *
- * @param operatorSpec the operatorSpec registered with the {@code source}
- * @param source the source {@link MessageStreamImpl}
- * @param context the context of the task
- * @return the operator implementation for the operatorSpec
- */
- private static <M extends MessageEnvelope> OperatorImpl<M, ? extends MessageEnvelope> createAndRegisterOperatorImpl(OperatorSpec operatorSpec,
- MessageStream source, TaskContext context) {
- if (!OPERATOR_IMPLS.containsKey(operatorSpec)) {
- OperatorImpl<M, ? extends MessageEnvelope> operatorImpl = createOperatorImpl(operatorSpec);
- if (OPERATOR_IMPLS.putIfAbsent(operatorSpec, operatorImpl) == null) {
- // this is the first time we've added the operatorImpl corresponding to the operatorSpec,
- // so traverse and initialize and register the rest of the DAG.
- MessageStream<? extends MessageEnvelope> outStream = operatorSpec.getOutputStream();
- Collection<OperatorSpec> registeredSpecs = ((MessageStreamImpl) outStream).getRegisteredOperatorSpecs();
- registeredSpecs.forEach(registeredSpec -> {
- OperatorImpl subImpl = createAndRegisterOperatorImpl(registeredSpec, outStream, context);
- operatorImpl.registerNextOperator(subImpl);
- });
- operatorImpl.init(source, context);
- return operatorImpl;
- }
- }
-
- // the implementation corresponding to operatorSpec has already been instantiated
- // and registered, so we do not need to traverse the DAG further.
- return OPERATOR_IMPLS.get(operatorSpec);
- }
-
- /**
- * Creates a new {@link OperatorImpl} instance for the provided {@link OperatorSpec}.
- *
- * @param operatorSpec the immutable {@link OperatorSpec} definition.
- * @param <M> type of input {@link MessageEnvelope}
- * @return the {@link OperatorImpl} implementation instance
- */
- protected static <M extends MessageEnvelope> OperatorImpl<M, ? extends MessageEnvelope> createOperatorImpl(OperatorSpec operatorSpec) {
- if (operatorSpec instanceof StreamOperatorSpec) {
- return new StreamOperatorImpl<>((StreamOperatorSpec<M, ? extends MessageEnvelope>) operatorSpec);
- } else if (operatorSpec instanceof SinkOperatorSpec) {
- return new SinkOperatorImpl<>((SinkOperatorSpec<M>) operatorSpec);
- } else if (operatorSpec instanceof WindowOperatorSpec) {
- return new WindowOperatorImpl<>((WindowOperatorSpec<M, ?, ?, ?, ? extends WindowPane>) operatorSpec);
- } else if (operatorSpec instanceof PartialJoinOperatorSpec) {
- return new PartialJoinOperatorImpl<>((PartialJoinOperatorSpec) operatorSpec);
- }
- throw new IllegalArgumentException(
- String.format("Unsupported OperatorSpec: %s", operatorSpec.getClass().getName()));
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-operator/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java
index 90569b4..c8515e1 100644
--- a/samza-operator/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java
+++ b/samza-operator/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java
@@ -18,9 +18,11 @@
*/
package org.apache.samza.operators.impl;
-import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.MessageStreamImpl;
import org.apache.samza.operators.spec.PartialJoinOperatorSpec;
import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;
@@ -28,14 +30,13 @@ import org.apache.samza.task.TaskCoordinator;
* Implementation of a {@link PartialJoinOperatorSpec}. This class implements function
* that only takes in one input stream among all inputs to the join and generate the join output.
*
- * @param <M> type of {@link MessageEnvelope}s in the input stream
- * @param <JM> type of {@link MessageEnvelope}s in the stream to join with
- * @param <RM> type of {@link MessageEnvelope}s in the joined stream
+ * @param <M> type of messages in the input stream
+ * @param <JM> type of messages in the stream to join with
+ * @param <RM> type of messages in the joined stream
*/
-class PartialJoinOperatorImpl<M extends MessageEnvelope<K, ?>, K, JM extends MessageEnvelope<K, ?>, RM extends MessageEnvelope>
- extends OperatorImpl<M, RM> {
+class PartialJoinOperatorImpl<M, K, JM, RM> extends OperatorImpl<M, RM> {
- PartialJoinOperatorImpl(PartialJoinOperatorSpec<M, K, JM, RM> joinOp) {
+ PartialJoinOperatorImpl(PartialJoinOperatorSpec<M, K, JM, RM> joinOp, MessageStreamImpl<M> source, Config config, TaskContext context) {
// TODO: implement PartialJoinOperatorImpl constructor
+ // NOTE(review): joinOp, source, config and context are all ignored by this constructor for now.
}
http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-operator/src/main/java/org/apache/samza/operators/impl/RootOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/RootOperatorImpl.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/RootOperatorImpl.java
index 7132b86..4b30a5d 100644
--- a/samza-operator/src/main/java/org/apache/samza/operators/impl/RootOperatorImpl.java
+++ b/samza-operator/src/main/java/org/apache/samza/operators/impl/RootOperatorImpl.java
@@ -18,16 +18,15 @@
*/
package org.apache.samza.operators.impl;
-import org.apache.samza.operators.data.MessageEnvelope;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.TaskCoordinator;
/**
- * A no-op operator implementation that forwards incoming {@link MessageEnvelope}s to all of its subscribers.
- * @param <M> type of incoming {@link MessageEnvelope}s
+ * A no-op operator implementation that forwards incoming messages to all of its subscribers.
+ * @param <M> type of incoming messages
*/
-final class RootOperatorImpl<M extends MessageEnvelope> extends OperatorImpl<M, M> {
+final class RootOperatorImpl<M> extends OperatorImpl<M, M> {
@Override
public void onNext(M message, MessageCollector collector, TaskCoordinator coordinator) {
http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-operator/src/main/java/org/apache/samza/operators/impl/SessionWindowOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/SessionWindowOperatorImpl.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/SessionWindowOperatorImpl.java
new file mode 100644
index 0000000..2bb362c
--- /dev/null
+++ b/samza-operator/src/main/java/org/apache/samza/operators/impl/SessionWindowOperatorImpl.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.MessageStreamImpl;
+import org.apache.samza.operators.spec.WindowOperatorSpec;
+import org.apache.samza.operators.windows.WindowPane;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskContext;
+import org.apache.samza.task.TaskCoordinator;
+
+
+/**
+ * Operator implementation backing a session-window {@link WindowOperatorSpec}.
+ *
+ * NOTE(review): this is a skeleton for now. Only {@code windowSpec} is kept from the constructor arguments,
+ * and both {@link #onNext} and {@link #onTimer} have empty bodies.
+ *
+ * @param <M> the type of input message
+ * @param <RK> the type of window key
+ * @param <WV> the type of window state
+ */
+class SessionWindowOperatorImpl<M, RK, WV> extends OperatorImpl<M, WindowPane<RK, WV>> {
+
+ /** The window definition this operator was constructed with (not read yet). */
+ private final WindowOperatorSpec<M, RK, WV> windowSpec;
+
+ SessionWindowOperatorImpl(WindowOperatorSpec<M, RK, WV> windowSpec, MessageStreamImpl<M> source, Config config, TaskContext context) {
+ // source, config and context are not used yet
+ this.windowSpec = windowSpec;
+ }
+
+ /**
+ * Periodic callback meant to check the timeout triggers and collect the window states to be updated.
+ * Not implemented yet.
+ */
+ public void onTimer(MessageCollector collector, TaskCoordinator coordinator) {
+ }
+
+ @Override
+ public void onNext(M message, MessageCollector collector, TaskCoordinator coordinator) {
+ // not implemented yet: incoming messages are ignored
+ }
+}
[2/4] samza git commit: SAMZA-1073: top-level fluent API `
Posted by ni...@apache.org.
http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-operator/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java
index abed03f..41d1778 100644
--- a/samza-operator/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java
+++ b/samza-operator/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java
@@ -18,21 +18,22 @@
*/
package org.apache.samza.operators.impl;
-import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.config.Config;
import org.apache.samza.operators.functions.SinkFunction;
import org.apache.samza.operators.spec.SinkOperatorSpec;
import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;
/**
* Implementation for {@link SinkOperatorSpec}
*/
-class SinkOperatorImpl<M extends MessageEnvelope> extends OperatorImpl<M, MessageEnvelope> {
+class SinkOperatorImpl<M> extends OperatorImpl<M, M> {
private final SinkFunction<M> sinkFn;
- SinkOperatorImpl(SinkOperatorSpec<M> sinkOp) {
+ SinkOperatorImpl(SinkOperatorSpec<M> sinkOp, Config config, TaskContext context) {
+ // config and context are not used yet; only the sink function is taken from the spec
this.sinkFn = sinkOp.getSinkFn();
}
http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-operator/src/main/java/org/apache/samza/operators/impl/StreamOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/StreamOperatorImpl.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/StreamOperatorImpl.java
index 3a5c56e..644de20 100644
--- a/samza-operator/src/main/java/org/apache/samza/operators/impl/StreamOperatorImpl.java
+++ b/samza-operator/src/main/java/org/apache/samza/operators/impl/StreamOperatorImpl.java
@@ -18,24 +18,26 @@
*/
package org.apache.samza.operators.impl;
-import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.MessageStreamImpl;
import org.apache.samza.operators.functions.FlatMapFunction;
import org.apache.samza.operators.spec.StreamOperatorSpec;
import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;
/**
- * A StreamOperator that accepts a 1:n transform function and applies it to each incoming {@link MessageEnvelope}.
+ * A StreamOperator that accepts a 1:n transform function and applies it to each incoming message.
*
- * @param <M> type of {@link MessageEnvelope} in the input stream
- * @param <RM> type of {@link MessageEnvelope} in the output stream
+ * @param <M> type of message in the input stream
+ * @param <RM> type of message in the output stream
*/
-class StreamOperatorImpl<M extends MessageEnvelope, RM extends MessageEnvelope> extends OperatorImpl<M, RM> {
+class StreamOperatorImpl<M, RM> extends OperatorImpl<M, RM> {
private final FlatMapFunction<M, RM> transformFn;
- StreamOperatorImpl(StreamOperatorSpec<M, RM> streamOperatorSpec) {
+ StreamOperatorImpl(StreamOperatorSpec<M, RM> streamOperatorSpec, MessageStreamImpl<M> source, Config config, TaskContext context) {
+ // source, config and context are not used yet; only the transform function is taken from the spec
this.transformFn = streamOperatorSpec.getTransformFn();
}
http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-operator/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
index a5b71a7..af00553 100644
--- a/samza-operator/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
+++ b/samza-operator/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
@@ -18,18 +18,21 @@
*/
package org.apache.samza.operators.impl;
-import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.MessageStreamImpl;
import org.apache.samza.operators.spec.WindowOperatorSpec;
import org.apache.samza.operators.windows.WindowPane;
import org.apache.samza.operators.windows.internal.WindowInternal;
import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;
-public class WindowOperatorImpl<M extends MessageEnvelope, K, WK, WV, WM extends WindowPane<WK, WV>> extends OperatorImpl<M, WM> {
+public class WindowOperatorImpl<M, WK, WV> extends OperatorImpl<M, WindowPane<WK, WV>> {
- private final WindowInternal<M, K, WV> window;
+ private final WindowInternal<M, WK, WV> window;
+ /**
+ * Creates the window operator from its spec.
+ *
+ * @param spec the window operator spec; its {@code getWindow()} supplies the window definition
+ * @param source the input stream of this operator (not used yet)
+ * @param config the task config (not used yet)
+ * @param context the task context (not used yet)
+ */
- public WindowOperatorImpl(WindowOperatorSpec spec) {
+ public WindowOperatorImpl(WindowOperatorSpec<M, WK, WV> spec, MessageStreamImpl<M> source, Config config, TaskContext context) {
+ // spec is parameterized (not raw) so that getWindow() is type-checked against the WindowInternal<M, WK, WV> field.
+ // TODO: source, config, and context are intended for initializing the window kv-store but are not used yet.
window = spec.getWindow();
}
http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-operator/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java b/samza-operator/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
index 8b75cdc..1444662 100644
--- a/samza-operator/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
+++ b/samza-operator/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
@@ -18,20 +18,45 @@
*/
package org.apache.samza.operators.spec;
-import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.MessageStreamImpl;
+import org.apache.samza.task.TaskContext;
/**
* A stateless serializable stream operator specification that holds all the information required
- * to transform the input {@link MessageStream} and produce the output {@link MessageStream}.
+ * to transform the input {@link MessageStreamImpl} and produce the output {@link MessageStreamImpl}.
+ *
+ * @param <OM> the type of output message from the operator
*/
-public interface OperatorSpec<OM extends MessageEnvelope> {
+@InterfaceStability.Unstable
+public interface OperatorSpec<OM> {
+
+ /**
+ * Identifies the kind of fluent-API operation an {@link OperatorSpec} was created for
+ * (the factory methods in {@code OperatorSpecs} pass e.g. MAP, FILTER, FLAT_MAP, SINK, SEND_TO, PARTITION_BY or MERGE).
+ */
+ enum OpCode {
+ MAP,
+ FLAT_MAP,
+ FILTER,
+ SINK,
+ SEND_TO,
+ JOIN,
+ WINDOW,
+ MERGE,
+ PARTITION_BY
+ }
+
/**
- * Get the output stream containing transformed {@link MessageEnvelope} produced by this operator.
- * @return the output stream containing transformed {@link MessageEnvelope} produced by this operator.
+ * Get the output stream containing transformed messages produced by this operator.
+ * @return the output stream containing transformed messages produced by this operator.
*/
- MessageStream<OM> getOutputStream();
+ MessageStreamImpl<OM> getNextStream();
+ /**
+ * Init method to initialize the context for this {@link OperatorSpec}. The default implementation is NO-OP.
+ *
+ * @param config the {@link Config} object for this task
+ * @param context the {@link TaskContext} object for this task
+ */
+ default void init(Config config, TaskContext context) { }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-operator/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java b/samza-operator/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
index fc25929..d626852 100644
--- a/samza-operator/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
+++ b/samza-operator/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
@@ -19,16 +19,21 @@
package org.apache.samza.operators.spec;
-import org.apache.samza.operators.data.MessageEnvelope;
+import java.util.Collection;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.MessageStreamImpl;
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.StreamGraphImpl;
+import org.apache.samza.operators.functions.FilterFunction;
import org.apache.samza.operators.functions.FlatMapFunction;
+import org.apache.samza.operators.functions.MapFunction;
+import org.apache.samza.operators.functions.PartialJoinFunction;
import org.apache.samza.operators.functions.SinkFunction;
-import org.apache.samza.operators.MessageStreamImpl;
import org.apache.samza.operators.windows.WindowPane;
import org.apache.samza.operators.windows.internal.WindowInternal;
import java.util.ArrayList;
-import java.util.UUID;
-import java.util.function.BiFunction;
+import org.apache.samza.task.TaskContext;
/**
@@ -38,80 +43,168 @@ public class OperatorSpecs {
private OperatorSpecs() {}
- private static String getOperatorId() {
- // TODO: need to change the IDs to be a consistent, durable IDs that can be recovered across container and job restarts
- return UUID.randomUUID().toString();
+ /**
+ * Creates a {@link StreamOperatorSpec} for a {@link MapFunction}.
+ *
+ * The map function is adapted to a {@link FlatMapFunction} that emits a single-element collection,
+ * or an empty collection when {@code mapFn} returns {@code null}. Initialization is delegated to {@code mapFn}.
+ *
+ * @param mapFn the map function
+ * @param graph the {@link StreamGraphImpl} object
+ * @param output the output {@link MessageStreamImpl} object
+ * @param <M> type of input message
+ * @param <OM> type of output message
+ * @return the {@link StreamOperatorSpec}
+ */
+ public static <M, OM> StreamOperatorSpec<M, OM> createMapOperatorSpec(MapFunction<M, OM> mapFn, StreamGraphImpl graph, MessageStreamImpl<OM> output) {
+ return new StreamOperatorSpec<>(new FlatMapFunction<M, OM>() {
+ @Override
+ public Collection<OM> apply(M message) {
+ // Plain ArrayList instead of double-brace initialization: the latter instantiates an anonymous ArrayList
+ // subclass for every message, and each instance keeps a reference to this enclosing FlatMapFunction.
+ Collection<OM> result = new ArrayList<>(1);
+ OM mapped = mapFn.apply(message);
+ if (mapped != null) {
+ result.add(mapped);
+ }
+ return result;
+ }
+
+ @Override
+ public void init(Config config, TaskContext context) {
+ mapFn.init(config, context);
+ }
+ }, output, OperatorSpec.OpCode.MAP, graph.getNextOpId());
+ }
+
+ /**
+ * Creates a {@link StreamOperatorSpec} for a {@link FilterFunction}.
+ *
+ * The filter is adapted to a {@link FlatMapFunction} that emits the message itself when {@code filterFn}
+ * returns {@code true}, and an empty collection otherwise. Initialization is delegated to {@code filterFn}.
+ *
+ * @param filterFn the filter predicate; messages for which it returns {@code true} are kept
+ * @param graph the {@link StreamGraphImpl} object
+ * @param output the output {@link MessageStreamImpl} object
+ * @param <M> type of input message
+ * @return the {@link StreamOperatorSpec}
+ */
+ public static <M> StreamOperatorSpec<M, M> createFilterOperatorSpec(FilterFunction<M> filterFn, StreamGraphImpl graph, MessageStreamImpl<M> output) {
+ return new StreamOperatorSpec<>(new FlatMapFunction<M, M>() {
+ @Override
+ public Collection<M> apply(M message) {
+ // Plain ArrayList instead of double-brace initialization: the latter instantiates an anonymous ArrayList
+ // subclass for every message, and each instance keeps a reference to this enclosing FlatMapFunction.
+ Collection<M> result = new ArrayList<>(1);
+ if (filterFn.apply(message)) {
+ result.add(message);
+ }
+ return result;
+ }
+
+ @Override
+ public void init(Config config, TaskContext context) {
+ filterFn.init(config, context);
+ }
+ }, output, OperatorSpec.OpCode.FILTER, graph.getNextOpId());
}
/**
* Creates a {@link StreamOperatorSpec}.
+ * The spec is tagged with {@link OperatorSpec.OpCode#FLAT_MAP} and uses {@code transformFn} unchanged.
*
* @param transformFn the transformation function
- * @param <M> type of input {@link MessageEnvelope}
- * @param <OM> type of output {@link MessageEnvelope}
+ * @param graph the {@link StreamGraphImpl} object
+ * @param output the output {@link MessageStreamImpl} object
+ * @param <M> type of input message
+ * @param <OM> type of output message
* @return the {@link StreamOperatorSpec}
*/
- public static <M extends MessageEnvelope, OM extends MessageEnvelope> StreamOperatorSpec<M, OM> createStreamOperatorSpec(
- FlatMapFunction<M, OM> transformFn) {
- return new StreamOperatorSpec<>(transformFn);
+ public static <M, OM> StreamOperatorSpec<M, OM> createStreamOperatorSpec(
+ FlatMapFunction<M, OM> transformFn, StreamGraphImpl graph, MessageStreamImpl<OM> output) {
+ return new StreamOperatorSpec<>(transformFn, output, OperatorSpec.OpCode.FLAT_MAP, graph.getNextOpId());
+ }
+
+ /**
+ * Creates a {@link SinkOperatorSpec} tagged with {@link OperatorSpec.OpCode#SINK}.
+ *
+ * @param sinkFn the sink function
+ * @param graph the {@link StreamGraphImpl} object
+ * @param <M> type of input message
+ * @return the {@link SinkOperatorSpec}
+ */
+ public static <M> SinkOperatorSpec<M> createSinkOperatorSpec(SinkFunction<M> sinkFn, StreamGraphImpl graph) {
+ OperatorSpec.OpCode opCode = OperatorSpec.OpCode.SINK;
+ return new SinkOperatorSpec<M>(sinkFn, opCode, graph.getNextOpId());
+ }
+
+ /**
+ * Creates a {@link SinkOperatorSpec} tagged with {@link OperatorSpec.OpCode#SEND_TO} that targets {@code stream}.
+ *
+ * @param sinkFn the sink function
+ * @param graph the {@link StreamGraphImpl} object
+ * @param stream the {@link OutputStream} where the message is sent to
+ * @param <M> type of input message
+ * @return the {@link SinkOperatorSpec}
+ */
+ public static <M> SinkOperatorSpec<M> createSendToOperatorSpec(SinkFunction<M> sinkFn, StreamGraphImpl graph, OutputStream<M> stream) {
+ OperatorSpec.OpCode opCode = OperatorSpec.OpCode.SEND_TO;
+ return new SinkOperatorSpec<M>(sinkFn, opCode, graph.getNextOpId(), stream);
}
/**
- * Creates a {@link SinkOperatorSpec}.
+ * Creates a {@link SinkOperatorSpec} for a partitionBy operation, tagged with {@link OperatorSpec.OpCode#PARTITION_BY}.
*
* @param sinkFn the sink function
- * @param <M> type of input {@link MessageEnvelope}
+ * @param graph the {@link StreamGraphImpl} object
+ * @param stream the {@link OutputStream} where the message is sent to
+ * @param <M> type of input message
* @return the {@link SinkOperatorSpec}
*/
- public static <M extends MessageEnvelope> SinkOperatorSpec<M> createSinkOperatorSpec(SinkFunction<M> sinkFn) {
- return new SinkOperatorSpec<>(sinkFn);
+ public static <M> SinkOperatorSpec<M> createPartitionOperatorSpec(SinkFunction<M> sinkFn, StreamGraphImpl graph, OutputStream<M> stream) {
+ return new SinkOperatorSpec<>(sinkFn, OperatorSpec.OpCode.PARTITION_BY, graph.getNextOpId(), stream);
}
/**
* Creates a {@link WindowOperatorSpec}.
+ * Unlike the map/filter/sink factories in this class, no {@link OperatorSpec.OpCode} is passed to the spec
+ * constructor here. NOTE(review): presumably WindowOperatorSpec assigns WINDOW itself; confirm.
*
* @param window the description of the window.
- * @param <M> the type of input {@link MessageEnvelope}
- * @param <K> the type of key in the {@link MessageEnvelope} in this {@link org.apache.samza.operators.MessageStream}. If a key is specified,
- * results are emitted per-key
+ * @param graph the {@link StreamGraphImpl} object
+ * @param wndOutput the window output {@link MessageStreamImpl} object
+ * @param <M> the type of input message
* @param <WK> the type of key in the {@link WindowPane}
* @param <WV> the type of value in the window
- * @param <WM> the type of output {@link WindowPane}
* @return the {@link WindowOperatorSpec}
*/
- public static <M extends MessageEnvelope, K, WK, WV, WM extends WindowPane<WK, WV>> WindowOperatorSpec<M, K, WK, WV, WM> createWindowOperatorSpec(WindowInternal<M, K, WV> window) {
- return new WindowOperatorSpec<>(window, OperatorSpecs.getOperatorId());
+ public static <M, WK, WV> WindowOperatorSpec<M, WK, WV> createWindowOperatorSpec(
+ WindowInternal<M, WK, WV> window, StreamGraphImpl graph, MessageStreamImpl<WindowPane<WK, WV>> wndOutput) {
+ return new WindowOperatorSpec<>(window, wndOutput, graph.getNextOpId());
}
/**
* Creates a {@link PartialJoinOperatorSpec}.
+ * Note the differing type-parameter order: the function is {@code PartialJoinFunction<K, M, JM, OM>}
+ * while the returned spec is {@code PartialJoinOperatorSpec<M, K, JM, OM>}.
*
* @param partialJoinFn the join function
+ * @param graph the {@link StreamGraphImpl} object
* @param joinOutput the output {@link MessageStreamImpl}
- * @param <M> type of input {@link MessageEnvelope}
+ * @param <M> type of input message
* @param <K> type of join key
- * @param <JM> the type of {@link MessageEnvelope} in the other join stream
- * @param <OM> the type of {@link MessageEnvelope} in the join output
+ * @param <JM> the type of message in the other join stream
+ * @param <OM> the type of message in the join output
* @return the {@link PartialJoinOperatorSpec}
*/
- public static <M extends MessageEnvelope<K, ?>, K, JM extends MessageEnvelope<K, ?>, OM extends MessageEnvelope> PartialJoinOperatorSpec<M, K, JM, OM> createPartialJoinOperatorSpec(
- BiFunction<M, JM, OM> partialJoinFn, MessageStreamImpl<OM> joinOutput) {
- return new PartialJoinOperatorSpec<>(partialJoinFn, joinOutput, OperatorSpecs.getOperatorId());
+ public static <M, K, JM, OM> PartialJoinOperatorSpec<M, K, JM, OM> createPartialJoinOperatorSpec(
+ PartialJoinFunction<K, M, JM, OM> partialJoinFn, StreamGraphImpl graph, MessageStreamImpl<OM> joinOutput) {
+ return new PartialJoinOperatorSpec<>(partialJoinFn, joinOutput, graph.getNextOpId());
}
/**
* Creates a {@link StreamOperatorSpec} with a merger function.
+ * The merger forwards every input message unchanged as a single-element collection.
*
+ * @param graph the {@link StreamGraphImpl} object
* @param mergeOutput the output {@link MessageStreamImpl} from the merger
- * @param <M> the type of input {@link MessageEnvelope}
+ * @param <M> the type of input message
* @return the {@link StreamOperatorSpec} for the merge
*/
- public static <M extends MessageEnvelope> StreamOperatorSpec<M, M> createMergeOperatorSpec(MessageStreamImpl<M> mergeOutput) {
- return new StreamOperatorSpec<M, M>(t ->
- new ArrayList<M>() { {
- this.add(t);
- } },
- mergeOutput);
+ public static <M> StreamOperatorSpec<M, M> createMergeOperatorSpec(StreamGraphImpl graph, MessageStreamImpl<M> mergeOutput) {
+ // Plain ArrayList instead of double-brace initialization, which would declare an anonymous ArrayList
+ // subclass and return an instance of it for every message.
+ return new StreamOperatorSpec<M, M>(message -> {
+ Collection<M> forwarded = new ArrayList<>(1);
+ forwarded.add(message);
+ return forwarded;
+ },
+ mergeOutput, OperatorSpec.OpCode.MERGE, graph.getNextOpId());
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-operator/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java b/samza-operator/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java
index e6d77f6..e057c2b 100644
--- a/samza-operator/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java
+++ b/samza-operator/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java
@@ -18,63 +18,69 @@
*/
package org.apache.samza.operators.spec;
-import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.config.Config;
import org.apache.samza.operators.MessageStreamImpl;
-
-import java.util.function.BiFunction;
+import org.apache.samza.operators.functions.PartialJoinFunction;
+import org.apache.samza.task.TaskContext;
/**
- * Spec for the partial join operator that takes {@link MessageEnvelope}s from one input stream, joins with buffered
- * {@link MessageEnvelope}s from another stream, and produces join results to an output {@link MessageStreamImpl}.
+ * Spec for the partial join operator that takes messages from one input stream, joins with buffered
+ * messages from another stream, and produces join results to an output {@link MessageStreamImpl}.
*
- * @param <M> the type of input {@link MessageEnvelope}
+ * @param <M> the type of input message
* @param <K> the type of join key
- * @param <JM> the type of {@link MessageEnvelope} in the other join stream
- * @param <RM> the type of {@link MessageEnvelope} in the join output stream
+ * @param <JM> the type of message in the other join stream
+ * @param <RM> the type of message in the join output stream
*/
-public class PartialJoinOperatorSpec<M extends MessageEnvelope<K, ?>, K, JM extends MessageEnvelope<K, ?>, RM extends MessageEnvelope>
- implements OperatorSpec<RM> {
+public class PartialJoinOperatorSpec<M, K, JM, RM> implements OperatorSpec<RM> {
private final MessageStreamImpl<RM> joinOutput;
/**
- * The transformation function of {@link PartialJoinOperatorSpec} that takes an input {@link MessageEnvelope} of
- * type {@code M}, joins with a stream of buffered {@link MessageEnvelope}s of type {@code JM} from another stream,
- * and generates a joined result {@link MessageEnvelope} of type {@code RM}.
+ * The transformation function of {@link PartialJoinOperatorSpec} that takes an input message of
+ * type {@code M}, joins with a stream of buffered messages of type {@code JM} from another stream,
+ * and generates a joined result message of type {@code RM}.
*/
- private final BiFunction<M, JM, RM> transformFn;
+ private final PartialJoinFunction<K, M, JM, RM> transformFn;
/**
* The unique ID for this operator.
*/
- private final String operatorId;
+ private final int opId;
/**
* Default constructor for a {@link PartialJoinOperatorSpec}.
*
- * @param partialJoinFn partial join function that take type {@code M} of input {@link MessageEnvelope} and join
- * w/ type {@code JM} of buffered {@link MessageEnvelope} from another stream
+ * @param partialJoinFn function that joins each input message of type {@code M} with buffered {@code JM} messages from the other stream
+ * @param opId the unique ID for this operator
* @param joinOutput the output {@link MessageStreamImpl} of the join results
*/
- PartialJoinOperatorSpec(BiFunction<M, JM, RM> partialJoinFn, MessageStreamImpl<RM> joinOutput, String operatorId) {
+ PartialJoinOperatorSpec(PartialJoinFunction<K, M, JM, RM> partialJoinFn, MessageStreamImpl<RM> joinOutput, int opId) {
this.joinOutput = joinOutput;
this.transformFn = partialJoinFn;
- this.operatorId = operatorId;
+ this.opId = opId;
}
@Override
- public String toString() {
- return this.operatorId;
- }
-
- @Override
- public MessageStreamImpl<RM> getOutputStream() {
+ public MessageStreamImpl<RM> getNextStream() {
return this.joinOutput;
}
- public BiFunction<M, JM, RM> getTransformFn() {
+ public PartialJoinFunction<K, M, JM, RM> getTransformFn() {
return this.transformFn;
}
+
+ public OperatorSpec.OpCode getOpCode() {
+ return OpCode.JOIN;
+ }
+
+ public int getOpId() {
+ return this.opId;
+ }
+
+ @Override public void init(Config config, TaskContext context) {
+ this.transformFn.init(config, context);
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-operator/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java b/samza-operator/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java
index 4348bc0..ba30d67 100644
--- a/samza-operator/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java
+++ b/samza-operator/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java
@@ -18,18 +18,30 @@
*/
package org.apache.samza.operators.spec;
-import org.apache.samza.operators.data.MessageEnvelope;
-import org.apache.samza.operators.functions.SinkFunction;
+import org.apache.samza.config.Config;
import org.apache.samza.operators.MessageStreamImpl;
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.functions.SinkFunction;
+import org.apache.samza.task.TaskContext;
/**
* The spec for a sink operator that accepts user-defined logic to output a {@link MessageStreamImpl} to an external
* system. This is a terminal operator and does allows further operator chaining.
*
- * @param <M> the type of input {@link MessageEnvelope}
+ * @param <M> the type of input message
*/
-public class SinkOperatorSpec<M extends MessageEnvelope> implements OperatorSpec {
+public class SinkOperatorSpec<M> implements OperatorSpec {
+
+ /**
+ * {@link OpCode} for this {@link SinkOperatorSpec}
+ */
+ private final OperatorSpec.OpCode opCode;
+
+ /**
+ * The unique ID for this operator.
+ */
+ private final int opId;
/**
* The user-defined sink function
@@ -37,14 +49,40 @@ public class SinkOperatorSpec<M extends MessageEnvelope> implements OperatorSpec
private final SinkFunction<M> sinkFn;
/**
- * Default constructor for a {@link SinkOperatorSpec}.
+ * The {@link OutputStream} this sink sends to, or {@code null} if the sink has no output stream
+ */
+ private final OutputStream<M> outStream;
+
+ /**
+ * Constructor for a {@link SinkOperatorSpec} without an output stream (e.g. output is sent to a remote database).
+ *
+ * @param sinkFn a user defined {@link SinkFunction} that will be called with the output message,
+ * the output {@link org.apache.samza.task.MessageCollector} and the
+ * {@link org.apache.samza.task.TaskCoordinator}.
+ * @param opCode the specific {@link OpCode} for this {@link SinkOperatorSpec}. It could be {@link OpCode#SINK}, {@link OpCode#SEND_TO},
+ * or {@link OpCode#PARTITION_BY}
+ * @param opId the unique id of this {@link SinkOperatorSpec} in the {@link org.apache.samza.operators.StreamGraph}
+ */
+ SinkOperatorSpec(SinkFunction<M> sinkFn, OperatorSpec.OpCode opCode, int opId) {
+ this(sinkFn, opCode, opId, null);
+ }
+
+ /**
+ * Constructor for a {@link SinkOperatorSpec} that sends the output to an {@link OutputStream}.
*
- * @param sinkFn a user defined {@link SinkFunction} that will be called with the output {@link MessageEnvelope},
+ * @param sinkFn a user defined {@link SinkFunction} that will be called with the output message,
* the output {@link org.apache.samza.task.MessageCollector} and the
* {@link org.apache.samza.task.TaskCoordinator}.
+ * @param opCode the specific {@link OpCode} for this {@link SinkOperatorSpec}. It could be {@link OpCode#SINK}, {@link OpCode#SEND_TO},
+ * or {@link OpCode#PARTITION_BY}
+ * @param opId the unique id of this {@link SinkOperatorSpec} in the {@link org.apache.samza.operators.StreamGraph}
+ * @param outStream the {@link OutputStream} for this {@link SinkOperatorSpec}, or {@code null} if there is none
*/
- SinkOperatorSpec(SinkFunction<M> sinkFn) {
+ SinkOperatorSpec(SinkFunction<M> sinkFn, OperatorSpec.OpCode opCode, int opId, OutputStream<M> outStream) {
this.sinkFn = sinkFn;
+ this.opCode = opCode;
+ this.opId = opId;
+ this.outStream = outStream;
}
/**
@@ -52,11 +90,27 @@ public class SinkOperatorSpec<M extends MessageEnvelope> implements OperatorSpec
* @return null
*/
@Override
- public MessageStreamImpl getOutputStream() {
+ public MessageStreamImpl<M> getNextStream() {
return null;
}
public SinkFunction<M> getSinkFn() {
return this.sinkFn;
}
+
+ public OperatorSpec.OpCode getOpCode() {
+ return this.opCode;
+ }
+
+ public int getOpId() {
+ return this.opId;
+ }
+
+ public OutputStream<M> getOutStream() {
+ return this.outStream;
+ }
+
+ @Override public void init(Config config, TaskContext context) {
+ this.sinkFn.init(config, context);
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-operator/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java b/samza-operator/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java
index ed18da4..d7813f7 100644
--- a/samza-operator/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java
+++ b/samza-operator/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java
@@ -18,50 +18,74 @@
*/
package org.apache.samza.operators.spec;
-import org.apache.samza.operators.data.MessageEnvelope;
-import org.apache.samza.operators.functions.FlatMapFunction;
+import org.apache.samza.config.Config;
import org.apache.samza.operators.MessageStreamImpl;
+import org.apache.samza.operators.functions.FlatMapFunction;
+import org.apache.samza.task.TaskContext;
/**
- * The spec for a linear stream operator that outputs 0 or more {@link MessageEnvelope}s for each input {@link MessageEnvelope}.
+ * The spec for a linear stream operator that outputs 0 or more messages for each input message.
*
- * @param <M> the type of input {@link MessageEnvelope}
- * @param <OM> the type of output {@link MessageEnvelope}
+ * @param <M> the type of input message
+ * @param <OM> the type of output message
*/
-public class StreamOperatorSpec<M extends MessageEnvelope, OM extends MessageEnvelope> implements OperatorSpec<OM> {
+public class StreamOperatorSpec<M, OM> implements OperatorSpec<OM> {
- private final MessageStreamImpl<OM> outputStream;
+ /**
+ * {@link OpCode} for this {@link StreamOperatorSpec}
+ */
+ private final OperatorSpec.OpCode opCode;
- private final FlatMapFunction<M, OM> transformFn;
+ /**
+ * The unique ID for this operator.
+ */
+ private final int opId;
/**
- * Default constructor for a {@link StreamOperatorSpec}.
- *
- * @param transformFn the transformation function that transforms each input {@link MessageEnvelope} into a collection
- * of output {@link MessageEnvelope}s
+ * The output {@link MessageStreamImpl} from this {@link StreamOperatorSpec}
*/
- StreamOperatorSpec(FlatMapFunction<M, OM> transformFn) {
- this(transformFn, new MessageStreamImpl<>());
- }
+ private final MessageStreamImpl<OM> outputStream;
+
+ /**
+ * Transformation function that maps each input message to zero or more output messages
+ */
+ private final FlatMapFunction<M, OM> transformFn;
/**
* Constructor for a {@link StreamOperatorSpec} that accepts an output {@link MessageStreamImpl}.
*
* @param transformFn the transformation function
* @param outputStream the output {@link MessageStreamImpl}
+ * @param opCode the {@link OpCode} for this {@link StreamOperatorSpec}
+ * @param opId the unique id for this {@link StreamOperatorSpec} in a {@link org.apache.samza.operators.StreamGraph}
*/
- StreamOperatorSpec(FlatMapFunction<M, OM> transformFn, MessageStreamImpl<OM> outputStream) {
+ StreamOperatorSpec(FlatMapFunction<M, OM> transformFn, MessageStreamImpl<OM> outputStream, OperatorSpec.OpCode opCode, int opId) {
this.outputStream = outputStream;
this.transformFn = transformFn;
+ this.opCode = opCode;
+ this.opId = opId;
}
@Override
- public MessageStreamImpl<OM> getOutputStream() {
+ public MessageStreamImpl<OM> getNextStream() {
return this.outputStream;
}
public FlatMapFunction<M, OM> getTransformFn() {
return this.transformFn;
}
+
+ public OperatorSpec.OpCode getOpCode() {
+ return this.opCode;
+ }
+
+ public int getOpId() {
+ return this.opId;
+ }
+
+ @Override
+ public void init(Config config, TaskContext context) {
+ this.transformFn.init(config, context);
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-operator/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java b/samza-operator/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
index cdc02a8..46417ed 100644
--- a/samza-operator/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
+++ b/samza-operator/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
@@ -19,29 +19,42 @@
package org.apache.samza.operators.spec;
-import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.MessageStreamImpl;
-import org.apache.samza.operators.data.MessageEnvelope;
import org.apache.samza.operators.windows.WindowPane;
import org.apache.samza.operators.windows.internal.WindowInternal;
-public class WindowOperatorSpec<M extends MessageEnvelope, K, WK, WV, WM extends WindowPane<WK, WV>> implements OperatorSpec<WM> {
- private final WindowInternal window;
+/**
+ * Spec for a window operator whose output stream carries {@link WindowPane}s of aggregated window values.
+ *
+ * @param <M> the type of input message to the window
+ * @param <WK> the type of key of the window
+ * @param <WV> the type of aggregated value in the window output {@link WindowPane}
+ */
+public class WindowOperatorSpec<M, WK, WV> implements OperatorSpec<WindowPane<WK, WV>> {
+
+ private final WindowInternal<M, WK, WV> window;
- private final MessageStreamImpl<WM> outputStream;
+ private final MessageStreamImpl<WindowPane<WK, WV>> outputStream;
- private final String operatorId;
+ private final int opId;
- public WindowOperatorSpec(WindowInternal window, String operatorId) {
+ /**
+ * Constructor for {@link WindowOperatorSpec}.
+ *
+ * @param window the {@link WindowInternal} definition of the window applied to the input messages
+ * @param outputStream the output {@link MessageStreamImpl} from this {@link WindowOperatorSpec}
+ * @param opId auto-generated unique ID of this operator
+ */
+ WindowOperatorSpec(WindowInternal<M, WK, WV> window, MessageStreamImpl<WindowPane<WK, WV>> outputStream, int opId) {
+ this.outputStream = outputStream;
this.window = window;
- this.outputStream = new MessageStreamImpl<>();
- this.operatorId = operatorId;
+ this.opId = opId;
}
@Override
- public MessageStream<WM> getOutputStream() {
+ public MessageStreamImpl<WindowPane<WK, WV>> getNextStream() {
return this.outputStream;
}
@@ -49,7 +62,11 @@ public class WindowOperatorSpec<M extends MessageEnvelope, K, WK, WV, WM extends
return window;
}
- public String getOperatorId() {
- return operatorId;
+ public OpCode getOpCode() {
+ return OpCode.WINDOW;
+ }
+
+ public int getOpId() {
+ return this.opId;
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-operator/src/main/java/org/apache/samza/operators/spec/WindowState.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/spec/WindowState.java b/samza-operator/src/main/java/org/apache/samza/operators/spec/WindowState.java
index e9af043..53bca2e 100644
--- a/samza-operator/src/main/java/org/apache/samza/operators/spec/WindowState.java
+++ b/samza-operator/src/main/java/org/apache/samza/operators/spec/WindowState.java
@@ -30,20 +30,16 @@ import org.apache.samza.annotation.InterfaceStability;
@InterfaceStability.Unstable
public interface WindowState<WV> {
/**
- * Method to get the system time when the first {@link org.apache.samza.operators.data.MessageEnvelope}
- * in the window is received
+ * Method to get the system time when the first message in the window is received
*
- * @return nano-second of system time for the first {@link org.apache.samza.operators.data.MessageEnvelope}
- * received in the window
+ * @return nano-second of system time for the first message received in the window
*/
long getFirstMessageTimeNs();
/**
- * Method to get the system time when the last {@link org.apache.samza.operators.data.MessageEnvelope}
- * in the window is received
+ * Method to get the system time when the last message in the window is received
*
- * @return nano-second of system time for the last {@link org.apache.samza.operators.data.MessageEnvelope}
- * received in the window
+ * @return nano-second of system time for the last message received in the window
*/
long getLastMessageTimeNs();
@@ -62,9 +58,9 @@ public interface WindowState<WV> {
long getLatestEventTimeNs();
/**
- * Method to get the total number of {@link org.apache.samza.operators.data.MessageEnvelope}s received in the window
+ * Method to get the total number of messages received in the window
*
- * @return number of {@link org.apache.samza.operators.data.MessageEnvelope}s in the window
+ * @return number of messages in the window
*/
long getNumberMessages();
http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-operator/src/main/java/org/apache/samza/system/SingleJobExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/system/SingleJobExecutionEnvironment.java b/samza-operator/src/main/java/org/apache/samza/system/SingleJobExecutionEnvironment.java
new file mode 100644
index 0000000..60a4c60
--- /dev/null
+++ b/samza-operator/src/main/java/org/apache/samza/system/SingleJobExecutionEnvironment.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.system;
+
+import org.apache.samza.operators.StreamGraphFactory;
+import org.apache.samza.config.Config;
+
+/**
+ * {@link ExecutionEnvironment} that runs the whole application as a single job in a YARN environment; {@link #run} is not implemented yet.
+ */
+public class SingleJobExecutionEnvironment implements ExecutionEnvironment {
+
+ @Override public void run(StreamGraphFactory app, Config config) {
+ // TODO: add description of ProcessContext that is going to create a sub-DAG of the graph built by {@code app}
+ // TODO: actually instantiate the tasks and run the job, i.e.
+ // 1. create all input/output/intermediate topics
+ // 2. create the single job configuration
+ // 3. execute JobRunner to submit the single job for the whole graph
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-operator/src/main/java/org/apache/samza/system/StandaloneExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/system/StandaloneExecutionEnvironment.java b/samza-operator/src/main/java/org/apache/samza/system/StandaloneExecutionEnvironment.java
new file mode 100644
index 0000000..f60ff82
--- /dev/null
+++ b/samza-operator/src/main/java/org/apache/samza/system/StandaloneExecutionEnvironment.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system;
+
+import org.apache.samza.operators.StreamGraphFactory;
+import org.apache.samza.config.Config;
+
+
+/**
+ * {@link ExecutionEnvironment} that runs the application in a standalone environment; {@link #run} is not implemented yet.
+ */
+public class StandaloneExecutionEnvironment implements ExecutionEnvironment {
+
+ @Override public void run(StreamGraphFactory app, Config config) {
+ // TODO: not implemented yet. Intended steps: 1. get the logical graph for optimization
+ // StreamGraph logicGraph = app.create(config);
+ // 2. apply potential optimizations
+ // 3. create new instance of StreamGraphFactory that would generate the optimized graph
+ // 4. create all input/output/intermediate topics
+ // 5. create the configuration for StreamProcessor
+ // 6. start the StreamProcessor with the optimized instance of StreamGraphFactory
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-operator/src/main/java/org/apache/samza/task/StreamOperatorTask.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/task/StreamOperatorTask.java b/samza-operator/src/main/java/org/apache/samza/task/StreamOperatorTask.java
new file mode 100644
index 0000000..fa7ec5e
--- /dev/null
+++ b/samza-operator/src/main/java/org/apache/samza/task/StreamOperatorTask.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.task;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.ContextManager;
+import org.apache.samza.operators.MessageStreamImpl;
+import org.apache.samza.operators.StreamGraphFactory;
+import org.apache.samza.operators.StreamGraphImpl;
+import org.apache.samza.operators.data.InputMessageEnvelope;
+import org.apache.samza.operators.impl.OperatorGraph;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemStream;
+
+import java.util.HashMap;
+import java.util.Map;
+
+
+/**
+ * Executes the logical sub-DAG of the user's stream application within a single task.
+ *
+ * A {@link StreamTask} implementation that receives {@link InputMessageEnvelope}s and propagates them
+ * through the user's stream transformations defined in {@link StreamGraphImpl} using the
+ * {@link org.apache.samza.operators.MessageStream} APIs.
+ * <p>
+ * This class brings all the operator API implementation components together and feeds the
+ * {@link InputMessageEnvelope}s into the transformation chains.
+ * <p>
+ * It accepts an instance of the user implemented factory {@link StreamGraphFactory} as input parameter of the constructor.
+ * When its own {@link #init(Config, TaskContext)} method is called during startup, it instantiates a user-defined {@link StreamGraphImpl}
+ * from the {@link StreamGraphFactory}, calls {@link StreamGraphImpl#getContextManager()} to initialize the task-wide context
+ * for the graph, and looks up the input {@link MessageStreamImpl} for the {@link SystemStream} of each of its input
+ * {@link org.apache.samza.system.SystemStreamPartition}s. Each input {@link MessageStreamImpl}
+ * will be corresponding to either an input stream or intermediate stream in {@link StreamGraphImpl}.
+ * <p>
+ * Then, this task calls {@link org.apache.samza.operators.impl.OperatorGraph#init(Map, Config, TaskContext)} once with the
+ * input {@link MessageStreamImpl}s keyed by {@link SystemStream}, which instantiates the {@link org.apache.samza.operators.impl.OperatorImpl}
+ * DAG corresponding to the aforementioned {@link org.apache.samza.operators.spec.OperatorSpec} DAG.
+ * <p>
+ * Now that the {@link OperatorGraph} holds the root of the DAG for each input {@link SystemStream}, it
+ * can pass the message envelopes received in {@link StreamTask#process(IncomingMessageEnvelope, MessageCollector, TaskCoordinator)}
+ * along to the appropriate root nodes. From then on, each {@link org.apache.samza.operators.impl.OperatorImpl} propagates
+ * its transformed output to the next set of {@link org.apache.samza.operators.impl.OperatorImpl}s.
+ */
+public final class StreamOperatorTask implements StreamTask, InitableTask, WindowableTask, ClosableTask {
+
+ /**
+ * The operator DAGs of this task; {@code operatorGraph.get(systemStream)} returns the root node for an input {@link SystemStream}.
+ */
+ private final OperatorGraph operatorGraph = new OperatorGraph();
+
+ private final StreamGraphFactory graphFactory;
+
+ private ContextManager taskManager;
+
+ public StreamOperatorTask(StreamGraphFactory graphFactory) {
+ this.graphFactory = graphFactory;
+ }
+
+ @Override
+ public final void init(Config config, TaskContext context) throws Exception {
+ // create the StreamGraphImpl from the user's factory and initialize the app-specific logical DAG within the task
+ StreamGraphImpl streams = (StreamGraphImpl) this.graphFactory.create(config);
+ this.taskManager = streams.getContextManager();
+
+ Map<SystemStream, MessageStreamImpl> inputBySystemStream = new HashMap<>();
+ context.getSystemStreamPartitions().forEach(ssp -> {
+ SystemStream systemStream = ssp.getSystemStream();
+ // several partitions may belong to the same SystemStream; look up its input stream only once
+ if (!inputBySystemStream.containsKey(systemStream)) {
+ inputBySystemStream.put(systemStream, streams.getInputStream(systemStream));
+ }
+ });
+ operatorGraph.init(inputBySystemStream, config, this.taskManager.initTaskContext(config, context));
+ }
+
+ @Override
+ public final void process(IncomingMessageEnvelope ime, MessageCollector collector, TaskCoordinator coordinator) {
+ this.operatorGraph.get(ime.getSystemStreamPartition().getSystemStream())
+ .onNext(new InputMessageEnvelope(ime), collector, coordinator);
+ }
+
+ @Override
+ public final void window(MessageCollector collector, TaskCoordinator coordinator) throws Exception {
+ // TODO: invoke timer based triggers
+ }
+
+ @Override
+ public void close() throws Exception {
+ this.taskManager.finalizeTaskContext();
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-operator/src/test/java/org/apache/samza/example/BroadcastGraph.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/example/BroadcastGraph.java b/samza-operator/src/test/java/org/apache/samza/example/BroadcastGraph.java
new file mode 100644
index 0000000..a91ce09
--- /dev/null
+++ b/samza-operator/src/test/java/org/apache/samza/example/BroadcastGraph.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.example;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.StreamGraphFactory;
+import org.apache.samza.operators.StreamGraphImpl;
+import org.apache.samza.operators.StreamSpec;
+import org.apache.samza.operators.data.InputMessageEnvelope;
+import org.apache.samza.operators.data.JsonIncomingSystemMessageEnvelope;
+import org.apache.samza.operators.data.Offset;
+import org.apache.samza.operators.triggers.Triggers;
+import org.apache.samza.operators.windows.Windows;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
+
+import java.time.Duration;
+import java.util.function.BiFunction;
+import java.util.Properties;
+import java.util.Set;
+
+
+/**
+ * Example {@link StreamGraphFactory} that splits each input stream into three filtered tumbling-window counts.
+ *
+ */
+public class BroadcastGraph implements StreamGraphFactory {
+
+ private final Set<SystemStreamPartition> inputs;
+
+ BroadcastGraph(Set<SystemStreamPartition> inputs) {
+ this.inputs = inputs;
+ }
+
+ static class MessageType {
+ String field1;
+ String field2;
+ String field3;
+ String field4;
+ String parKey;
+ private long timestamp;
+
+ public long getTimestamp() {
+ return this.timestamp;
+ }
+ }
+
+ static class JsonMessageEnvelope extends JsonIncomingSystemMessageEnvelope<MessageType> {
+ JsonMessageEnvelope(String key, MessageType data, Offset offset, SystemStreamPartition partition) {
+ super(key, data, offset, partition);
+ }
+ }
+
+ @Override
+ public StreamGraph create(Config config) {
+ StreamGraphImpl graph = new StreamGraphImpl();
+
+ BiFunction<JsonMessageEnvelope, Integer, Integer> sumAggregator = (m, c) -> c + 1; // counts messages; the message itself is ignored
+ inputs.forEach(entry -> {
+ MessageStream<JsonMessageEnvelope> inputStream = graph.<Object, Object, InputMessageEnvelope>createInStream(new StreamSpec() {
+ @Override public SystemStream getSystemStream() {
+ return entry.getSystemStream();
+ }
+
+ @Override public Properties getProperties() {
+ return null;
+ }
+ }, null, null).
+ map(this::getInputMessage);
+
+ inputStream.filter(this::myFilter1).window(Windows.tumblingWindow(Duration.ofMillis(100), sumAggregator)
+ .setLateTrigger(Triggers.any(Triggers.count(30000), Triggers.timeSinceFirstMessage(Duration.ofMillis(10)))));
+
+ inputStream.filter(this::myFilter2).window(Windows.tumblingWindow(Duration.ofMillis(100), sumAggregator)
+ .setLateTrigger(Triggers.any(Triggers.count(30000), Triggers.timeSinceFirstMessage(Duration.ofMillis(10)))));
+
+ inputStream.filter(this::myFilter3).window(Windows.tumblingWindow(Duration.ofMillis(100), sumAggregator)
+ .setLateTrigger(Triggers.any(Triggers.count(30000), Triggers.timeSinceFirstMessage(Duration.ofMillis(10)))));
+ });
+ return graph;
+ }
+
+ JsonMessageEnvelope getInputMessage(InputMessageEnvelope m1) {
+ return (JsonMessageEnvelope) m1.getMessage();
+ }
+
+ boolean myFilter1(JsonMessageEnvelope m1) {
+ // Do user defined processing here
+ return "key1".equals(m1.getMessage().parKey);
+ }
+
+ boolean myFilter2(JsonMessageEnvelope m1) {
+ // Do user defined processing here
+ return "key2".equals(m1.getMessage().parKey);
+ }
+
+ boolean myFilter3(JsonMessageEnvelope m1) {
+ // Do user defined processing here
+ return "key3".equals(m1.getMessage().parKey);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-operator/src/test/java/org/apache/samza/example/JoinGraph.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/example/JoinGraph.java b/samza-operator/src/test/java/org/apache/samza/example/JoinGraph.java
new file mode 100644
index 0000000..2313f63
--- /dev/null
+++ b/samza-operator/src/test/java/org/apache/samza/example/JoinGraph.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.example;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.StreamGraphFactory;
+import org.apache.samza.operators.StreamGraphImpl;
+import org.apache.samza.operators.StreamSpec;
+import org.apache.samza.operators.data.InputMessageEnvelope;
+import org.apache.samza.operators.data.JsonIncomingSystemMessageEnvelope;
+import org.apache.samza.operators.data.Offset;
+import org.apache.samza.operators.functions.KeyValueJoinFunction;
+import org.apache.samza.serializers.JsonSerde;
+import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+
+
+/**
+ * Example implementation of unique key-based stream-stream join tasks.
+ *
+ * <p>One input stream is created per input partition's {@link SystemStream}. The streams are folded, in the
+ * iteration order of the input set, into a chain of key-value joins (see {@link #myJoinResult}), and the final
+ * joined stream is sent to a single output stream.
+ */
+public class JoinGraph implements StreamGraphFactory {
+ private final Set<SystemStreamPartition> inputs;
+
+ JoinGraph(Set<SystemStreamPartition> inputs) {
+ this.inputs = inputs;
+ }
+
+ /** Join payload: the join key plus the fields accumulated from every joined side. */
+ class MessageType {
+ String joinKey;
+ List<String> joinFields = new ArrayList<>();
+ }
+
+ class JsonMessageEnvelope extends JsonIncomingSystemMessageEnvelope<MessageType> {
+ JsonMessageEnvelope(String key, MessageType data, Offset offset, SystemStreamPartition partition) {
+ super(key, data, offset, partition);
+ }
+ }
+
+ /** Joined stream built by the most recent {@link #create(Config)} call; {@code null} before the first call. */
+ MessageStream<JsonMessageEnvelope> joinOutput = null;
+
+ /**
+ * Builds the join graph over all configured inputs.
+ *
+ * @param config the job config (not read by this example)
+ * @return a graph containing one input stream per input, the chained joins, and the output stream
+ * @throws IllegalStateException if no inputs were supplied, since there is nothing to join or send
+ */
+ @Override
+ public StreamGraph create(Config config) {
+ if (inputs == null || inputs.isEmpty()) {
+ throw new IllegalStateException("JoinGraph requires at least one input partition");
+ }
+ StreamGraphImpl graph = new StreamGraphImpl();
+
+ // Accumulate in a local rather than in the joinOutput field, so each create() call starts a fresh chain
+ // instead of extending a stream that belongs to a graph built by an earlier call.
+ MessageStream<JsonMessageEnvelope> joined = null;
+ for (SystemStreamPartition input : inputs) {
+ MessageStream<JsonMessageEnvelope> newSource = graph.<Object, Object, InputMessageEnvelope>createInStream(
+ new StreamSpec() {
+ @Override public SystemStream getSystemStream() {
+ return input.getSystemStream();
+ }
+
+ @Override public Properties getProperties() {
+ return null;
+ }
+ }, null, null).map(this::getInputMessage);
+ if (joined == null) {
+ joined = newSource;
+ } else {
+ joined = joined.join(newSource,
+ (KeyValueJoinFunction<String, JsonMessageEnvelope, JsonMessageEnvelope, JsonMessageEnvelope>) this::myJoinResult);
+ }
+ }
+ joinOutput = joined;
+
+ joined.sendTo(graph.createOutStream(new StreamSpec() {
+ @Override public SystemStream getSystemStream() {
+ return null;
+ }
+
+ @Override public Properties getProperties() {
+ return null;
+ }
+ }, new StringSerde("UTF-8"), new JsonSerde<>()));
+
+ return graph;
+ }
+
+ /** Wraps a raw input envelope, whose payload is cast to {@link MessageType}, keyed by that payload's join key. */
+ private JsonMessageEnvelope getInputMessage(InputMessageEnvelope ism) {
+ return new JsonMessageEnvelope(
+ ((MessageType) ism.getMessage()).joinKey,
+ (MessageType) ism.getMessage(),
+ ism.getOffset(),
+ ism.getSystemStreamPartition());
+ }
+
+ /**
+ * Merges two messages sharing a key: the result keeps the key and concatenates the join fields, with
+ * {@code m1}'s fields first. The result carries no offset or partition.
+ */
+ JsonMessageEnvelope myJoinResult(JsonMessageEnvelope m1, JsonMessageEnvelope m2) {
+ MessageType newJoinMsg = new MessageType();
+ newJoinMsg.joinKey = m1.getKey();
+ newJoinMsg.joinFields.addAll(m1.getMessage().joinFields);
+ newJoinMsg.joinFields.addAll(m2.getMessage().joinFields);
+ return new JsonMessageEnvelope(m1.getMessage().joinKey, newJoinMsg, null, null);
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-operator/src/test/java/org/apache/samza/example/KeyValueStoreExample.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/example/KeyValueStoreExample.java b/samza-operator/src/test/java/org/apache/samza/example/KeyValueStoreExample.java
new file mode 100644
index 0000000..ad6336a
--- /dev/null
+++ b/samza-operator/src/test/java/org/apache/samza/example/KeyValueStoreExample.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.example;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.StreamGraphFactory;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.StreamSpec;
+import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.operators.functions.FlatMapFunction;
+import org.apache.samza.serializers.JsonSerde;
+import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.storage.kv.KeyValueStore;
+import org.apache.samza.system.ExecutionEnvironment;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.task.TaskContext;
+import org.apache.samza.util.CommandLine;
+
+import java.util.Properties;
+
+
+/**
+ * Example code using {@link KeyValueStore} to implement an event-time window.
+ *
+ * <p>Page views are re-partitioned by member id and counted per member in 5-minute event-time buckets. The
+ * per-bucket state lives in a key-value store, and a bucket's cumulative count is emitted at most once per
+ * 10 minutes of wall-clock time.
+ */
+public class KeyValueStoreExample implements StreamGraphFactory {
+
+ /**
+ * Builds the graph: {@code PageViewEvent}s are partitioned by member id, counted by {@link MyStatsCounter}, and
+ * the resulting {@code StatsOutput}s are sent to the output stream.
+ *
+ * <p>Used by the remote execution environment to launch the job in a remote program. The remote program should
+ * follow an invoking context similar to the standalone {@link #main(String[])}:
+ *
+ * public static void main(String args[]) throws Exception {
+ * CommandLine cmdLine = new CommandLine();
+ * Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
+ * ExecutionEnvironment remoteEnv = ExecutionEnvironment.getRemoteEnvironment(config);
+ * remoteEnv.run(new KeyValueStoreExample(), config);
+ * }
+ *
+ */
+ @Override public StreamGraph create(Config config) {
+ StreamGraph graph = StreamGraph.fromConfig(config);
+
+ MessageStream<PageViewEvent> pageViewEvents = graph.createInStream(input1, new StringSerde("UTF-8"), new JsonSerde<>());
+ OutputStream<StatsOutput> pageViewPerMemberCounters = graph.createOutStream(output, new StringSerde("UTF-8"), new JsonSerde<StatsOutput>());
+
+ pageViewEvents.
+ partitionBy(m -> m.getMessage().memberId).
+ flatMap(new MyStatsCounter()).
+ sendTo(pageViewPerMemberCounters);
+
+ return graph;
+ }
+
+ // standalone local program model
+ public static void main(String[] args) throws Exception {
+ CommandLine cmdLine = new CommandLine();
+ Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
+ ExecutionEnvironment standaloneEnv = ExecutionEnvironment.getLocalEnvironment(config);
+ standaloneEnv.run(new KeyValueStoreExample(), config);
+ }
+
+ /**
+ * Counts page views per member in 5-minute event-time buckets.
+ *
+ * <p>State is kept in the {@code my-stats-wnd-store} store. A bucket's cumulative count is emitted only when
+ * more than {@code timeoutMs} of wall-clock time has passed since its last emission. Otherwise new views are only
+ * accumulated in the store.
+ */
+ class MyStatsCounter implements FlatMapFunction<PageViewEvent, StatsOutput> {
+ private final int timeoutMs = 10 * 60 * 1000;
+
+ KeyValueStore<String, StatsWindowState> statsStore;
+
+ /** Per-bucket counting state persisted in {@link #statsStore}. */
+ class StatsWindowState {
+ // cumulative count as of the last emitted output
+ int lastCount = 0;
+ // wall-clock time (ms) of the last emitted output; 0 until the first emission
+ long timeAtLastOutput = 0;
+ // views seen since the last emitted output
+ int newCount = 0;
+ }
+
+ @Override
+ public Collection<StatsOutput> apply(PageViewEvent message) {
+ List<StatsOutput> outputStats = new ArrayList<>();
+ // Bucket start in minutes, truncated to a multiple of 5 (assumes timestamp is epoch milliseconds).
+ long wndTimestamp = (TimeUnit.MILLISECONDS.toMinutes(message.getMessage().timestamp) / 5) * 5;
+ String wndKey = String.format("%s-%d", message.getMessage().memberId, wndTimestamp);
+ StatsWindowState curState = this.statsStore.get(wndKey);
+ if (curState == null) {
+ // first view for this member/bucket: the store holds no state for the key yet
+ curState = new StatsWindowState();
+ }
+ curState.newCount++;
+ long curTimeMs = System.currentTimeMillis();
+ if (curState.newCount > 0 && curState.timeAtLastOutput + timeoutMs < curTimeMs) {
+ curState.timeAtLastOutput = curTimeMs;
+ curState.lastCount += curState.newCount;
+ curState.newCount = 0;
+ outputStats.add(new StatsOutput(message.getMessage().memberId, wndTimestamp, curState.lastCount));
+ }
+ // always persist the updated state, whether or not an output was emitted
+ this.statsStore.put(wndKey, curState);
+ return outputStats;
+ }
+
+ @Override
+ @SuppressWarnings("unchecked") // the store is declared in config; its value type is not checked here
+ public void init(Config config, TaskContext context) {
+ this.statsStore = (KeyValueStore<String, StatsWindowState>) context.getStore("my-stats-wnd-store");
+ }
+ }
+
+ StreamSpec input1 = new StreamSpec() {
+ @Override public SystemStream getSystemStream() {
+ return new SystemStream("kafka", "PageViewEvent");
+ }
+
+ @Override public Properties getProperties() {
+ return null;
+ }
+ };
+
+ StreamSpec output = new StreamSpec() {
+ @Override public SystemStream getSystemStream() {
+ return new SystemStream("kafka", "PageViewPerMember5min");
+ }
+
+ @Override public Properties getProperties() {
+ return null;
+ }
+ };
+
+ /** A single page view, keyed by page id; it is its own message payload. */
+ class PageViewEvent implements MessageEnvelope<String, PageViewEvent> {
+ String pageId;
+ String memberId;
+ long timestamp;
+
+ PageViewEvent(String pageId, String memberId, long timestamp) {
+ this.pageId = pageId;
+ this.memberId = memberId;
+ this.timestamp = timestamp;
+ }
+
+ @Override
+ public String getKey() {
+ return this.pageId;
+ }
+
+ @Override
+ public PageViewEvent getMessage() {
+ return this;
+ }
+ }
+
+ /** Cumulative view count of one member in one bucket, keyed by member id; it is its own message payload. */
+ class StatsOutput implements MessageEnvelope<String, StatsOutput> {
+ private String memberId;
+ private long timestamp;
+ private Integer count;
+
+ StatsOutput(String key, long timestamp, Integer count) {
+ this.memberId = key;
+ this.timestamp = timestamp;
+ this.count = count;
+ }
+
+ @Override
+ public String getKey() {
+ return this.memberId;
+ }
+
+ @Override
+ public StatsOutput getMessage() {
+ return this;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-operator/src/test/java/org/apache/samza/example/NoContextStreamExample.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/example/NoContextStreamExample.java b/samza-operator/src/test/java/org/apache/samza/example/NoContextStreamExample.java
new file mode 100644
index 0000000..577d06f
--- /dev/null
+++ b/samza-operator/src/test/java/org/apache/samza/example/NoContextStreamExample.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.example;
+
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.StreamGraphFactory;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.StreamSpec;
+import org.apache.samza.operators.data.InputMessageEnvelope;
+import org.apache.samza.operators.data.JsonIncomingSystemMessageEnvelope;
+import org.apache.samza.operators.data.Offset;
+import org.apache.samza.operators.functions.JoinFunction;
+import org.apache.samza.serializers.JsonSerde;
+import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.system.ExecutionEnvironment;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.CommandLine;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+
+/**
+ * Example {@link StreamGraphFactory} code to test the API methods.
+ *
+ * <p>Two Kafka input streams are converted into {@code JsonMessageEnvelope}s and joined by key with
+ * {@link MyJoinFunction}. The joined messages are written to a Kafka output stream.
+ */
+public class NoContextStreamExample implements StreamGraphFactory {
+
+ /** Creates a spec for stream {@code streamName} on the {@code kafka} system, with no extra properties. */
+ private static StreamSpec kafkaStream(String streamName) {
+ return new StreamSpec() {
+ @Override public SystemStream getSystemStream() {
+ return new SystemStream("kafka", streamName);
+ }
+
+ @Override public Properties getProperties() {
+ return null;
+ }
+ };
+ }
+
+ StreamSpec input1 = kafkaStream("input1");
+
+ StreamSpec input2 = kafkaStream("input2");
+
+ StreamSpec output = kafkaStream("output");
+
+ class MessageType {
+ String joinKey;
+ List<String> joinFields = new ArrayList<>();
+ }
+
+ class JsonMessageEnvelope extends JsonIncomingSystemMessageEnvelope<MessageType> {
+ JsonMessageEnvelope(String key, MessageType data, Offset offset, SystemStreamPartition partition) {
+ super(key, data, offset, partition);
+ }
+ }
+
+ /** Wraps a raw input envelope, whose payload is cast to {@link MessageType}, keyed by that payload's join key. */
+ private JsonMessageEnvelope getInputMessage(InputMessageEnvelope ism) {
+ MessageType payload = (MessageType) ism.getMessage();
+ return new JsonMessageEnvelope(payload.joinKey, payload, ism.getOffset(), ism.getSystemStreamPartition());
+ }
+
+ /**
+ * Joins two messages on their envelope keys. The result concatenates both sides' join fields, first side
+ * first, and carries no offset or partition.
+ */
+ class MyJoinFunction implements JoinFunction<String, JsonMessageEnvelope, JsonMessageEnvelope, JsonIncomingSystemMessageEnvelope<MessageType>> {
+
+ @Override
+ public JsonIncomingSystemMessageEnvelope<MessageType> apply(JsonMessageEnvelope m1,
+ JsonMessageEnvelope m2) {
+ MessageType first = m1.getMessage();
+ MessageType merged = new MessageType();
+ merged.joinKey = m1.getKey();
+ merged.joinFields.addAll(first.joinFields);
+ merged.joinFields.addAll(m2.getMessage().joinFields);
+ return new JsonMessageEnvelope(first.joinKey, merged, null, null);
+ }
+
+ @Override
+ public String getFirstKey(JsonMessageEnvelope message) {
+ return message.getKey();
+ }
+
+ @Override
+ public String getSecondKey(JsonMessageEnvelope message) {
+ return message.getKey();
+ }
+ }
+
+ /**
+ * Builds the join graph. Also used by a remote execution environment to launch the job in a remote program,
+ * which should follow an invoking context similar to the standalone {@link #main(String[])}:
+ *
+ * public static void main(String args[]) throws Exception {
+ * CommandLine cmdLine = new CommandLine();
+ * Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
+ * ExecutionEnvironment remoteEnv = ExecutionEnvironment.fromConfig(config);
+ * remoteEnv.run(new NoContextStreamExample(), config);
+ * }
+ *
+ */
+ @Override public StreamGraph create(Config config) {
+ StreamGraph graph = StreamGraph.fromConfig(config);
+ MessageStream<InputMessageEnvelope> inputSource1 = graph.<Object, Object, InputMessageEnvelope>createInStream(input1, null, null);
+ MessageStream<InputMessageEnvelope> inputSource2 = graph.<Object, Object, InputMessageEnvelope>createInStream(input2, null, null);
+ OutputStream<JsonIncomingSystemMessageEnvelope<MessageType>> outStream =
+ graph.createOutStream(output, new StringSerde("UTF-8"), new JsonSerde<>());
+
+ MessageStream<JsonMessageEnvelope> left = inputSource1.map(this::getInputMessage);
+ MessageStream<JsonMessageEnvelope> right = inputSource2.map(this::getInputMessage);
+ left.join(right, new MyJoinFunction()).sendTo(outStream);
+
+ return graph;
+ }
+
+ // standalone local program model
+ public static void main(String[] args) throws Exception {
+ CommandLine cmdLine = new CommandLine();
+ Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
+ ExecutionEnvironment.getLocalEnvironment(config).run(new NoContextStreamExample(), config);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-operator/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java b/samza-operator/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java
new file mode 100644
index 0000000..ad433b6
--- /dev/null
+++ b/samza-operator/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.example;
+
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.StreamGraphFactory;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.StreamSpec;
+import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.operators.functions.JoinFunction;
+import org.apache.samza.serializers.JsonSerde;
+import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.system.ExecutionEnvironment;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.util.CommandLine;
+
+import java.util.Properties;
+
+
+/**
+ * Simple 2-way stream-to-stream join example.
+ *
+ * <p>The {@code Orders} and {@code Shipment} Kafka streams are joined on the order id. Each match is written to
+ * {@code FulfilledOrders}.
+ */
+public class OrderShipmentJoinExample implements StreamGraphFactory {
+
+ /**
+ * Builds the join graph. Also used by a remote execution environment to launch the job in a remote program,
+ * which should follow an invoking context similar to the standalone {@link #main(String[])}:
+ *
+ * public static void main(String args[]) throws Exception {
+ * CommandLine cmdLine = new CommandLine();
+ * Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
+ * ExecutionEnvironment remoteEnv = ExecutionEnvironment.getRemoteEnvironment(config);
+ * remoteEnv.run(new OrderShipmentJoinExample(), config);
+ * }
+ *
+ */
+ @Override public StreamGraph create(Config config) {
+ StreamGraph graph = StreamGraph.fromConfig(config);
+
+ MessageStream<OrderRecord> orders = graph.createInStream(input1, new StringSerde("UTF-8"), new JsonSerde<>());
+ MessageStream<ShipmentRecord> shipments = graph.createInStream(input2, new StringSerde("UTF-8"), new JsonSerde<>());
+ OutputStream<FulFilledOrderRecord> fulfilledOrders = graph.createOutStream(output, new StringSerde("UTF-8"), new JsonSerde<>());
+
+ MessageStream<FulFilledOrderRecord> matched = orders.join(shipments, new MyJoinFunction());
+ matched.sendTo(fulfilledOrders);
+
+ return graph;
+ }
+
+ // standalone local program model
+ public static void main(String[] args) throws Exception {
+ CommandLine cmdLine = new CommandLine();
+ Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
+ ExecutionEnvironment.getLocalEnvironment(config).run(new OrderShipmentJoinExample(), config);
+ }
+
+ /** Creates a spec for stream {@code streamName} on the {@code kafka} system, with no extra properties. */
+ private static StreamSpec kafkaStream(String streamName) {
+ return new StreamSpec() {
+ @Override public SystemStream getSystemStream() {
+ return new SystemStream("kafka", streamName);
+ }
+
+ @Override public Properties getProperties() {
+ return null;
+ }
+ };
+ }
+
+ StreamSpec input1 = kafkaStream("Orders");
+
+ StreamSpec input2 = kafkaStream("Shipment");
+
+ StreamSpec output = kafkaStream("FulfilledOrders");
+
+ /** An order, keyed by its order id; it is its own message payload. */
+ class OrderRecord implements MessageEnvelope<String, OrderRecord> {
+ String orderId;
+ long orderTimeMs;
+
+ OrderRecord(String orderId, long timeMs) {
+ this.orderId = orderId;
+ this.orderTimeMs = timeMs;
+ }
+
+ @Override
+ public String getKey() {
+ return this.orderId;
+ }
+
+ @Override
+ public OrderRecord getMessage() {
+ return this;
+ }
+ }
+
+ /** A shipment, keyed by the id of the order it ships; it is its own message payload. */
+ class ShipmentRecord implements MessageEnvelope<String, ShipmentRecord> {
+ String orderId;
+ long shipTimeMs;
+
+ ShipmentRecord(String orderId, long timeMs) {
+ this.orderId = orderId;
+ this.shipTimeMs = timeMs;
+ }
+
+ @Override
+ public String getKey() {
+ return this.orderId;
+ }
+
+ @Override
+ public ShipmentRecord getMessage() {
+ return this;
+ }
+ }
+
+ /** A matched order/shipment pair, keyed by order id; it is its own message payload. */
+ class FulFilledOrderRecord implements MessageEnvelope<String, FulFilledOrderRecord> {
+ String orderId;
+ long orderTimeMs;
+ long shipTimeMs;
+
+ FulFilledOrderRecord(String orderId, long orderTimeMs, long shipTimeMs) {
+ this.orderId = orderId;
+ this.orderTimeMs = orderTimeMs;
+ this.shipTimeMs = shipTimeMs;
+ }
+
+ @Override
+ public String getKey() {
+ return this.orderId;
+ }
+
+ @Override
+ public FulFilledOrderRecord getMessage() {
+ return this;
+ }
+ }
+
+ /** Combines an order and its shipment into a fulfilled-order record keyed by the order's key. */
+ FulFilledOrderRecord myJoinResult(OrderRecord m1, ShipmentRecord m2) {
+ String orderId = m1.getKey();
+ return new FulFilledOrderRecord(orderId, m1.orderTimeMs, m2.shipTimeMs);
+ }
+
+ /** Joins orders and shipments on their envelope keys, delegating the merge to {@link #myJoinResult}. */
+ class MyJoinFunction implements JoinFunction<String, OrderRecord, ShipmentRecord, FulFilledOrderRecord> {
+
+ @Override
+ public FulFilledOrderRecord apply(OrderRecord message, ShipmentRecord otherMessage) {
+ return myJoinResult(message, otherMessage);
+ }
+
+ @Override
+ public String getFirstKey(OrderRecord message) {
+ return message.getKey();
+ }
+
+ @Override
+ public String getSecondKey(ShipmentRecord message) {
+ return message.getKey();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-operator/src/test/java/org/apache/samza/example/PageViewCounterExample.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/example/PageViewCounterExample.java b/samza-operator/src/test/java/org/apache/samza/example/PageViewCounterExample.java
new file mode 100644
index 0000000..1502aa2
--- /dev/null
+++ b/samza-operator/src/test/java/org/apache/samza/example/PageViewCounterExample.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.example;
+
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.StreamGraphFactory;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.StreamSpec;
+import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.operators.triggers.Triggers;
+import org.apache.samza.operators.windows.AccumulationMode;
+import org.apache.samza.operators.windows.WindowPane;
+import org.apache.samza.operators.windows.Windows;
+import org.apache.samza.serializers.JsonSerde;
+import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.system.ExecutionEnvironment;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.util.CommandLine;
+
+import java.time.Duration;
+import java.util.Properties;
+
+
+/**
+ * Example code to implement a window-based counter.
+ *
+ * <p>Page views are counted per member id in 10-second keyed tumbling windows. An early trigger repeatedly fires
+ * every 5 messages, and the accumulation mode is DISCARDING. Each window pane is converted into a
+ * {@link MyStreamOutput} and sent to the output stream.
+ */
+public class PageViewCounterExample implements StreamGraphFactory {
+
+ @Override public StreamGraph create(Config config) {
+ StreamGraph graph = StreamGraph.fromConfig(config);
+
+ MessageStream<PageViewEvent> pageViewEvents = graph.createInStream(input1, new StringSerde("UTF-8"), new JsonSerde<>());
+ OutputStream<MyStreamOutput> pageViewPerMemberCounters = graph.createOutStream(output, new StringSerde("UTF-8"), new JsonSerde<>());
+
+ // Each page view adds one to its member's pane count.
+ pageViewEvents
+ .window(Windows.<PageViewEvent, String, Integer>keyedTumblingWindow(event -> event.getMessage().memberId, Duration.ofSeconds(10), (event, count) -> count + 1)
+ .setEarlyTrigger(Triggers.repeat(Triggers.count(5)))
+ .setAccumulationMode(AccumulationMode.DISCARDING))
+ .map(MyStreamOutput::new)
+ .sendTo(pageViewPerMemberCounters);
+ return graph;
+ }
+
+ public static void main(String[] args) {
+ CommandLine cmdLine = new CommandLine();
+ Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
+ ExecutionEnvironment.getLocalEnvironment(config).run(new PageViewCounterExample(), config);
+ }
+
+ /** Creates a spec for stream {@code streamName} on the {@code kafka} system, with no extra properties. */
+ private static StreamSpec kafkaStream(String streamName) {
+ return new StreamSpec() {
+ @Override public SystemStream getSystemStream() {
+ return new SystemStream("kafka", streamName);
+ }
+
+ @Override public Properties getProperties() {
+ return null;
+ }
+ };
+ }
+
+ StreamSpec input1 = kafkaStream("PageViewEvent");
+
+ // NOTE(review): the stream name says "5min" but the window above is 10 seconds; confirm which is intended.
+ StreamSpec output = kafkaStream("PageViewPerMember5min");
+
+ /** A single page view, keyed by page id; it is its own message payload. */
+ class PageViewEvent implements MessageEnvelope<String, PageViewEvent> {
+ String pageId;
+ String memberId;
+ long timestamp;
+
+ PageViewEvent(String pageId, String memberId, long timestamp) {
+ this.pageId = pageId;
+ this.memberId = memberId;
+ this.timestamp = timestamp;
+ }
+
+ @Override
+ public String getKey() {
+ return this.pageId;
+ }
+
+ @Override
+ public PageViewEvent getMessage() {
+ return this;
+ }
+ }
+
+ /** One window pane's result: member id, pane id (parsed as the timestamp) and view count, keyed by member id. */
+ class MyStreamOutput implements MessageEnvelope<String, MyStreamOutput> {
+ String memberId;
+ long timestamp;
+ int count;
+
+ MyStreamOutput(WindowPane<String, Integer> pane) {
+ this.memberId = pane.getKey().getKey();
+ this.timestamp = Long.valueOf(pane.getKey().getPaneId());
+ this.count = pane.getMessage();
+ }
+
+ @Override
+ public String getKey() {
+ return this.memberId;
+ }
+
+ @Override
+ public MyStreamOutput getMessage() {
+ return this;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-operator/src/test/java/org/apache/samza/example/RepartitionExample.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/example/RepartitionExample.java b/samza-operator/src/test/java/org/apache/samza/example/RepartitionExample.java
new file mode 100644
index 0000000..f15e514
--- /dev/null
+++ b/samza-operator/src/test/java/org/apache/samza/example/RepartitionExample.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.example;
+
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.StreamGraphFactory;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.StreamSpec;
+import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.operators.windows.WindowPane;
+import org.apache.samza.operators.windows.Windows;
+import org.apache.samza.serializers.JsonSerde;
+import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.system.ExecutionEnvironment;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.util.CommandLine;
+
+import java.time.Duration;
+import java.util.*;
+
+
+/**
+ * Example {@link StreamGraphFactory} code to test the API methods with the re-partition operator.
+ * <p>
+ * Page-view events are re-partitioned by member ID, counted per member in 5-minute keyed tumbling windows,
+ * and the resulting counts are sent to an output stream.
+ */
+public class RepartitionExample implements StreamGraphFactory {
+
+  /**
+   * Builds the re-partition example graph.
+   * <p>
+   * This method is also used by a remote execution environment to launch the job from a remote program. The
+   * remote program should follow an invoking context similar to the standalone {@link #main(String[])}:
+   * <pre>{@code
+   * public static void main(String[] args) throws Exception {
+   *   CommandLine cmdLine = new CommandLine();
+   *   Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
+   *   ExecutionEnvironment remoteEnv = ExecutionEnvironment.getRemoteEnvironment(config);
+   *   remoteEnv.run(new RepartitionExample(), config);
+   * }
+   * }</pre>
+   *
+   * @param config the {@link Config} of the job
+   * @return the {@link StreamGraph} containing the re-partition logic
+   */
+  @Override public StreamGraph create(Config config) {
+    StreamGraph graph = StreamGraph.fromConfig(config);
+
+    MessageStream<PageViewEvent> pageViewEvents = graph.createInStream(input1, new StringSerde("UTF-8"), new JsonSerde<>());
+    OutputStream<MyStreamOutput> pageViewPerMemberCounters = graph.createOutStream(output, new StringSerde("UTF-8"), new JsonSerde<>());
+
+    pageViewEvents
+        // re-partition on member ID ahead of the per-member window below
+        .partitionBy(m -> m.getMessage().memberId)
+        .window(Windows.<PageViewEvent, String, Integer>keyedTumblingWindow(
+            msg -> msg.getMessage().memberId, Duration.ofMinutes(5), (m, c) -> c + 1))
+        .map(MyStreamOutput::new)
+        .sendTo(pageViewPerMemberCounters);
+
+    return graph;
+  }
+
+  /**
+   * Standalone local program model: loads the config from the command line and runs this example locally.
+   *
+   * @param args command-line arguments from which the job {@link Config} is loaded
+   * @throws Exception if loading the config or running the job fails
+   */
+  public static void main(String[] args) throws Exception {
+    CommandLine cmdLine = new CommandLine();
+    Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
+    ExecutionEnvironment standaloneEnv = ExecutionEnvironment.getLocalEnvironment(config);
+    standaloneEnv.run(new RepartitionExample(), config);
+  }
+
+  /** Spec of the input stream: the "PageViewEvent" stream of the "kafka" system; properties are null. */
+  StreamSpec input1 = new StreamSpec() {
+    @Override public SystemStream getSystemStream() {
+      return new SystemStream("kafka", "PageViewEvent");
+    }
+
+    @Override public Properties getProperties() {
+      return null;
+    }
+  };
+
+  /** Spec of the output stream: the "PageViewPerMember5min" stream of the "kafka" system; properties are null. */
+  StreamSpec output = new StreamSpec() {
+    @Override public SystemStream getSystemStream() {
+      return new SystemStream("kafka", "PageViewPerMember5min");
+    }
+
+    @Override public Properties getProperties() {
+      return null;
+    }
+  };
+
+  /**
+   * Input page-view message. It is its own message payload and is keyed by page ID.
+   * <p>
+   * Declared {@code static} because it never uses the enclosing instance; a non-static inner class would
+   * carry a hidden reference to it.
+   */
+  static class PageViewEvent implements MessageEnvelope<String, PageViewEvent> {
+    String pageId;
+    String memberId;
+    long timestamp;
+
+    PageViewEvent(String pageId, String memberId, long timestamp) {
+      this.pageId = pageId;
+      this.memberId = memberId;
+      this.timestamp = timestamp;
+    }
+
+    @Override
+    public String getKey() {
+      return this.pageId;
+    }
+
+    @Override
+    public PageViewEvent getMessage() {
+      return this;
+    }
+  }
+
+  /**
+   * Output message carrying the page-view count of one member for one window pane, keyed by member ID.
+   * <p>
+   * Declared {@code static} for the same reason as {@link PageViewEvent}.
+   */
+  static class MyStreamOutput implements MessageEnvelope<String, MyStreamOutput> {
+    String memberId;
+    long timestamp;
+    int count;
+
+    MyStreamOutput(WindowPane<String, Integer> m) {
+      this.memberId = m.getKey().getKey();
+      // NOTE(review): presumes the pane ID converts to a long (e.g. a window timestamp) -- confirm against WindowKey
+      this.timestamp = Long.valueOf(m.getKey().getPaneId());
+      this.count = m.getMessage();
+    }
+
+    @Override
+    public String getKey() {
+      return this.memberId;
+    }
+
+    @Override
+    public MyStreamOutput getMessage() {
+      return this;
+    }
+  }
+
+}
[4/4] samza git commit: SAMZA-1073: top-level fluent API `
Posted by ni...@apache.org.
SAMZA-1073: top-level fluent API
`
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/373048aa
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/373048aa
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/373048aa
Branch: refs/heads/samza-fluent-api-v1
Commit: 373048aa0a68221af5f6b5589bbe161c972b11a9
Parents: 38b1dc3
Author: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Authored: Thu Feb 9 01:56:10 2017 -0800
Committer: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Committed: Thu Feb 9 01:57:05 2017 -0800
----------------------------------------------------------------------
.../apache/samza/operators/ContextManager.java | 47 ++++
.../apache/samza/operators/MessageStream.java | 90 ++++---
.../apache/samza/operators/OutputStream.java | 41 +++
.../org/apache/samza/operators/StreamGraph.java | 118 +++++++++
.../samza/operators/StreamGraphFactory.java | 38 +++
.../samza/operators/StreamOperatorTask.java | 51 ----
.../org/apache/samza/operators/StreamSpec.java | 46 ++++
.../data/IncomingSystemMessageEnvelope.java | 63 -----
.../operators/data/InputMessageEnvelope.java | 63 +++++
.../samza/operators/data/MessageEnvelope.java | 2 +-
.../operators/functions/FilterFunction.java | 23 +-
.../operators/functions/FlatMapFunction.java | 25 +-
.../samza/operators/functions/InitFunction.java | 38 +++
.../samza/operators/functions/JoinFunction.java | 48 +++-
.../functions/KeyValueJoinFunction.java | 44 ++++
.../samza/operators/functions/MapFunction.java | 25 +-
.../samza/operators/functions/SinkFunction.java | 23 +-
.../samza/operators/triggers/AnyTrigger.java | 3 +-
.../samza/operators/triggers/CountTrigger.java | 4 +-
.../operators/triggers/RepeatingTrigger.java | 4 +-
.../triggers/TimeSinceFirstMessageTrigger.java | 3 +-
.../triggers/TimeSinceLastMessageTrigger.java | 4 +-
.../samza/operators/triggers/TimeTrigger.java | 4 +-
.../samza/operators/triggers/Trigger.java | 7 +-
.../samza/operators/triggers/Triggers.java | 41 +--
.../apache/samza/operators/windows/Window.java | 20 +-
.../samza/operators/windows/WindowKey.java | 19 +-
.../samza/operators/windows/WindowPane.java | 9 +-
.../apache/samza/operators/windows/Windows.java | 136 +++++-----
.../windows/internal/WindowInternal.java | 14 +-
.../samza/system/ExecutionEnvironment.java | 73 ++++++
.../java/org/apache/samza/task/TaskContext.java | 10 +
.../data/TestIncomingSystemMessage.java | 2 +-
.../operators/windows/TestWindowOutput.java | 35 ---
.../samza/operators/windows/TestWindowPane.java | 33 +++
.../samza/operators/MessageStreamImpl.java | 151 +++++++----
.../apache/samza/operators/StreamGraphImpl.java | 260 +++++++++++++++++++
.../operators/StreamOperatorAdaptorTask.java | 105 --------
.../functions/PartialJoinFunction.java | 65 +++++
.../samza/operators/impl/OperatorGraph.java | 164 ++++++++++++
.../samza/operators/impl/OperatorImpl.java | 22 +-
.../samza/operators/impl/OperatorImpls.java | 124 ---------
.../operators/impl/PartialJoinOperatorImpl.java | 15 +-
.../samza/operators/impl/RootOperatorImpl.java | 7 +-
.../impl/SessionWindowOperatorImpl.java | 52 ++++
.../samza/operators/impl/SinkOperatorImpl.java | 7 +-
.../operators/impl/StreamOperatorImpl.java | 14 +-
.../operators/impl/WindowOperatorImpl.java | 11 +-
.../samza/operators/spec/OperatorSpec.java | 39 ++-
.../samza/operators/spec/OperatorSpecs.java | 161 +++++++++---
.../operators/spec/PartialJoinOperatorSpec.java | 58 +++--
.../samza/operators/spec/SinkOperatorSpec.java | 70 ++++-
.../operators/spec/StreamOperatorSpec.java | 58 +++--
.../operators/spec/WindowOperatorSpec.java | 41 ++-
.../samza/operators/spec/WindowState.java | 16 +-
.../system/SingleJobExecutionEnvironment.java | 37 +++
.../system/StandaloneExecutionEnvironment.java | 41 +++
.../apache/samza/task/StreamOperatorTask.java | 108 ++++++++
.../apache/samza/example/BroadcastGraph.java | 121 +++++++++
.../org/apache/samza/example/JoinGraph.java | 118 +++++++++
.../samza/example/KeyValueStoreExample.java | 184 +++++++++++++
.../samza/example/NoContextStreamExample.java | 156 +++++++++++
.../samza/example/OrderShipmentJoinExample.java | 190 ++++++++++++++
.../samza/example/PageViewCounterExample.java | 133 ++++++++++
.../samza/example/RepartitionExample.java | 145 +++++++++++
.../samza/example/TestFluentStreamTasks.java | 99 +++++++
.../org/apache/samza/example/WindowGraph.java | 87 +++++++
.../apache/samza/operators/BroadcastTask.java | 96 -------
.../org/apache/samza/operators/JoinTask.java | 77 ------
.../operators/TestFluentStreamAdaptorTask.java | 85 ------
.../samza/operators/TestFluentStreamTasks.java | 112 --------
.../samza/operators/TestMessageStreamImpl.java | 52 ++--
.../operators/TestMessageStreamImplUtil.java | 26 ++
.../org/apache/samza/operators/WindowTask.java | 63 -----
.../samza/operators/impl/TestOperatorImpls.java | 94 +++++--
.../operators/impl/TestSinkOperatorImpl.java | 11 +-
.../operators/impl/TestStreamOperatorImpl.java | 20 +-
.../samza/operators/spec/TestOperatorSpecs.java | 65 +++--
78 files changed, 3455 insertions(+), 1311 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-api/src/main/java/org/apache/samza/operators/ContextManager.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/ContextManager.java b/samza-api/src/main/java/org/apache/samza/operators/ContextManager.java
new file mode 100644
index 0000000..c3b1cf3
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/operators/ContextManager.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators;
+
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.config.Config;
+import org.apache.samza.task.TaskContext;
+
+
+/**
+ * Lifecycle hooks for the context shared by the transformation functions: one hook sets the context up for a task,
+ * the other tears it down.
+ */
+@InterfaceStability.Unstable
+public interface ContextManager {
+
+  /**
+   * Sets up the context shared by the whole task in Samza. The default implementation adds nothing and simply
+   * hands back the {@code context} it was given.
+   *
+   * @param config the configuration object for the task
+   * @param context the {@link TaskContext} object
+   * @return the user-defined task-wide context object; by default, {@code context} itself
+   */
+  default TaskContext initTaskContext(Config config, TaskContext context) {
+    return context;
+  }
+
+  /**
+   * Gives users a chance to close resources set up in {@link #initTaskContext}. The default implementation does
+   * nothing.
+   */
+  default void finalizeTaskContext() {
+    // nothing to release by default
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java b/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
index 6a2f95b..87a9fd3 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
@@ -19,7 +19,6 @@
package org.apache.samza.operators;
import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.operators.data.MessageEnvelope;
import org.apache.samza.operators.functions.FilterFunction;
import org.apache.samza.operators.functions.FlatMapFunction;
import org.apache.samza.operators.functions.JoinFunction;
@@ -29,73 +28,93 @@ import org.apache.samza.operators.windows.Window;
import org.apache.samza.operators.windows.WindowPane;
import java.util.Collection;
+import java.util.function.Function;
/**
- * Represents a stream of {@link MessageEnvelope}s.
+ * Represents a stream of messages.
* <p>
* A {@link MessageStream} can be transformed into another {@link MessageStream} by applying the transforms in this API.
*
- * @param <M> type of {@link MessageEnvelope}s in this stream
+ * @param <M> type of messages in this stream
*/
@InterfaceStability.Unstable
-public interface MessageStream<M extends MessageEnvelope> {
+public interface MessageStream<M> {
/**
- * Applies the provided 1:1 {@link MapFunction} to {@link MessageEnvelope}s in this {@link MessageStream} and returns the
+ * Applies the provided 1:1 {@link MapFunction} to messages in this {@link MessageStream} and returns the
* transformed {@link MessageStream}.
*
- * @param mapFn the function to transform a {@link MessageEnvelope} to another {@link MessageEnvelope}
- * @param <TM> the type of {@link MessageEnvelope}s in the transformed {@link MessageStream}
+ * @param mapFn the function to transform a message to another message
+ * @param <TM> the type of messages in the transformed {@link MessageStream}
* @return the transformed {@link MessageStream}
*/
- <TM extends MessageEnvelope> MessageStream<TM> map(MapFunction<M, TM> mapFn);
+ <TM> MessageStream<TM> map(MapFunction<M, TM> mapFn);
/**
- * Applies the provided 1:n {@link FlatMapFunction} to transform a {@link MessageEnvelope} in this {@link MessageStream}
- * to n {@link MessageEnvelope}s in the transformed {@link MessageStream}
+ * Applies the provided 1:n {@link FlatMapFunction} to transform a message in this {@link MessageStream}
+ * to n messages in the transformed {@link MessageStream}
*
- * @param flatMapFn the function to transform a {@link MessageEnvelope} to zero or more {@link MessageEnvelope}s
- * @param <TM> the type of {@link MessageEnvelope}s in the transformed {@link MessageStream}
+ * @param flatMapFn the function to transform a message to zero or more messages
+ * @param <TM> the type of messages in the transformed {@link MessageStream}
* @return the transformed {@link MessageStream}
*/
- <TM extends MessageEnvelope> MessageStream<TM> flatMap(FlatMapFunction<M, TM> flatMapFn);
+ <TM> MessageStream<TM> flatMap(FlatMapFunction<M, TM> flatMapFn);
/**
- * Applies the provided {@link FilterFunction} to {@link MessageEnvelope}s in this {@link MessageStream} and returns the
+ * Applies the provided {@link FilterFunction} to messages in this {@link MessageStream} and returns the
* transformed {@link MessageStream}.
* <p>
- * The {@link FilterFunction} is a predicate which determines whether a {@link MessageEnvelope} in this {@link MessageStream}
+ * The {@link FilterFunction} is a predicate which determines whether a message in this {@link MessageStream}
* should be retained in the transformed {@link MessageStream}.
*
- * @param filterFn the predicate to filter {@link MessageEnvelope}s from this {@link MessageStream}
+ * @param filterFn the predicate to filter messages from this {@link MessageStream}
* @return the transformed {@link MessageStream}
*/
MessageStream<M> filter(FilterFunction<M> filterFn);
/**
- * Allows sending {@link MessageEnvelope}s in this {@link MessageStream} to an output
- * {@link org.apache.samza.system.SystemStream} using the provided {@link SinkFunction}.
+ * Allows sending messages in this {@link MessageStream} to an output using the provided {@link SinkFunction}.
*
- * @param sinkFn the function to send {@link MessageEnvelope}s in this stream to output systems
+ * NOTE: the output need not be a {@link org.apache.samza.system.SystemStream}. It can be an external database, etc.
+ *
+ * @param sinkFn the function to send messages in this stream to the output
*/
void sink(SinkFunction<M> sinkFn);
/**
- * Groups and processes the {@link MessageEnvelope}s in this {@link MessageStream} according to the provided {@link Window}
+ * Allows sending messages in this {@link MessageStream} to an {@link OutputStream}.
+ *
+ * NOTE: the {@code stream} is an {@link OutputStream}, e.g. one created via {@link StreamGraph#createOutStream}.
+ *
+ * @param stream the output {@link OutputStream}
+ */
+ void sendTo(OutputStream<M> stream);
+
+ /**
+ * Allows sending messages to an intermediate {@link OutputStream} and consuming them again as a {@link MessageStream}.
+ *
+ * NOTE: the {@code stream} is expected to be an intermediate {@link OutputStream}, e.g. one created via {@link StreamGraph#createIntStream}.
+ *
+ * @param stream the intermediate {@link OutputStream} to send the messages to
+ * @return the intermediate {@link MessageStream} to consume the messages sent
+ */
+ MessageStream<M> sendThrough(OutputStream<M> stream);
+
+ /**
+ * Groups the messages in this {@link MessageStream} according to the provided {@link Window} semantics
* (e.g. tumbling, sliding or session windows) and returns the transformed {@link MessageStream} of
* {@link WindowPane}s.
* <p>
* Use the {@link org.apache.samza.operators.windows.Windows} helper methods to create the appropriate windows.
*
- * @param window the window to group and process {@link MessageEnvelope}s from this {@link MessageStream}
- * @param <K> the type of key in the {@link MessageEnvelope} in this {@link MessageStream}. If a key is specified,
+ * @param window the window to group and process messages from this {@link MessageStream}
+ * @param <K> the type of key in the messages of this {@link MessageStream}. If a key is specified,
* panes are emitted per-key.
* @param <WV> the type of value in the {@link WindowPane} in the transformed {@link MessageStream}
- * @param <WM> the type of {@link WindowPane} in the transformed {@link MessageStream}
* @return the transformed {@link MessageStream}
*/
- <K, WV, WM extends WindowPane<K, WV>> MessageStream<WM> window(Window<M, K, WV, WM> window);
+ <K, WV> MessageStream<WindowPane<K, WV>> window(Window<M, K, WV> window);
/**
* Joins this {@link MessageStream} with another {@link MessageStream} using the provided pairwise {@link JoinFunction}.
@@ -103,23 +122,32 @@ public interface MessageStream<M extends MessageEnvelope> {
* We currently only support 2-way joins.
*
* @param otherStream the other {@link MessageStream} to be joined with
- * @param joinFn the function to join {@link MessageEnvelope}s from this and the other {@link MessageStream}
+ * @param joinFn the function to join messages from this and the other {@link MessageStream}
* @param <K> the type of join key
- * @param <OM> the type of {@link MessageEnvelope}s in the other stream
- * @param <RM> the type of {@link MessageEnvelope}s resulting from the {@code joinFn}
+ * @param <OM> the type of messages in the other stream
+ * @param <RM> the type of messages resulting from the {@code joinFn}
* @return the joined {@link MessageStream}
*/
- <K, OM extends MessageEnvelope<K, ?>, RM extends MessageEnvelope> MessageStream<RM> join(MessageStream<OM> otherStream,
- JoinFunction<M, OM, RM> joinFn);
+ <K, OM, RM> MessageStream<RM> join(MessageStream<OM> otherStream, JoinFunction<K, M, OM, RM> joinFn);
/**
* Merge all {@code otherStreams} with this {@link MessageStream}.
* <p>
- * The merging streams must have the same {@link MessageEnvelope} type {@code M}.
+ * All merged streams must carry messages of the same type {@code M}.
*
* @param otherStreams other {@link MessageStream}s to be merged with this {@link MessageStream}
* @return the merged {@link MessageStream}
*/
MessageStream<M> merge(Collection<MessageStream<M>> otherStreams);
-
+
+ /**
+ * Sends the messages in this {@link MessageStream} to an output {@link org.apache.samza.system.SystemStream} and consumes them as an input {@link MessageStream} again.
+ *
+ * Note: this is a transform function only used in the logical DAG. In a physical DAG, it is translated either to a NOOP function or to a {@code MessageStream#sendThrough} function.
+ *
+ * @param parKeyExtractor a {@link Function} that extracts the partition key from a message in this {@link MessageStream}
+ * @param <K> the type of partition key
+ * @return a {@link MessageStream} object after the re-partition
+ */
+ <K> MessageStream<M> partitionBy(Function<M, K> parKeyExtractor);
}
http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-api/src/main/java/org/apache/samza/operators/OutputStream.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/OutputStream.java b/samza-api/src/main/java/org/apache/samza/operators/OutputStream.java
new file mode 100644
index 0000000..179f0e7
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/operators/OutputStream.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators;
+
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.operators.functions.SinkFunction;
+
+
+/**
+ * The interface class defining the specific {@link SinkFunction} for a system {@link OutputStream}.
+ *
+ * @param <M> the type of message to be sent to this output stream
+ */
+@InterfaceStability.Unstable
+public interface OutputStream<M> {
+
+  /**
+   * Returns the specific {@link SinkFunction} for this output stream. The {@link OutputStream} is created via
+   * {@link StreamGraph#createOutStream(StreamSpec, org.apache.samza.serializers.Serde, org.apache.samza.serializers.Serde)}
+   * or {@link StreamGraph#createIntStream(StreamSpec, org.apache.samza.serializers.Serde, org.apache.samza.serializers.Serde)}.
+   * Hence, the proper key and value serdes are instantiated and used in the returned {@link SinkFunction}.
+   * <p>
+   * ({@code Serde} is fully qualified in the links above because this file does not import it; an unqualified
+   * name would not resolve in Javadoc.)
+   *
+   * @return the pre-defined {@link SinkFunction} that applies the proper serdes before sending a message to the output stream
+   */
+  SinkFunction<M> getSinkFunction();
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java b/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java
new file mode 100644
index 0000000..9e6644b
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators;
+
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.ConfigException;
+import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.serializers.Serde;
+
+import java.util.Map;
+
+
+/**
+ * Job-level programming interface to create an operator DAG and run it in various runtime environments.
+ */
+@InterfaceStability.Unstable
+public interface StreamGraph {
+  /**
+   * Method to add an input {@link MessageStream} from the system.
+   *
+   * @param streamSpec the {@link StreamSpec} describing the physical characteristics of the input {@link MessageStream}
+   * @param keySerde the serde used to serialize/deserialize the message key from the input {@link MessageStream}
+   * @param msgSerde the serde used to serialize/deserialize the message body from the input {@link MessageStream}
+   * @param <K> the type of key in the input message
+   * @param <V> the type of message in the input message
+   * @param <M> the type of {@link MessageEnvelope} in the input {@link MessageStream}
+   * @return the input {@link MessageStream} object
+   */
+  <K, V, M extends MessageEnvelope<K, V>> MessageStream<M> createInStream(StreamSpec streamSpec, Serde<K> keySerde, Serde<V> msgSerde);
+
+  /**
+   * Method to add an output stream to the system.
+   *
+   * @param streamSpec the {@link StreamSpec} describing the physical characteristics of the output stream
+   * @param keySerde the serde used to serialize/deserialize the message key of the output stream
+   * @param msgSerde the serde used to serialize/deserialize the message body of the output stream
+   * @param <K> the type of key in the output message
+   * @param <V> the type of message in the output message
+   * @param <M> the type of {@link MessageEnvelope} sent to the output stream
+   * @return the {@link OutputStream} object
+   */
+  <K, V, M extends MessageEnvelope<K, V>> OutputStream<M> createOutStream(StreamSpec streamSpec, Serde<K> keySerde, Serde<V> msgSerde);
+
+  /**
+   * Method to add an intermediate stream to the system.
+   *
+   * @param streamSpec the {@link StreamSpec} describing the physical characteristics of the intermediate stream
+   * @param keySerde the serde used to serialize/deserialize the message key of the intermediate stream
+   * @param msgSerde the serde used to serialize/deserialize the message body of the intermediate stream
+   * @param <K> the type of key in the intermediate message
+   * @param <V> the type of message in the intermediate message
+   * @param <M> the type of {@link MessageEnvelope} sent to the intermediate stream
+   * @return the intermediate {@link OutputStream} object
+   */
+  <K, V, M extends MessageEnvelope<K, V>> OutputStream<M> createIntStream(StreamSpec streamSpec, Serde<K> keySerde, Serde<V> msgSerde);
+
+  /**
+   * Method to get the input {@link MessageStream}s.
+   *
+   * @return the map of input {@link MessageStream}s, keyed by {@link StreamSpec}
+   */
+  Map<StreamSpec, MessageStream> getInStreams();
+
+  /**
+   * Method to get the {@link OutputStream}s.
+   *
+   * @return the map of all {@link OutputStream}s, keyed by {@link StreamSpec}
+   */
+  Map<StreamSpec, OutputStream> getOutStreams();
+
+  /**
+   * Method to set the {@link ContextManager} for this {@link StreamGraph}.
+   *
+   * @param manager the {@link ContextManager} object
+   * @return this {@link StreamGraph} object
+   */
+  StreamGraph withContextManager(ContextManager manager);
+
+  /** Config key naming the {@link StreamGraph} implementation class that {@link #fromConfig(Config)} instantiates. */
+  String GRAPH_CONFIG = "job.stream.graph.impl.class";
+  /** Implementation class name passed as the default when reading {@link #GRAPH_CONFIG}. */
+  String DEFAULT_GRAPH_IMPL_CLASS = "org.apache.samza.operators.StreamGraphImpl";
+
+  /**
+   * Static method to instantiate the implementation class of {@link StreamGraph}.
+   *
+   * @param config the {@link Config} object for this job
+   * @return the {@link StreamGraph} object created
+   * @throws ConfigException if the configured class cannot be loaded, does not implement {@link StreamGraph},
+   *                         or cannot be instantiated
+   */
+  static StreamGraph fromConfig(Config config) {
+    // Resolve the class name once so error messages name the class actually attempted, including the default
+    // (config.get(GRAPH_CONFIG) alone would report null when the default is used).
+    String graphImplClass = config.get(GRAPH_CONFIG, DEFAULT_GRAPH_IMPL_CLASS);
+
+    Class<?> graphClass;
+    try {
+      graphClass = Class.forName(graphImplClass);
+    } catch (Exception e) {
+      throw new ConfigException(String.format("Problem in loading StreamGraphImpl class %s", graphImplClass), e);
+    }
+
+    if (!StreamGraph.class.isAssignableFrom(graphClass)) {
+      throw new ConfigException(String.format(
+          "Class %s does not implement interface StreamGraph properly", graphImplClass));
+    }
+
+    try {
+      return (StreamGraph) graphClass.newInstance();
+    } catch (Exception e) {
+      throw new ConfigException(String.format("Problem in loading StreamGraphImpl class %s", graphImplClass), e);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-api/src/main/java/org/apache/samza/operators/StreamGraphFactory.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/StreamGraphFactory.java b/samza-api/src/main/java/org/apache/samza/operators/StreamGraphFactory.java
new file mode 100644
index 0000000..c292363
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/operators/StreamGraphFactory.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators;
+
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.config.Config;
+
+
+/**
+ * Factory that users implement to build their application's operator DAG inside a {@link StreamGraph} object.
+ */
+@InterfaceStability.Unstable
+public interface StreamGraphFactory {
+
+  /**
+   * Creates the {@link StreamGraph} holding the application's processing logic. That logic is expressed as a DAG
+   * of {@link MessageStream}s and the operators applied to them. Every application must implement this method.
+   *
+   * @param config the {@link Config} of the application
+   * @return the {@link StreamGraph} containing the user-defined processing logic of the application
+   */
+  StreamGraph create(Config config);
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-api/src/main/java/org/apache/samza/operators/StreamOperatorTask.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/StreamOperatorTask.java b/samza-api/src/main/java/org/apache/samza/operators/StreamOperatorTask.java
deleted file mode 100644
index 16cf27a..0000000
--- a/samza-api/src/main/java/org/apache/samza/operators/StreamOperatorTask.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators;
-
-import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.operators.data.IncomingSystemMessageEnvelope;
-import org.apache.samza.system.SystemStreamPartition;
-
-import java.util.Map;
-
-
-/**
- * A {@link StreamOperatorTask} is the basic interface to implement for processing {@link MessageStream}s.
- * Implementations can describe the transformation steps for each {@link MessageStream} in the
- * {@link #transform} method using {@link MessageStream} APIs.
- * <p>
- * Implementations may be augmented by implementing {@link org.apache.samza.task.InitableTask},
- * {@link org.apache.samza.task.WindowableTask} and {@link org.apache.samza.task.ClosableTask} interfaces,
- * but should not implement {@link org.apache.samza.task.StreamTask} or {@link org.apache.samza.task.AsyncStreamTask}
- * interfaces.
- */
-@InterfaceStability.Unstable
-public interface StreamOperatorTask {
-
- /**
- * Describe the transformation steps for each {@link MessageStream}s for this task using the
- * {@link MessageStream} APIs. Each {@link MessageStream} corresponds to one {@link SystemStreamPartition}
- * in the input system.
- *
- * @param messageStreams the {@link MessageStream}s that receive {@link IncomingSystemMessageEnvelope}s
- * from their corresponding {@link org.apache.samza.system.SystemStreamPartition}
- */
- void transform(Map<SystemStreamPartition, MessageStream<IncomingSystemMessageEnvelope>> messageStreams);
-
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-api/src/main/java/org/apache/samza/operators/StreamSpec.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/StreamSpec.java b/samza-api/src/main/java/org/apache/samza/operators/StreamSpec.java
new file mode 100644
index 0000000..c8a5e8d
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/operators/StreamSpec.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators;
+
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.system.SystemStream;
+
+import java.util.Properties;
+
+
+/**
+ * The specification of a {@link SystemStream}. It is intended to be used by a {@link org.apache.samza.system.SystemAdmin}
+ * to create the described {@link SystemStream}.
+ */
+@InterfaceStability.Unstable
+public interface StreamSpec {
+ /**
+ * Returns the {@link SystemStream} described by this specification.
+ *
+ * @return the {@link SystemStream} object
+ */
+ SystemStream getSystemStream();
+
+ /**
+ * Returns the physical properties of the {@link SystemStream}.
+ *
+ * @return the properties of this stream
+ */
+ Properties getProperties();
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-api/src/main/java/org/apache/samza/operators/data/IncomingSystemMessageEnvelope.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/data/IncomingSystemMessageEnvelope.java b/samza-api/src/main/java/org/apache/samza/operators/data/IncomingSystemMessageEnvelope.java
deleted file mode 100644
index a65809c..0000000
--- a/samza-api/src/main/java/org/apache/samza/operators/data/IncomingSystemMessageEnvelope.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.data;
-
-import org.apache.samza.system.IncomingMessageEnvelope;
-import org.apache.samza.system.SystemStreamPartition;
-
-
-/**
- * A {@link MessageEnvelope} that provides additional information about its input {@link SystemStreamPartition}
- * and its {@link Offset} within the {@link SystemStreamPartition}.
- * <p>
- * Note: the {@link Offset} is only unique and comparable within its {@link SystemStreamPartition}.
- */
-public class IncomingSystemMessageEnvelope implements MessageEnvelope<Object, Object> {
-
- private final IncomingMessageEnvelope ime;
-
- /**
- * Creates an {@code IncomingSystemMessageEnvelope} from the {@link IncomingMessageEnvelope}.
- *
- * @param ime the {@link IncomingMessageEnvelope} from the input system.
- */
- public IncomingSystemMessageEnvelope(IncomingMessageEnvelope ime) {
- this.ime = ime;
- }
-
- @Override
- public Object getKey() {
- return this.ime.getKey();
- }
-
- @Override
- public Object getMessage() {
- return this.ime.getMessage();
- }
-
- public Offset getOffset() {
- // TODO: need to add offset factory to generate different types of offset. This is just a placeholder,
- // assuming incoming message envelope carries long value as offset (i.e. Kafka case)
- return new LongOffset(this.ime.getOffset());
- }
-
- public SystemStreamPartition getSystemStreamPartition() {
- return this.ime.getSystemStreamPartition();
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-api/src/main/java/org/apache/samza/operators/data/InputMessageEnvelope.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/data/InputMessageEnvelope.java b/samza-api/src/main/java/org/apache/samza/operators/data/InputMessageEnvelope.java
new file mode 100644
index 0000000..306145b
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/operators/data/InputMessageEnvelope.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.data;
+
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemStreamPartition;
+
+
+/**
+ * A {@link MessageEnvelope} backed by an {@link IncomingMessageEnvelope} that also exposes its input {@link SystemStreamPartition}
+ * and its {@link Offset} within the {@link SystemStreamPartition}.
+ * <p>
+ * Note: the {@link Offset} is only unique and comparable within its {@link SystemStreamPartition}.
+ */
+public class InputMessageEnvelope implements MessageEnvelope<Object, Object> {
+
+ private final IncomingMessageEnvelope ime;
+
+ /**
+ * Creates an {@code InputMessageEnvelope} that wraps the given {@link IncomingMessageEnvelope}.
+ *
+ * @param ime the {@link IncomingMessageEnvelope} from the input system; key, message, offset and partition are all read from it
+ */
+ public InputMessageEnvelope(IncomingMessageEnvelope ime) {
+ this.ime = ime;
+ }
+
+ @Override
+ public Object getKey() {
+ return this.ime.getKey();
+ }
+
+ @Override
+ public Object getMessage() {
+ return this.ime.getMessage();
+ }
+
+ public Offset getOffset() {
+ // TODO: add an offset factory that creates the right Offset type for each system. This is only a placeholder
+ // that assumes the incoming message envelope carries a long-valued offset (e.g. Kafka).
+ return new LongOffset(this.ime.getOffset());
+ }
+
+ public SystemStreamPartition getSystemStreamPartition() {
+ return this.ime.getSystemStreamPartition();
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-api/src/main/java/org/apache/samza/operators/data/MessageEnvelope.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/data/MessageEnvelope.java b/samza-api/src/main/java/org/apache/samza/operators/data/MessageEnvelope.java
index ad64231..703a44c 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/data/MessageEnvelope.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/data/MessageEnvelope.java
@@ -23,7 +23,7 @@ import org.apache.samza.annotation.InterfaceStability;
/**
- * An entry in the input/output {@link org.apache.samza.operators.MessageStream}s.
+ * An entry in the input/output {@link org.apache.samza.operators.MessageStream}s
*/
@InterfaceStability.Unstable
public interface MessageEnvelope<K, M> {
http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-api/src/main/java/org/apache/samza/operators/functions/FilterFunction.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/FilterFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/FilterFunction.java
index e611cd0..73c5c9d 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/functions/FilterFunction.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/functions/FilterFunction.java
@@ -19,22 +19,29 @@
package org.apache.samza.operators.functions;
import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.config.Config;
+import org.apache.samza.task.TaskContext;
/**
- * A function that specifies whether a {@link MessageEnvelope} should be retained for further processing or filtered out.
- * @param <M> type of the input {@link MessageEnvelope}
+ * A function that specifies whether a message should be retained for further processing or filtered out.
+ * @param <M> type of the input message
*/
@InterfaceStability.Unstable
-@FunctionalInterface
-public interface FilterFunction<M extends MessageEnvelope> {
+public interface FilterFunction<M> extends InitFunction {
/**
- * Returns a boolean indicating whether this {@link MessageEnvelope} should be retained or filtered out.
- * @param message the {@link MessageEnvelope} to be checked
- * @return true if {@link MessageEnvelope} should be retained
+ * Returns a boolean indicating whether this message should be retained or filtered out.
+ * @param message the input message to be checked
+ * @return true if {@code message} should be retained
*/
boolean apply(M message);
+ /**
+ * Init method to initialize the context for this {@link FilterFunction}. The default implementation is NO-OP.
+ *
+ * @param config the {@link Config} object for this task
+ * @param context the {@link TaskContext} object for this task
+ */
+ default void init(Config config, TaskContext context) { }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-api/src/main/java/org/apache/samza/operators/functions/FlatMapFunction.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/FlatMapFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/FlatMapFunction.java
index dbc0bd9..f8458f2 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/functions/FlatMapFunction.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/functions/FlatMapFunction.java
@@ -19,26 +19,33 @@
package org.apache.samza.operators.functions;
import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.config.Config;
+import org.apache.samza.task.TaskContext;
import java.util.Collection;
/**
- * A function that transforms a {@link MessageEnvelope} into a collection of 0 or more {@link MessageEnvelope}s,
+ * A function that transforms an input message into a collection of 0 or more messages,
* possibly of a different type.
- * @param <M> type of the input {@link MessageEnvelope}
- * @param <OM> type of the transformed {@link MessageEnvelope}s
+ * @param <M> type of the input message
+ * @param <OM> type of the transformed messages
*/
@InterfaceStability.Unstable
-@FunctionalInterface
-public interface FlatMapFunction<M extends MessageEnvelope, OM extends MessageEnvelope> {
+public interface FlatMapFunction<M, OM> extends InitFunction {
/**
- * Transforms the provided {@link MessageEnvelope} into a collection of 0 or more {@link MessageEnvelope}s.
- * @param message the {@link MessageEnvelope} to be transformed
- * @return a collection of 0 or more transformed {@link MessageEnvelope}s
+ * Transforms the provided message into a collection of 0 or more messages.
+ * @param message the input message to be transformed
+ * @return a collection of 0 or more transformed messages
*/
Collection<OM> apply(M message);
+ /**
+ * Init method to initialize the context for this {@link FlatMapFunction}. The default implementation is NO-OP.
+ *
+ * @param config the {@link Config} object for this task
+ * @param context the {@link TaskContext} object for this task
+ */
+ default void init(Config config, TaskContext context) { };
}
http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-api/src/main/java/org/apache/samza/operators/functions/InitFunction.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/InitFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/InitFunction.java
new file mode 100644
index 0000000..eec56df
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/operators/functions/InitFunction.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.functions;
+
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.config.Config;
+import org.apache.samza.task.TaskContext;
+
+
+/**
+ * Interface for initializing the context of message transformation functions.
+ */
+@InterfaceStability.Unstable
+public interface InitFunction {
+ /**
+ * Initializes the context for a specific message transformation function.
+ *
+ * @param config the {@link Config} object for this task
+ * @param context the {@link TaskContext} object for this task
+ */
+ void init(Config config, TaskContext context);
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-api/src/main/java/org/apache/samza/operators/functions/JoinFunction.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/JoinFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/JoinFunction.java
index 8cb1fce..afc92ee 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/functions/JoinFunction.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/functions/JoinFunction.java
@@ -19,26 +19,50 @@
package org.apache.samza.operators.functions;
import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.config.Config;
+import org.apache.samza.task.TaskContext;
/**
- * A function that joins {@link MessageEnvelope}s from two {@link org.apache.samza.operators.MessageStream}s and produces
- * a joined {@link MessageEnvelope}.
- * @param <M> type of the input {@link MessageEnvelope}
- * @param <JM> type of the {@link MessageEnvelope} to join with
- * @param <RM> type of the joined {@link MessageEnvelope}
+ * A function that joins messages from two {@link org.apache.samza.operators.MessageStream}s and produces
+ * a joined message.
+ * @param <K> type of the join key
+ * @param <M> type of the input message
+ * @param <JM> type of the message to join with
+ * @param <RM> type of the joined message
*/
@InterfaceStability.Unstable
-@FunctionalInterface
-public interface JoinFunction<M extends MessageEnvelope, JM extends MessageEnvelope, RM extends MessageEnvelope> {
+public interface JoinFunction<K, M, JM, RM> extends InitFunction {
/**
- * Join the provided {@link MessageEnvelope}s and produces the joined {@link MessageEnvelope}.
- * @param message the input {@link MessageEnvelope}
- * @param otherMessage the {@link MessageEnvelope} to join with
- * @return the joined {@link MessageEnvelope}
+ * Joins the provided input messages and produces the joined message.
+ * @param message the input message
+ * @param otherMessage the message to join with
+ * @return the joined message
*/
RM apply(M message, JM otherMessage);
+ /**
+ * Returns the join key of a message from the first input stream.
+ *
+ * @param message the input message from the first input stream
+ * @return the join key
+ */
+ K getFirstKey(M message);
+
+ /**
+ * Returns the join key of a message from the second input stream.
+ *
+ * @param message the input message from the second input stream
+ * @return the join key
+ */
+ K getSecondKey(JM message);
+
+ /**
+ * Init method to initialize the context for this {@link JoinFunction}. The default implementation is NO-OP.
+ *
+ * @param config the {@link Config} object for this task
+ * @param context the {@link TaskContext} object for this task
+ */
+ default void init(Config config, TaskContext context) { }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-api/src/main/java/org/apache/samza/operators/functions/KeyValueJoinFunction.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/KeyValueJoinFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/KeyValueJoinFunction.java
new file mode 100644
index 0000000..b651b3d
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/operators/functions/KeyValueJoinFunction.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.functions;
+
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.operators.data.MessageEnvelope;
+
+/**
+ * A {@link JoinFunction} over {@link MessageEnvelope}s from two {@link org.apache.samza.operators.MessageStream}s whose join
+ * keys are taken from each envelope's key, so only {@code apply} needs to be implemented.
+ *
+ * @param <K> type of the join key
+ * @param <M> type of the input {@link MessageEnvelope}
+ * @param <JM> type of the {@link MessageEnvelope} to join with
+ * @param <RM> type of the joined message
+ */
+@InterfaceStability.Unstable
+@FunctionalInterface
+public interface KeyValueJoinFunction<K, M extends MessageEnvelope<K, ?>, JM extends MessageEnvelope<K, ?>, RM> extends JoinFunction<K, M, JM, RM> {
+
+ default K getFirstKey(M message) {
+ return message.getKey();
+ }
+
+ default K getSecondKey(JM message) {
+ return message.getKey();
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-api/src/main/java/org/apache/samza/operators/functions/MapFunction.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/MapFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/MapFunction.java
index 04919a7..a051914 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/functions/MapFunction.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/functions/MapFunction.java
@@ -19,23 +19,30 @@
package org.apache.samza.operators.functions;
import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.config.Config;
+import org.apache.samza.task.TaskContext;
/**
- * A function that transforms a {@link MessageEnvelope} into another {@link MessageEnvelope}, possibly of a different type.
- * @param <M> type of the input {@link MessageEnvelope}
- * @param <OM> type of the transformed {@link MessageEnvelope}
+ * A function that transforms an input message into another message, possibly of a different type.
+ * @param <M> type of the input message
+ * @param <OM> type of the transformed message
*/
@InterfaceStability.Unstable
-@FunctionalInterface
-public interface MapFunction<M extends MessageEnvelope, OM extends MessageEnvelope> {
+public interface MapFunction<M, OM> extends InitFunction {
/**
- * Transforms the provided {@link MessageEnvelope} into another {@link MessageEnvelope}
- * @param message the {@link MessageEnvelope} to be transformed
- * @return the transformed {@link MessageEnvelope}
+ * Transforms the provided message into another message
+ * @param message the input message to be transformed
+ * @return the transformed message
*/
OM apply(M message);
+ /**
+ * Init method to initialize the context for this {@link MapFunction}. The default implementation is NO-OP.
+ *
+ * @param config the {@link Config} object for this task
+ * @param context the {@link TaskContext} object for this task
+ */
+ default void init(Config config, TaskContext context) { }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-api/src/main/java/org/apache/samza/operators/functions/SinkFunction.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/SinkFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/SinkFunction.java
index 505da92..1050771 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/functions/SinkFunction.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/functions/SinkFunction.java
@@ -19,28 +19,35 @@
package org.apache.samza.operators.functions;
import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.config.Config;
import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;
/**
- * A function that allows sending a {@link MessageEnvelope} to an output system.
- * @param <M> type of the input {@link MessageEnvelope}
+ * A function that allows sending a message to an output system.
+ * @param <M> type of the input message
*/
@InterfaceStability.Unstable
-@FunctionalInterface
-public interface SinkFunction<M extends MessageEnvelope> {
+public interface SinkFunction<M> extends InitFunction {
/**
- * Allows sending the provided {@link MessageEnvelope} to an output {@link org.apache.samza.system.SystemStream} using
+ * Allows sending the provided message to an output {@link org.apache.samza.system.SystemStream} using
* the provided {@link MessageCollector}. Also provides access to the {@link TaskCoordinator} to request commits
* or shut the container down.
*
- * @param message the {@link MessageEnvelope} to be sent to an output {@link org.apache.samza.system.SystemStream}
- * @param messageCollector the {@link MessageCollector} to use to send the {@link MessageEnvelope}
+ * @param message the input message to be sent to an output {@link org.apache.samza.system.SystemStream}
+ * @param messageCollector the {@link MessageCollector} to use to send the message
* @param taskCoordinator the {@link TaskCoordinator} to request commits or shutdown
*/
void apply(M message, MessageCollector messageCollector, TaskCoordinator taskCoordinator);
+ /**
+ * Init method to initialize the context for this {@link SinkFunction}. The default implementation is NO-OP.
+ *
+ * @param config the {@link Config} object for this task
+ * @param context the {@link TaskContext} object for this task
+ */
+ default void init(Config config, TaskContext context) { }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-api/src/main/java/org/apache/samza/operators/triggers/AnyTrigger.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/triggers/AnyTrigger.java b/samza-api/src/main/java/org/apache/samza/operators/triggers/AnyTrigger.java
index 3ca4e9a..6e134df 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/triggers/AnyTrigger.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/triggers/AnyTrigger.java
@@ -18,13 +18,12 @@
*/
package org.apache.samza.operators.triggers;
-import org.apache.samza.operators.data.MessageEnvelope;
import java.util.List;
/**
* A {@link Trigger} fires as soon as any of its individual triggers has fired.
*/
-public class AnyTrigger<M extends MessageEnvelope> implements Trigger {
+public class AnyTrigger<M> implements Trigger {
private final List<Trigger> triggers;
http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-api/src/main/java/org/apache/samza/operators/triggers/CountTrigger.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/triggers/CountTrigger.java b/samza-api/src/main/java/org/apache/samza/operators/triggers/CountTrigger.java
index ba14928..1cf930c 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/triggers/CountTrigger.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/triggers/CountTrigger.java
@@ -18,13 +18,11 @@
*/
package org.apache.samza.operators.triggers;
-import org.apache.samza.operators.data.MessageEnvelope;
-
/**
* A {@link Trigger} that fires when the number of messages in the {@link org.apache.samza.operators.windows.WindowPane}
* reaches the specified count.
*/
-public class CountTrigger<M extends MessageEnvelope> implements Trigger {
+public class CountTrigger<M> implements Trigger {
private final long count;
http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-api/src/main/java/org/apache/samza/operators/triggers/RepeatingTrigger.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/triggers/RepeatingTrigger.java b/samza-api/src/main/java/org/apache/samza/operators/triggers/RepeatingTrigger.java
index ae9564d..7f78eb8 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/triggers/RepeatingTrigger.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/triggers/RepeatingTrigger.java
@@ -18,12 +18,10 @@
*/
package org.apache.samza.operators.triggers;
-import org.apache.samza.operators.data.MessageEnvelope;
-
/**
* A {@link Trigger} that repeats its underlying trigger forever.
*/
-class RepeatingTrigger<M extends MessageEnvelope> implements Trigger<M> {
+class RepeatingTrigger<M> implements Trigger<M> {
private final Trigger<M> trigger;
http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceFirstMessageTrigger.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceFirstMessageTrigger.java b/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceFirstMessageTrigger.java
index 13fc3cd..4de60a2 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceFirstMessageTrigger.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceFirstMessageTrigger.java
@@ -19,7 +19,6 @@
package org.apache.samza.operators.triggers;
-import org.apache.samza.operators.data.MessageEnvelope;
import java.time.Duration;
@@ -27,7 +26,7 @@ import java.time.Duration;
* A {@link Trigger} that fires after the specified duration has passed since the first {@link MessageEnvelope} in
* the window pane.
*/
-public class TimeSinceFirstMessageTrigger<M extends MessageEnvelope> implements Trigger {
+public class TimeSinceFirstMessageTrigger<M> implements Trigger {
private final Duration duration;
private final DurationCharacteristic characteristic = DurationCharacteristic.PROCESSING_TIME;
http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceLastMessageTrigger.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceLastMessageTrigger.java b/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceLastMessageTrigger.java
index 0150d86..6b09625 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceLastMessageTrigger.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceLastMessageTrigger.java
@@ -18,14 +18,12 @@
*/
package org.apache.samza.operators.triggers;
-import org.apache.samza.operators.data.MessageEnvelope;
-
import java.time.Duration;
/*
* A {@link Trigger} that fires when there are no new {@link MessageEnvelope}s in the window pane for the specified duration.
*/
-public class TimeSinceLastMessageTrigger<M extends MessageEnvelope> implements Trigger {
+public class TimeSinceLastMessageTrigger<M> implements Trigger {
private final Duration duration;
private final DurationCharacteristic characteristic = DurationCharacteristic.PROCESSING_TIME;
http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeTrigger.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeTrigger.java b/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeTrigger.java
index ed7fef7..c5875aa 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeTrigger.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeTrigger.java
@@ -18,14 +18,12 @@
*/
package org.apache.samza.operators.triggers;
-import org.apache.samza.operators.data.MessageEnvelope;
-
import java.time.Duration;
/*
* A {@link Trigger} that fires after the specified duration in processing time.
*/
-public class TimeTrigger<M extends MessageEnvelope> implements Trigger {
+public class TimeTrigger<M> implements Trigger {
private final Duration duration;
private final DurationCharacteristic characteristic = DurationCharacteristic.PROCESSING_TIME;
http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-api/src/main/java/org/apache/samza/operators/triggers/Trigger.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/triggers/Trigger.java b/samza-api/src/main/java/org/apache/samza/operators/triggers/Trigger.java
index 6dc4f43..be0a877 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/triggers/Trigger.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/triggers/Trigger.java
@@ -20,15 +20,16 @@
package org.apache.samza.operators.triggers;
-import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.annotation.InterfaceStability;
/**
* Marker interface for all triggers. The firing of a trigger indicates the completion of a window pane.
*
* <p> Use the {@link Triggers} APIs to create a {@link Trigger}.
*
- * @param <M> the type of the incoming {@link MessageEnvelope}
+ * @param <M> the type of the incoming message
*/
-public interface Trigger<M extends MessageEnvelope> {
+@InterfaceStability.Unstable
+public interface Trigger<M> {
}
http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-api/src/main/java/org/apache/samza/operators/triggers/Triggers.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/triggers/Triggers.java b/samza-api/src/main/java/org/apache/samza/operators/triggers/Triggers.java
index f27cfd8..97fb7b7 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/triggers/Triggers.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/triggers/Triggers.java
@@ -19,7 +19,6 @@
package org.apache.samza.operators.triggers;
import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.operators.data.MessageEnvelope;
import java.time.Duration;
import java.util.ArrayList;
@@ -35,61 +34,63 @@ import java.util.List;
* <pre> {@code
* MessageStream<> windowedStream = stream.window(Windows.tumblingWindow(Duration.of(10, TimeUnit.SECONDS))
* .setEarlyTrigger(Triggers.repeat(Triggers.any(Triggers.count(50), Triggers.timeSinceFirstMessage(Duration.of(4, TimeUnit.SECONDS))))))
- * .accumulateFiredPanes());
+ * .setAccumulationMode(AccumulationMode.ACCUMULATING));
* }</pre>
*
- * @param <M> the type of input {@link MessageEnvelope}s in the {@link org.apache.samza.operators.MessageStream}
*/
@InterfaceStability.Unstable
-public final class Triggers<M extends MessageEnvelope> {
+public final class Triggers {
private Triggers() { }
/**
- * Creates a {@link Trigger} that fires when the number of {@link MessageEnvelope}s in the pane
+ * Creates a {@link Trigger} that fires when the number of messages in the pane
* reaches the specified count.
*
- * @param count the number of {@link MessageEnvelope}s to fire the trigger after
+ * @param count the number of messages to fire the trigger after
+ * @param <M> the type of input message in the window
* @return the created trigger
*/
- public static Trigger count(long count) {
- return new CountTrigger(count);
+ public static <M> Trigger<M> count(long count) {
+ return new CountTrigger<M>(count);
}
/**
- * Creates a trigger that fires after the specified duration has passed since the first {@link MessageEnvelope} in
+ * Creates a trigger that fires after the specified duration has passed since the first message in
* the pane.
*
* @param duration the duration since the first element
+ * @param <M> the type of input message in the window
* @return the created trigger
*/
- public static Trigger timeSinceFirstMessage(Duration duration) {
- return new TimeSinceFirstMessageTrigger(duration);
+ public static <M> Trigger<M> timeSinceFirstMessage(Duration duration) {
+ return new TimeSinceFirstMessageTrigger<M>(duration);
}
/**
- * Creates a trigger that fires when there is no new {@link MessageEnvelope} for the specified duration in the pane.
+ * Creates a trigger that fires when there is no new message for the specified duration in the pane.
*
* @param duration the duration since the last element
+ * @param <M> the type of input message in the window
* @return the created trigger
*/
- public static Trigger timeSinceLastMessage(Duration duration) {
- return new TimeSinceLastMessageTrigger(duration);
+ public static <M> Trigger<M> timeSinceLastMessage(Duration duration) {
+ return new TimeSinceLastMessageTrigger<M>(duration);
}
/**
* Creates a trigger that fires when any of the provided triggers fire.
*
- * @param <M> the type of input {@link MessageEnvelope} in the window
* @param triggers the individual triggers
+ * @param <M> the type of input message in the window
* @return the created trigger
*/
- public static <M extends MessageEnvelope> Trigger any(Trigger<M>... triggers) {
- List<Trigger> triggerList = new ArrayList<>();
+ public static <M> Trigger<M> any(Trigger<M>... triggers) {
+ List<Trigger<M>> triggerList = new ArrayList<>();
for (Trigger trigger : triggers) {
triggerList.add(trigger);
}
- return new AnyTrigger(Collections.unmodifiableList(triggerList));
+ return new AnyTrigger<M>(Collections.unmodifiableList(triggerList));
}
/**
@@ -98,11 +99,11 @@ public final class Triggers<M extends MessageEnvelope> {
* <p>Creating a {@link RepeatingTrigger} from an {@link AnyTrigger} is equivalent to creating an {@link AnyTrigger} from
* its individual {@link RepeatingTrigger}s.
*
- * @param <M> the type of input {@link MessageEnvelope} in the window
* @param trigger the individual trigger to repeat
+ * @param <M> the type of input message in the window
* @return the created trigger
*/
- public static <M extends MessageEnvelope> Trigger repeat(Trigger<M> trigger) {
+ public static <M> Trigger<M> repeat(Trigger<M> trigger) {
return new RepeatingTrigger<>(trigger);
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-api/src/main/java/org/apache/samza/operators/windows/Window.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/windows/Window.java b/samza-api/src/main/java/org/apache/samza/operators/windows/Window.java
index 6aae940..8aa665a 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/windows/Window.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/windows/Window.java
@@ -19,18 +19,17 @@
package org.apache.samza.operators.windows;
import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.operators.data.MessageEnvelope;
import org.apache.samza.operators.triggers.Trigger;
/**
- * Groups incoming {@link MessageEnvelope}s in the {@link org.apache.samza.operators.MessageStream} into finite
+ * Groups incoming messages in the {@link org.apache.samza.operators.MessageStream} into finite
* windows for processing.
*
* <p> A window is uniquely identified by its {@link WindowKey}. A window can have one or more associated {@link Trigger}s
* that determine when results from the {@link Window} are emitted.
*
- * <p> Each emitted result contains one or more {@link MessageEnvelope}s in the window and is called a {@link WindowPane}.
- * A pane can include all {@link MessageEnvelope}s collected for the window so far or only the new {@link MessageEnvelope}s
+ * <p> Each emitted result contains one or more messages in the window and is called a {@link WindowPane}.
+ * A pane can include all messages collected for the window so far or only the new messages
* since the last emitted pane. (as determined by the {@link AccumulationMode})
*
* <p> A window can have early triggers that allow emitting {@link WindowPane}s speculatively before all data for the window
@@ -66,13 +65,12 @@ import org.apache.samza.operators.triggers.Trigger;
* <p> Use the {@link Windows} APIs to create various windows and the {@link org.apache.samza.operators.triggers.Triggers}
* APIs to create triggers.
*
- * @param <M> the type of the input {@link MessageEnvelope}
- * @param <K> the type of the key in the {@link MessageEnvelope} in this {@link org.apache.samza.operators.MessageStream}.
+ * @param <M> the type of the input message
+ * @param <K> the type of the key in the message in this {@link org.apache.samza.operators.MessageStream}.
* @param <WV> the type of the value in the {@link WindowPane}.
- * @param <WM> the type of the output.
*/
@InterfaceStability.Unstable
-public interface Window<M extends MessageEnvelope, K, WV, WM extends WindowPane<K, WV>> {
+public interface Window<M, K, WV> {
/**
* Set the early triggers for this {@link Window}.
@@ -81,7 +79,7 @@ public interface Window<M extends MessageEnvelope, K, WV, WM extends WindowPane<
* @param trigger the early trigger
* @return the {@link Window} function with the early trigger
*/
- Window<M, K, WV, WM> setEarlyTrigger(Trigger<M> trigger);
+ Window<M, K, WV> setEarlyTrigger(Trigger<M> trigger);
/**
* Set the late triggers for this {@link Window}.
@@ -90,7 +88,7 @@ public interface Window<M extends MessageEnvelope, K, WV, WM extends WindowPane<
* @param trigger the late trigger
* @return the {@link Window} function with the late trigger
*/
- Window<M, K, WV, WM> setLateTrigger(Trigger<M> trigger);
+ Window<M, K, WV> setLateTrigger(Trigger<M> trigger);
/**
* Specify how a {@link Window} should process its previously emitted {@link WindowPane}s.
@@ -106,6 +104,6 @@ public interface Window<M extends MessageEnvelope, K, WV, WM extends WindowPane<
* @param mode the accumulation mode
* @return the {@link Window} function with the specified {@link AccumulationMode}.
*/
- Window<M, K, WV, WM> setAccumulationMode(AccumulationMode mode);
+ Window<M, K, WV> setAccumulationMode(AccumulationMode mode);
}
http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-api/src/main/java/org/apache/samza/operators/windows/WindowKey.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/windows/WindowKey.java b/samza-api/src/main/java/org/apache/samza/operators/windows/WindowKey.java
index 7edf3e1..14bd5ab 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/windows/WindowKey.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/windows/WindowKey.java
@@ -21,7 +21,7 @@ package org.apache.samza.operators.windows;
/**
* Key for a {@link WindowPane} emitted from a {@link Window}.
*
- * @param <K> the type of the key in the incoming {@link org.apache.samza.operators.data.MessageEnvelope}.
+ * @param <K> the type of the key in the incoming message.
* Windows that are not keyed have a {@link Void} key type.
*
*/
@@ -29,18 +29,27 @@ public class WindowKey<K> {
private final K key;
- private final String windowId;
+ private final String paneId;
public WindowKey(K key, String windowId) {
this.key = key;
- this.windowId = windowId;
+ this.paneId = windowId;
}
public K getKey() {
return key;
}
- public String getWindowId() {
- return windowId;
+ public String getPaneId() {
+ return paneId;
+ }
+
+ @Override
+ public String toString() { // "<key>:<paneId>" for keyed windows, just "<paneId>" for unkeyed ones
+ String wndKey = "";
+ if (key != null) { // unkeyed windows use a Void key type, whose only value is null (instanceof Void is never true)
+ wndKey = String.format("%s:", key.toString());
+ }
+ return String.format("%s%s", wndKey, paneId);
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/373048aa/samza-api/src/main/java/org/apache/samza/operators/windows/WindowPane.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/windows/WindowPane.java b/samza-api/src/main/java/org/apache/samza/operators/windows/WindowPane.java
index 0388048..3b66bd1 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/windows/WindowPane.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/windows/WindowPane.java
@@ -18,16 +18,13 @@
*/
package org.apache.samza.operators.windows;
-import org.apache.samza.operators.data.MessageEnvelope;
-
-
/**
* Specifies the result emitted from a {@link Window}.
*
* @param <K> the type of key in the window pane
* @param <V> the type of value in the window pane.
*/
-public final class WindowPane<K, V> implements MessageEnvelope<WindowKey<K>, V> {
+public final class WindowPane<K, V> {
private final WindowKey<K> key;
@@ -41,11 +38,11 @@ public final class WindowPane<K, V> implements MessageEnvelope<WindowKey<K>, V>
this.mode = mode;
}
- @Override public V getMessage() {
+ public V getMessage() {
return this.value;
}
- @Override public WindowKey<K> getKey() {
+ public WindowKey<K> getKey() {
return this.key;
}