You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2017/02/16 23:04:27 UTC
[4/5] samza git commit: SAMZA-1073: moving all operator classes into
samza-core
http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-core/src/main/java/org/apache/samza/system/StandaloneExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/system/StandaloneExecutionEnvironment.java b/samza-core/src/main/java/org/apache/samza/system/StandaloneExecutionEnvironment.java
new file mode 100644
index 0000000..f0f6ef2
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/system/StandaloneExecutionEnvironment.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system;
+
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.StreamGraphBuilder;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.StreamGraphImpl;
+
+
+/**
+ * This class implements the {@link ExecutionEnvironment} that runs the applications in standalone environment
+ */
+public class StandaloneExecutionEnvironment implements ExecutionEnvironment {
+
+ // TODO: may want to move this to a common base class for all {@link ExecutionEnvironment}
+ StreamGraph createGraph(StreamGraphBuilder app, Config config) {
+ StreamGraphImpl graph = new StreamGraphImpl();
+ app.init(graph, config);
+ return graph;
+ }
+
+ @Override public void run(StreamGraphBuilder app, Config config) {
+ // 1. get logic graph for optimization
+ // StreamGraph logicGraph = this.createGraph(app, config);
+ // 2. potential optimization....
+ // 3. create new instance of StreamGraphBuilder that would generate the optimized graph
+ // 4. create all input/output/intermediate topics
+ // 5. create the configuration for StreamProcessor
+ // 6. start the StreamProcessor w/ optimized instance of StreamGraphBuilder
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java b/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
new file mode 100644
index 0000000..b007e3c
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.task;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.ContextManager;
+import org.apache.samza.operators.MessageStreamImpl;
+import org.apache.samza.operators.StreamGraphBuilder;
+import org.apache.samza.operators.StreamGraphImpl;
+import org.apache.samza.operators.data.InputMessageEnvelope;
+import org.apache.samza.operators.impl.OperatorGraph;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemStream;
+
+import java.util.HashMap;
+import java.util.Map;
+
+
+/**
+ * Execution of the logic sub-DAG
+ *
+ *
+ * An {@link StreamTask} implementation that receives {@link InputMessageEnvelope}s and propagates them
+ * through the user's stream transformations defined in {@link StreamGraphImpl} using the
+ * {@link org.apache.samza.operators.MessageStream} APIs.
+ * <p>
+ * This class brings all the operator API implementation components together and feeds the
+ * {@link InputMessageEnvelope}s into the transformation chains.
+ * <p>
+ * It accepts an instance of the user implemented factory {@link StreamGraphBuilder} as input parameter of the constructor.
+ * When its own {@link #init(Config, TaskContext)} method is called during startup, it instantiate a user-defined {@link StreamGraphImpl}
+ * from the {@link StreamGraphBuilder}, calls {@link StreamGraphImpl#getContextManager()} to initialize the task-wide context
+ * for the graph, and creates a {@link MessageStreamImpl} corresponding to each of its input
+ * {@link org.apache.samza.system.SystemStreamPartition}s. Each input {@link MessageStreamImpl}
+ * will be corresponding to either an input stream or intermediate stream in {@link StreamGraphImpl}.
+ * <p>
+ * Then, this task calls {@link org.apache.samza.operators.impl.OperatorGraph#init(Map, Config, TaskContext)} for each of the input
+ * {@link MessageStreamImpl}. This instantiates the {@link org.apache.samza.operators.impl.OperatorImpl} DAG
+ * corresponding to the aforementioned {@link org.apache.samza.operators.spec.OperatorSpec} DAG and returns the
+ * root node of the DAG, which this class saves.
+ * <p>
+ * Now that it has the root for the DAG corresponding to each {@link org.apache.samza.system.SystemStreamPartition}, it
+ * can pass the message envelopes received in {@link StreamTask#process(IncomingMessageEnvelope, MessageCollector, TaskCoordinator)}
+ * along to the appropriate root nodes. From then on, each {@link org.apache.samza.operators.impl.OperatorImpl} propagates
+ * its transformed output to the next set of {@link org.apache.samza.operators.impl.OperatorImpl}s.
+ */
+public final class StreamOperatorTask implements StreamTask, InitableTask, WindowableTask, ClosableTask {
+
+ /**
+ * A mapping from each {@link SystemStream} to the root node of its operator chain DAG.
+ */
+ private final OperatorGraph operatorGraph = new OperatorGraph();
+
+ private final StreamGraphBuilder graphBuilder;
+
+ private ContextManager contextManager;
+
+ public StreamOperatorTask(StreamGraphBuilder graphBuilder) {
+ this.graphBuilder = graphBuilder;
+ }
+
+ @Override
+ public final void init(Config config, TaskContext context) throws Exception {
+ // create the MessageStreamsImpl object and initialize app-specific logic DAG within the task
+ StreamGraphImpl streams = new StreamGraphImpl();
+ this.graphBuilder.init(streams, config);
+ // get the context manager of the {@link StreamGraph} and initialize the task-specific context
+ this.contextManager = streams.getContextManager();
+
+ Map<SystemStream, MessageStreamImpl> inputBySystemStream = new HashMap<>();
+ context.getSystemStreamPartitions().forEach(ssp -> {
+ if (!inputBySystemStream.containsKey(ssp.getSystemStream())) {
+ // create mapping from the physical input {@link SystemStream} to the logic {@link MessageStream}
+ inputBySystemStream.putIfAbsent(ssp.getSystemStream(), streams.getInputStream(ssp.getSystemStream()));
+ }
+ });
+ operatorGraph.init(inputBySystemStream, config, this.contextManager.initTaskContext(config, context));
+ }
+
+ @Override
+ public final void process(IncomingMessageEnvelope ime, MessageCollector collector, TaskCoordinator coordinator) {
+ this.operatorGraph.get(ime.getSystemStreamPartition().getSystemStream())
+ .onNext(new InputMessageEnvelope(ime), collector, coordinator);
+ }
+
+ @Override
+ public final void window(MessageCollector collector, TaskCoordinator coordinator) throws Exception {
+ // TODO: invoke timer based triggers
+ }
+
+ @Override
+ public void close() throws Exception {
+ this.contextManager.finalizeTaskContext();
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-core/src/test/java/org/apache/samza/example/KeyValueStoreExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/KeyValueStoreExample.java b/samza-core/src/test/java/org/apache/samza/example/KeyValueStoreExample.java
new file mode 100644
index 0000000..85ebc6c
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/example/KeyValueStoreExample.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.example;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.samza.operators.*;
+import org.apache.samza.operators.StreamGraphBuilder;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.operators.functions.FlatMapFunction;
+import org.apache.samza.serializers.JsonSerde;
+import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.storage.kv.KeyValueStore;
+import org.apache.samza.system.ExecutionEnvironment;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.task.TaskContext;
+import org.apache.samza.util.CommandLine;
+
+import java.util.Properties;
+
+
+/**
+ * Example code using {@link KeyValueStore} to implement event-time window
+ */
+public class KeyValueStoreExample implements StreamGraphBuilder {
+
+ /**
+ * used by remote execution environment to launch the job in remote program. The remote program should follow the similar
+ * invoking context as in standalone:
+ *
+ * public static void main(String args[]) throws Exception {
+ * CommandLine cmdLine = new CommandLine();
+ * Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
+ * ExecutionEnvironment remoteEnv = ExecutionEnvironment.getRemoteEnvironment(config);
+ * UserMainExample runnableApp = new UserMainExample();
+ * runnableApp.run(remoteEnv, config);
+ * }
+ *
+ */
+ @Override public void init(StreamGraph graph, Config config) {
+
+ MessageStream<PageViewEvent> pageViewEvents = graph.createInStream(input1, new StringSerde("UTF-8"), new JsonSerde<>());
+ OutputStream<StatsOutput> pageViewPerMemberCounters = graph.createOutStream(output, new StringSerde("UTF-8"), new JsonSerde<StatsOutput>());
+
+ pageViewEvents.
+ partitionBy(m -> m.getMessage().memberId).
+ flatMap(new MyStatsCounter()).
+ sendTo(pageViewPerMemberCounters);
+
+ }
+
+ // standalone local program model
+ public static void main(String[] args) throws Exception {
+ CommandLine cmdLine = new CommandLine();
+ Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
+ ExecutionEnvironment standaloneEnv = ExecutionEnvironment.getLocalEnvironment(config);
+ standaloneEnv.run(new KeyValueStoreExample(), config);
+ }
+
+ class MyStatsCounter implements FlatMapFunction<PageViewEvent, StatsOutput> {
+ private final int timeoutMs = 10 * 60 * 1000;
+
+ KeyValueStore<String, StatsWindowState> statsStore;
+
+ class StatsWindowState {
+ int lastCount = 0;
+ long timeAtLastOutput = 0;
+ int newCount = 0;
+ }
+
+ @Override
+ public Collection<StatsOutput> apply(PageViewEvent message) {
+ List<StatsOutput> outputStats = new ArrayList<>();
+ long wndTimestamp = (long) Math.floor(TimeUnit.MILLISECONDS.toMinutes(message.getMessage().timestamp) / 5) * 5;
+ String wndKey = String.format("%s-%d", message.getMessage().memberId, wndTimestamp);
+ StatsWindowState curState = this.statsStore.get(wndKey);
+ curState.newCount++;
+ long curTimeMs = System.currentTimeMillis();
+ if (curState.newCount > 0 && curState.timeAtLastOutput + timeoutMs < curTimeMs) {
+ curState.timeAtLastOutput = curTimeMs;
+ curState.lastCount += curState.newCount;
+ curState.newCount = 0;
+ outputStats.add(new StatsOutput(message.getMessage().memberId, wndTimestamp, curState.lastCount));
+ }
+ // update counter w/o generating output
+ this.statsStore.put(wndKey, curState);
+ return outputStats;
+ }
+
+ @Override
+ public void init(Config config, TaskContext context) {
+ this.statsStore = (KeyValueStore<String, StatsWindowState>) context.getStore("my-stats-wnd-store");
+ }
+ }
+
+ StreamSpec input1 = new StreamSpec() {
+ @Override public SystemStream getSystemStream() {
+ return new SystemStream("kafka", "PageViewEvent");
+ }
+
+ @Override public Properties getProperties() {
+ return null;
+ }
+ };
+
+ StreamSpec output = new StreamSpec() {
+ @Override public SystemStream getSystemStream() {
+ return new SystemStream("kafka", "PageViewPerMember5min");
+ }
+
+ @Override public Properties getProperties() {
+ return null;
+ }
+ };
+
+ class PageViewEvent implements MessageEnvelope<String, PageViewEvent> {
+ String pageId;
+ String memberId;
+ long timestamp;
+
+ PageViewEvent(String pageId, String memberId, long timestamp) {
+ this.pageId = pageId;
+ this.memberId = memberId;
+ this.timestamp = timestamp;
+ }
+
+ @Override
+ public String getKey() {
+ return this.pageId;
+ }
+
+ @Override
+ public PageViewEvent getMessage() {
+ return this;
+ }
+ }
+
+ class StatsOutput implements MessageEnvelope<String, StatsOutput> {
+ private String memberId;
+ private long timestamp;
+ private Integer count;
+
+ StatsOutput(String key, long timestamp, Integer count) {
+ this.memberId = key;
+ this.timestamp = timestamp;
+ this.count = count;
+ }
+
+ @Override
+ public String getKey() {
+ return this.memberId;
+ }
+
+ @Override
+ public StatsOutput getMessage() {
+ return this;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-core/src/test/java/org/apache/samza/example/NoContextStreamExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/NoContextStreamExample.java b/samza-core/src/test/java/org/apache/samza/example/NoContextStreamExample.java
new file mode 100644
index 0000000..c6d2e6e
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/example/NoContextStreamExample.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.example;
+
+import org.apache.samza.operators.*;
+import org.apache.samza.operators.StreamGraphBuilder;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.data.InputMessageEnvelope;
+import org.apache.samza.operators.data.JsonIncomingSystemMessageEnvelope;
+import org.apache.samza.operators.data.Offset;
+import org.apache.samza.operators.functions.JoinFunction;
+import org.apache.samza.serializers.JsonSerde;
+import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.system.ExecutionEnvironment;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.CommandLine;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+
+/**
+ * Example {@link StreamGraphBuilder} code to test the API methods
+ */
+public class NoContextStreamExample implements StreamGraphBuilder {
+
+ StreamSpec input1 = new StreamSpec() {
+ @Override public SystemStream getSystemStream() {
+ return new SystemStream("kafka", "input1");
+ }
+
+ @Override public Properties getProperties() {
+ return null;
+ }
+ };
+
+ StreamSpec input2 = new StreamSpec() {
+ @Override public SystemStream getSystemStream() {
+ return new SystemStream("kafka", "input2");
+ }
+
+ @Override public Properties getProperties() {
+ return null;
+ }
+ };
+
+ StreamSpec output = new StreamSpec() {
+ @Override public SystemStream getSystemStream() {
+ return new SystemStream("kafka", "output");
+ }
+
+ @Override public Properties getProperties() {
+ return null;
+ }
+ };
+
+ class MessageType {
+ String joinKey;
+ List<String> joinFields = new ArrayList<>();
+ }
+
+ class JsonMessageEnvelope extends JsonIncomingSystemMessageEnvelope<MessageType> {
+ JsonMessageEnvelope(String key, MessageType data, Offset offset, SystemStreamPartition partition) {
+ super(key, data, offset, partition);
+ }
+ }
+
+ private JsonMessageEnvelope getInputMessage(InputMessageEnvelope ism) {
+ return new JsonMessageEnvelope(
+ ((MessageType) ism.getMessage()).joinKey,
+ (MessageType) ism.getMessage(),
+ ism.getOffset(),
+ ism.getSystemStreamPartition());
+ }
+
+ class MyJoinFunction implements JoinFunction<String, JsonMessageEnvelope, JsonMessageEnvelope, JsonIncomingSystemMessageEnvelope<MessageType>> {
+
+ @Override
+ public JsonIncomingSystemMessageEnvelope<MessageType> apply(JsonMessageEnvelope m1,
+ JsonMessageEnvelope m2) {
+ MessageType newJoinMsg = new MessageType();
+ newJoinMsg.joinKey = m1.getKey();
+ newJoinMsg.joinFields.addAll(m1.getMessage().joinFields);
+ newJoinMsg.joinFields.addAll(m2.getMessage().joinFields);
+ return new JsonMessageEnvelope(m1.getMessage().joinKey, newJoinMsg, null, null);
+ }
+
+ @Override
+ public String getFirstKey(JsonMessageEnvelope message) {
+ return message.getKey();
+ }
+
+ @Override
+ public String getSecondKey(JsonMessageEnvelope message) {
+ return message.getKey();
+ }
+ }
+
+ /**
+ * used by remote execution environment to launch the job in remote program. The remote program should follow the similar
+ * invoking context as in standalone:
+ *
+ * public static void main(String args[]) throws Exception {
+ * CommandLine cmdLine = new CommandLine();
+ * Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
+ * ExecutionEnvironment remoteEnv = ExecutionEnvironment.fromConfig(config);
+ * remoteEnv.run(new NoContextStreamExample(), config);
+ * }
+ *
+ */
+ @Override public void init(StreamGraph graph, Config config) {
+ MessageStream<InputMessageEnvelope> inputSource1 = graph.<Object, Object, InputMessageEnvelope>createInStream(
+ input1, null, null);
+ MessageStream<InputMessageEnvelope> inputSource2 = graph.<Object, Object, InputMessageEnvelope>createInStream(
+ input2, null, null);
+ OutputStream<JsonIncomingSystemMessageEnvelope<MessageType>> outStream = graph.createOutStream(output,
+ new StringSerde("UTF-8"), new JsonSerde<>());
+
+ inputSource1.map(this::getInputMessage).
+ join(inputSource2.map(this::getInputMessage), new MyJoinFunction()).
+ sendTo(outStream);
+
+ }
+
+ // standalone local program model
+ public static void main(String[] args) throws Exception {
+ CommandLine cmdLine = new CommandLine();
+ Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
+ ExecutionEnvironment standaloneEnv = ExecutionEnvironment.getLocalEnvironment(config);
+ standaloneEnv.run(new NoContextStreamExample(), config);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java b/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java
new file mode 100644
index 0000000..0477066
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.example;
+
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.StreamGraphBuilder;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.StreamSpec;
+import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.operators.functions.JoinFunction;
+import org.apache.samza.serializers.JsonSerde;
+import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.system.ExecutionEnvironment;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.util.CommandLine;
+
+import java.util.Properties;
+
+
+/**
+ * Simple 2-way stream-to-stream join example
+ */
+public class OrderShipmentJoinExample implements StreamGraphBuilder {
+
+ /**
+ * used by remote execution environment to launch the job in remote program. The remote program should follow the similar
+ * invoking context as in standalone:
+ *
+ * public static void main(String args[]) throws Exception {
+ * CommandLine cmdLine = new CommandLine();
+ * Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
+ * ExecutionEnvironment remoteEnv = ExecutionEnvironment.getRemoteEnvironment(config);
+ * UserMainExample runnableApp = new UserMainExample();
+ * runnableApp.run(remoteEnv, config);
+ * }
+ *
+ */
+ @Override public void init(StreamGraph graph, Config config) {
+
+ MessageStream<OrderRecord> orders = graph.createInStream(input1, new StringSerde("UTF-8"), new JsonSerde<>());
+ MessageStream<ShipmentRecord> shipments = graph.createInStream(input2, new StringSerde("UTF-8"), new JsonSerde<>());
+ OutputStream<FulFilledOrderRecord> fulfilledOrders = graph.createOutStream(output, new StringSerde("UTF-8"), new JsonSerde<>());
+
+ orders.join(shipments, new MyJoinFunction()).sendTo(fulfilledOrders);
+
+ }
+
+ // standalone local program model
+ public static void main(String[] args) throws Exception {
+ CommandLine cmdLine = new CommandLine();
+ Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
+ ExecutionEnvironment standaloneEnv = ExecutionEnvironment.getLocalEnvironment(config);
+ standaloneEnv.run(new OrderShipmentJoinExample(), config);
+ }
+
+ StreamSpec input1 = new StreamSpec() {
+ @Override public SystemStream getSystemStream() {
+ return new SystemStream("kafka", "Orders");
+ }
+
+ @Override public Properties getProperties() {
+ return null;
+ }
+ };
+
+ StreamSpec input2 = new StreamSpec() {
+ @Override public SystemStream getSystemStream() {
+ return new SystemStream("kafka", "Shipment");
+ }
+
+ @Override public Properties getProperties() {
+ return null;
+ }
+ };
+
+ StreamSpec output = new StreamSpec() {
+ @Override public SystemStream getSystemStream() {
+ return new SystemStream("kafka", "FulfilledOrders");
+ }
+
+ @Override public Properties getProperties() {
+ return null;
+ }
+ };
+
+ class OrderRecord implements MessageEnvelope<String, OrderRecord> {
+ String orderId;
+ long orderTimeMs;
+
+ OrderRecord(String orderId, long timeMs) {
+ this.orderId = orderId;
+ this.orderTimeMs = timeMs;
+ }
+
+ @Override
+ public String getKey() {
+ return this.orderId;
+ }
+
+ @Override
+ public OrderRecord getMessage() {
+ return this;
+ }
+ }
+
+ class ShipmentRecord implements MessageEnvelope<String, ShipmentRecord> {
+ String orderId;
+ long shipTimeMs;
+
+ ShipmentRecord(String orderId, long timeMs) {
+ this.orderId = orderId;
+ this.shipTimeMs = timeMs;
+ }
+
+ @Override
+ public String getKey() {
+ return this.orderId;
+ }
+
+ @Override
+ public ShipmentRecord getMessage() {
+ return this;
+ }
+ }
+
+ class FulFilledOrderRecord implements MessageEnvelope<String, FulFilledOrderRecord> {
+ String orderId;
+ long orderTimeMs;
+ long shipTimeMs;
+
+ FulFilledOrderRecord(String orderId, long orderTimeMs, long shipTimeMs) {
+ this.orderId = orderId;
+ this.orderTimeMs = orderTimeMs;
+ this.shipTimeMs = shipTimeMs;
+ }
+
+
+ @Override
+ public String getKey() {
+ return this.orderId;
+ }
+
+ @Override
+ public FulFilledOrderRecord getMessage() {
+ return this;
+ }
+ }
+
+ FulFilledOrderRecord myJoinResult(OrderRecord m1, ShipmentRecord m2) {
+ return new FulFilledOrderRecord(m1.getKey(), m1.orderTimeMs, m2.shipTimeMs);
+ }
+
+ class MyJoinFunction implements JoinFunction<String, OrderRecord, ShipmentRecord, FulFilledOrderRecord> {
+
+ @Override
+ public FulFilledOrderRecord apply(OrderRecord message, ShipmentRecord otherMessage) {
+ return OrderShipmentJoinExample.this.myJoinResult(message, otherMessage);
+ }
+
+ @Override
+ public String getFirstKey(OrderRecord message) {
+ return message.getKey();
+ }
+
+ @Override
+ public String getSecondKey(ShipmentRecord message) {
+ return message.getKey();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java b/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java
new file mode 100644
index 0000000..f7d8bda
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.example;
+
+import org.apache.samza.operators.*;
+import org.apache.samza.operators.StreamGraphBuilder;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.operators.triggers.Triggers;
+import org.apache.samza.operators.windows.AccumulationMode;
+import org.apache.samza.operators.windows.WindowPane;
+import org.apache.samza.operators.windows.Windows;
+import org.apache.samza.serializers.JsonSerde;
+import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.system.ExecutionEnvironment;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.util.CommandLine;
+
+import java.time.Duration;
+import java.util.Properties;
+
+
+/**
+ * Example code to implement window-based counter
+ */
+public class PageViewCounterExample implements StreamGraphBuilder {
+
+ @Override public void init(StreamGraph graph, Config config) {
+
+ MessageStream<PageViewEvent> pageViewEvents = graph.createInStream(input1, new StringSerde("UTF-8"), new JsonSerde<>());
+ OutputStream<MyStreamOutput> pageViewPerMemberCounters = graph.createOutStream(output, new StringSerde("UTF-8"), new JsonSerde<>());
+
+ pageViewEvents.
+ window(Windows.<PageViewEvent, String, Integer>keyedTumblingWindow(m -> m.getMessage().memberId, Duration.ofSeconds(10), (m, c) -> c + 1).
+ setEarlyTrigger(Triggers.repeat(Triggers.count(5))).
+ setAccumulationMode(AccumulationMode.DISCARDING)).
+ map(MyStreamOutput::new).
+ sendTo(pageViewPerMemberCounters);
+
+ }
+
+ public static void main(String[] args) {
+ CommandLine cmdLine = new CommandLine();
+ Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
+ ExecutionEnvironment standaloneEnv = ExecutionEnvironment.getLocalEnvironment(config);
+ standaloneEnv.run(new PageViewCounterExample(), config);
+ }
+
+ StreamSpec input1 = new StreamSpec() {
+ @Override public SystemStream getSystemStream() {
+ return new SystemStream("kafka", "PageViewEvent");
+ }
+
+ @Override public Properties getProperties() {
+ return null;
+ }
+ };
+
+ StreamSpec output = new StreamSpec() {
+ @Override public SystemStream getSystemStream() {
+ return new SystemStream("kafka", "PageViewPerMember5min");
+ }
+
+ @Override public Properties getProperties() {
+ return null;
+ }
+ };
+
+ class PageViewEvent implements MessageEnvelope<String, PageViewEvent> {
+ String pageId;
+ String memberId;
+ long timestamp;
+
+ PageViewEvent(String pageId, String memberId, long timestamp) {
+ this.pageId = pageId;
+ this.memberId = memberId;
+ this.timestamp = timestamp;
+ }
+
+ @Override
+ public String getKey() {
+ return this.pageId;
+ }
+
+ @Override
+ public PageViewEvent getMessage() {
+ return this;
+ }
+ }
+
+ class MyStreamOutput implements MessageEnvelope<String, MyStreamOutput> {
+ String memberId;
+ long timestamp;
+ int count;
+
+ MyStreamOutput(WindowPane<String, Integer> m) {
+ this.memberId = m.getKey().getKey();
+ this.timestamp = Long.valueOf(m.getKey().getPaneId());
+ this.count = m.getMessage();
+ }
+
+ @Override
+ public String getKey() {
+ return this.memberId;
+ }
+
+ @Override
+ public MyStreamOutput getMessage() {
+ return this;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java b/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java
new file mode 100644
index 0000000..6994ac4
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.example;
+
+import org.apache.samza.operators.*;
+import org.apache.samza.operators.StreamGraphBuilder;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.operators.windows.WindowPane;
+import org.apache.samza.operators.windows.Windows;
+import org.apache.samza.serializers.JsonSerde;
+import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.system.ExecutionEnvironment;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.util.CommandLine;
+
+import java.time.Duration;
+import java.util.*;
+
+
+/**
+ * Example {@link StreamGraphBuilder} code to test the API methods with re-partition operator
+ */
+public class RepartitionExample implements StreamGraphBuilder {
+
+ /**
+ * used by remote execution environment to launch the job in remote program. The remote program should follow the similar
+ * invoking context as in standalone:
+ *
+ * public static void main(String args[]) throws Exception {
+ * CommandLine cmdLine = new CommandLine();
+ * Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
+ * ExecutionEnvironment remoteEnv = ExecutionEnvironment.getRemoteEnvironment(config);
+ * remoteEnv.run(new UserMainExample(), config);
+ * }
+ *
+ */
+ @Override public void init(StreamGraph graph, Config config) {
+
+ MessageStream<PageViewEvent> pageViewEvents = graph.createInStream(input1, new StringSerde("UTF-8"), new JsonSerde<>());
+ OutputStream<MyStreamOutput> pageViewPerMemberCounters = graph.createOutStream(output, new StringSerde("UTF-8"), new JsonSerde<>());
+
+ pageViewEvents.
+ partitionBy(m -> m.getMessage().memberId).
+ window(Windows.<PageViewEvent, String, Integer>keyedTumblingWindow(
+ msg -> msg.getMessage().memberId, Duration.ofMinutes(5), (m, c) -> c + 1)).
+ map(MyStreamOutput::new).
+ sendTo(pageViewPerMemberCounters);
+
+ }
+
+ // standalone local program model
+ public static void main(String[] args) throws Exception {
+ CommandLine cmdLine = new CommandLine();
+ Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
+ ExecutionEnvironment standaloneEnv = ExecutionEnvironment.getLocalEnvironment(config);
+ standaloneEnv.run(new RepartitionExample(), config);
+ }
+
+ StreamSpec input1 = new StreamSpec() {
+ @Override public SystemStream getSystemStream() {
+ return new SystemStream("kafka", "PageViewEvent");
+ }
+
+ @Override public Properties getProperties() {
+ return null;
+ }
+ };
+
+ StreamSpec output = new StreamSpec() {
+ @Override public SystemStream getSystemStream() {
+ return new SystemStream("kafka", "PageViewPerMember5min");
+ }
+
+ @Override public Properties getProperties() {
+ return null;
+ }
+ };
+
+ class PageViewEvent implements MessageEnvelope<String, PageViewEvent> {
+ String pageId;
+ String memberId;
+ long timestamp;
+
+ PageViewEvent(String pageId, String memberId, long timestamp) {
+ this.pageId = pageId;
+ this.memberId = memberId;
+ this.timestamp = timestamp;
+ }
+
+ @Override
+ public String getKey() {
+ return this.pageId;
+ }
+
+ @Override
+ public PageViewEvent getMessage() {
+ return this;
+ }
+ }
+
+ class MyStreamOutput implements MessageEnvelope<String, MyStreamOutput> {
+ String memberId;
+ long timestamp;
+ int count;
+
+ MyStreamOutput(WindowPane<String, Integer> m) {
+ this.memberId = m.getKey().getKey();
+ this.timestamp = Long.valueOf(m.getKey().getPaneId());
+ this.count = m.getMessage();
+ }
+
+ @Override
+ public String getKey() {
+ return this.memberId;
+ }
+
+ @Override
+ public MyStreamOutput getMessage() {
+ return this;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-core/src/test/java/org/apache/samza/example/TestBasicStreamGraphs.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/TestBasicStreamGraphs.java b/samza-core/src/test/java/org/apache/samza/example/TestBasicStreamGraphs.java
new file mode 100644
index 0000000..8ecd44f
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/example/TestBasicStreamGraphs.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.example;
+
+import java.lang.reflect.Field;
+import org.apache.samza.Partition;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.impl.OperatorGraph;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.task.StreamOperatorTask;
+import org.apache.samza.task.TaskContext;
+import org.junit.Test;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+
+/**
+ * Unit test for {@link StreamOperatorTask}
+ */
+public class TestBasicStreamGraphs {
+
+ private final Set<SystemStreamPartition> inputPartitions = new HashSet<SystemStreamPartition>() { {
+ for (int i = 0; i < 4; i++) {
+ this.add(new SystemStreamPartition("my-system", String.format("my-topic%d", i), new Partition(i)));
+ }
+ } };
+
+ @Test
+ public void testUserTask() throws Exception {
+ Config mockConfig = mock(Config.class);
+ TaskContext mockContext = mock(TaskContext.class);
+ when(mockContext.getSystemStreamPartitions()).thenReturn(this.inputPartitions);
+ TestWindowExample userTask = new TestWindowExample(this.inputPartitions);
+ StreamOperatorTask adaptorTask = new StreamOperatorTask(userTask);
+ Field pipelineMapFld = StreamOperatorTask.class.getDeclaredField("operatorGraph");
+ pipelineMapFld.setAccessible(true);
+ OperatorGraph opGraph = (OperatorGraph) pipelineMapFld.get(adaptorTask);
+
+ adaptorTask.init(mockConfig, mockContext);
+ this.inputPartitions.forEach(partition -> {
+ assertNotNull(opGraph.get(partition.getSystemStream()));
+ });
+ }
+
+ @Test
+ public void testSplitTask() throws Exception {
+ Config mockConfig = mock(Config.class);
+ TaskContext mockContext = mock(TaskContext.class);
+ when(mockContext.getSystemStreamPartitions()).thenReturn(this.inputPartitions);
+ TestBroadcastExample splitTask = new TestBroadcastExample(this.inputPartitions);
+ StreamOperatorTask adaptorTask = new StreamOperatorTask(splitTask);
+ Field pipelineMapFld = StreamOperatorTask.class.getDeclaredField("operatorGraph");
+ pipelineMapFld.setAccessible(true);
+ OperatorGraph opGraph = (OperatorGraph) pipelineMapFld.get(adaptorTask);
+
+ adaptorTask.init(mockConfig, mockContext);
+ this.inputPartitions.forEach(partition -> {
+ assertNotNull(opGraph.get(partition.getSystemStream()));
+ });
+ }
+
+ @Test
+ public void testJoinTask() throws Exception {
+ Config mockConfig = mock(Config.class);
+ TaskContext mockContext = mock(TaskContext.class);
+ when(mockContext.getSystemStreamPartitions()).thenReturn(this.inputPartitions);
+ TestJoinExample joinTask = new TestJoinExample(this.inputPartitions);
+ StreamOperatorTask adaptorTask = new StreamOperatorTask(joinTask);
+ Field pipelineMapFld = StreamOperatorTask.class.getDeclaredField("operatorGraph");
+ pipelineMapFld.setAccessible(true);
+ OperatorGraph opGraph = (OperatorGraph) pipelineMapFld.get(adaptorTask);
+
+ adaptorTask.init(mockConfig, mockContext);
+ this.inputPartitions.forEach(partition -> {
+ assertNotNull(opGraph.get(partition.getSystemStream()));
+ });
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-core/src/test/java/org/apache/samza/example/TestBroadcastExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/TestBroadcastExample.java b/samza-core/src/test/java/org/apache/samza/example/TestBroadcastExample.java
new file mode 100644
index 0000000..d22324b
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/example/TestBroadcastExample.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.example;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.StreamSpec;
+import org.apache.samza.operators.data.InputMessageEnvelope;
+import org.apache.samza.operators.data.JsonIncomingSystemMessageEnvelope;
+import org.apache.samza.operators.data.Offset;
+import org.apache.samza.operators.triggers.Triggers;
+import org.apache.samza.operators.windows.Windows;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
+
+import java.time.Duration;
+import java.util.function.BiFunction;
+import java.util.Properties;
+import java.util.Set;
+
+
+/**
+ * Example implementation of split stream tasks
+ *
+ */
+public class TestBroadcastExample extends TestExampleBase {
+
+ TestBroadcastExample(Set<SystemStreamPartition> inputs) {
+ super(inputs);
+ }
+
+ class MessageType {
+ String field1;
+ String field2;
+ String field3;
+ String field4;
+ String parKey;
+ private long timestamp;
+
+ public long getTimestamp() {
+ return this.timestamp;
+ }
+ }
+
+ class JsonMessageEnvelope extends JsonIncomingSystemMessageEnvelope<MessageType> {
+ JsonMessageEnvelope(String key, MessageType data, Offset offset, SystemStreamPartition partition) {
+ super(key, data, offset, partition);
+ }
+ }
+
+ @Override
+ public void init(StreamGraph graph, Config config) {
+ BiFunction<JsonMessageEnvelope, Integer, Integer> sumAggregator = (m, c) -> c + 1;
+ inputs.keySet().forEach(entry -> {
+ MessageStream<JsonMessageEnvelope> inputStream = graph.<Object, Object, InputMessageEnvelope>createInStream(new StreamSpec() {
+ @Override public SystemStream getSystemStream() {
+ return entry;
+ }
+
+ @Override public Properties getProperties() {
+ return null;
+ }
+ }, null, null).map(this::getInputMessage);
+
+ inputStream.filter(this::myFilter1).window(Windows.tumblingWindow(Duration.ofMillis(100), sumAggregator)
+ .setLateTrigger(Triggers.any(Triggers.count(30000), Triggers.timeSinceFirstMessage(Duration.ofMillis(10)))));
+
+ inputStream.filter(this::myFilter2).window(Windows.tumblingWindow(Duration.ofMillis(100), sumAggregator)
+ .setLateTrigger(Triggers.any(Triggers.count(30000), Triggers.timeSinceFirstMessage(Duration.ofMillis(10)))));
+
+ inputStream.filter(this::myFilter3).window(Windows.tumblingWindow(Duration.ofMillis(100), sumAggregator)
+ .setLateTrigger(Triggers.any(Triggers.count(30000), Triggers.timeSinceFirstMessage(Duration.ofMillis(10)))));
+
+ });
+ }
+
+ JsonMessageEnvelope getInputMessage(InputMessageEnvelope m1) {
+ return (JsonMessageEnvelope) m1.getMessage();
+ }
+
+ boolean myFilter1(JsonMessageEnvelope m1) {
+ // Do user defined processing here
+ return m1.getMessage().parKey.equals("key1");
+ }
+
+ boolean myFilter2(JsonMessageEnvelope m1) {
+ // Do user defined processing here
+ return m1.getMessage().parKey.equals("key2");
+ }
+
+ boolean myFilter3(JsonMessageEnvelope m1) {
+ return m1.getMessage().parKey.equals("key3");
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-core/src/test/java/org/apache/samza/example/TestExampleBase.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/TestExampleBase.java b/samza-core/src/test/java/org/apache/samza/example/TestExampleBase.java
new file mode 100644
index 0000000..c4df9d4
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/example/TestExampleBase.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.example;
+
+import org.apache.samza.operators.StreamGraphBuilder;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Base class for test examples
+ *
+ */
+public abstract class TestExampleBase implements StreamGraphBuilder {
+
+ protected final Map<SystemStream, Set<SystemStreamPartition>> inputs;
+
+ TestExampleBase(Set<SystemStreamPartition> inputs) {
+ this.inputs = new HashMap<>();
+ for (SystemStreamPartition input : inputs) {
+ this.inputs.putIfAbsent(input.getSystemStream(), new HashSet<>());
+ this.inputs.get(input.getSystemStream()).add(input);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-core/src/test/java/org/apache/samza/example/TestJoinExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/TestJoinExample.java b/samza-core/src/test/java/org/apache/samza/example/TestJoinExample.java
new file mode 100644
index 0000000..fe6e7e7
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/example/TestJoinExample.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.example;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.StreamSpec;
+import org.apache.samza.operators.data.InputMessageEnvelope;
+import org.apache.samza.operators.data.JsonIncomingSystemMessageEnvelope;
+import org.apache.samza.operators.data.Offset;
+import org.apache.samza.operators.functions.JoinFunction;
+import org.apache.samza.serializers.JsonSerde;
+import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+
+
+/**
+ * Example implementation of unique key-based stream-stream join tasks
+ *
+ */
+public class TestJoinExample extends TestExampleBase {
+
+ TestJoinExample(Set<SystemStreamPartition> inputs) {
+ super(inputs);
+ }
+
+ class MessageType {
+ String joinKey;
+ List<String> joinFields = new ArrayList<>();
+ }
+
+ class JsonMessageEnvelope extends JsonIncomingSystemMessageEnvelope<MessageType> {
+ JsonMessageEnvelope(String key, MessageType data, Offset offset, SystemStreamPartition partition) {
+ super(key, data, offset, partition);
+ }
+ }
+
+ MessageStream<JsonMessageEnvelope> joinOutput = null;
+
+ @Override
+ public void init(StreamGraph graph, Config config) {
+
+ for (SystemStream input : inputs.keySet()) {
+ MessageStream<JsonMessageEnvelope> newSource = graph.<Object, Object, InputMessageEnvelope>createInStream(
+ new StreamSpec() {
+ @Override public SystemStream getSystemStream() {
+ return input;
+ }
+
+ @Override public Properties getProperties() {
+ return null;
+ }
+ }, null, null).map(this::getInputMessage);
+ if (joinOutput == null) {
+ joinOutput = newSource;
+ } else {
+ joinOutput = joinOutput.join(newSource, new MyJoinFunction());
+ }
+ }
+
+ joinOutput.sendTo(graph.createOutStream(new StreamSpec() {
+ @Override public SystemStream getSystemStream() {
+ return null;
+ }
+
+ @Override public Properties getProperties() {
+ return null;
+ }
+ }, new StringSerde("UTF-8"), new JsonSerde<>()));
+
+ }
+
+ private JsonMessageEnvelope getInputMessage(InputMessageEnvelope ism) {
+ return new JsonMessageEnvelope(
+ ((MessageType) ism.getMessage()).joinKey,
+ (MessageType) ism.getMessage(),
+ ism.getOffset(),
+ ism.getSystemStreamPartition());
+ }
+
+ class MyJoinFunction implements JoinFunction<String, JsonMessageEnvelope, JsonMessageEnvelope, JsonMessageEnvelope> {
+ JsonMessageEnvelope myJoinResult(JsonMessageEnvelope m1, JsonMessageEnvelope m2) {
+ MessageType newJoinMsg = new MessageType();
+ newJoinMsg.joinKey = m1.getKey();
+ newJoinMsg.joinFields.addAll(m1.getMessage().joinFields);
+ newJoinMsg.joinFields.addAll(m2.getMessage().joinFields);
+ return new JsonMessageEnvelope(m1.getMessage().joinKey, newJoinMsg, null, null);
+ }
+
+ @Override
+ public JsonMessageEnvelope apply(JsonMessageEnvelope message, JsonMessageEnvelope otherMessage) {
+ return this.myJoinResult(message, otherMessage);
+ }
+
+ @Override
+ public String getFirstKey(JsonMessageEnvelope message) {
+ return message.getKey();
+ }
+
+ @Override
+ public String getSecondKey(JsonMessageEnvelope message) {
+ return message.getKey();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-core/src/test/java/org/apache/samza/example/TestWindowExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/TestWindowExample.java b/samza-core/src/test/java/org/apache/samza/example/TestWindowExample.java
new file mode 100644
index 0000000..e08ca20
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/example/TestWindowExample.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.example;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.StreamSpec;
+import org.apache.samza.operators.data.InputMessageEnvelope;
+import org.apache.samza.operators.data.JsonIncomingSystemMessageEnvelope;
+import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.operators.data.Offset;
+import org.apache.samza.operators.windows.Windows;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
+
+import java.time.Duration;
+import java.util.function.BiFunction;
+import java.util.Properties;
+import java.util.Set;
+
+
+/**
+ * Example implementation of a simple user-defined tasks w/ window operators
+ *
+ */
+public class TestWindowExample extends TestExampleBase {
+ class MessageType {
+ String field1;
+ String field2;
+ }
+
+ TestWindowExample(Set<SystemStreamPartition> inputs) {
+ super(inputs);
+ }
+
+ class JsonMessageEnvelope extends JsonIncomingSystemMessageEnvelope<MessageType> {
+
+ JsonMessageEnvelope(String key, MessageType data, Offset offset, SystemStreamPartition partition) {
+ super(key, data, offset, partition);
+ }
+ }
+
+ @Override
+ public void init(StreamGraph graph, Config config) {
+ BiFunction<JsonMessageEnvelope, Integer, Integer> maxAggregator = (m, c) -> c + 1;
+ inputs.keySet().forEach(source -> graph.<Object, Object, InputMessageEnvelope>createInStream(new StreamSpec() {
+ @Override public SystemStream getSystemStream() {
+ return source;
+ }
+
+ @Override public Properties getProperties() {
+ return null;
+ }
+ }, null, null).
+ map(m1 -> new JsonMessageEnvelope(this.myMessageKeyFunction(m1), (MessageType) m1.getMessage(), m1.getOffset(),
+ m1.getSystemStreamPartition())).window(Windows.tumblingWindow(Duration.ofMillis(200), maxAggregator)));
+
+ }
+
+ String myMessageKeyFunction(MessageEnvelope<Object, Object> m) {
+ return m.getKey().toString();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java b/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
new file mode 100644
index 0000000..160a47a
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators;
+
+import org.apache.samza.operators.functions.FilterFunction;
+import org.apache.samza.operators.functions.FlatMapFunction;
+import org.apache.samza.operators.functions.JoinFunction;
+import org.apache.samza.operators.functions.MapFunction;
+import org.apache.samza.operators.functions.SinkFunction;
+import org.apache.samza.operators.spec.OperatorSpec;
+import org.apache.samza.operators.spec.PartialJoinOperatorSpec;
+import org.apache.samza.operators.spec.SinkOperatorSpec;
+import org.apache.samza.operators.spec.StreamOperatorSpec;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskCoordinator;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+public class TestMessageStreamImpl {
+
+ private StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+
+ @Test
+ public void testMap() {
+ MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph);
+ MapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> xMap = (TestMessageEnvelope m) ->
+ new TestOutputMessageEnvelope(m.getKey(), m.getMessage().getValue().length() + 1);
+ MessageStream<TestOutputMessageEnvelope> outputStream = inputStream.map(xMap);
+ Collection<OperatorSpec> subs = inputStream.getRegisteredOperatorSpecs();
+ assertEquals(subs.size(), 1);
+ OperatorSpec<TestOutputMessageEnvelope> mapOp = subs.iterator().next();
+ assertTrue(mapOp instanceof StreamOperatorSpec);
+ assertEquals(mapOp.getNextStream(), outputStream);
+ // assert that the transformation function is what we defined above
+ TestMessageEnvelope xTestMsg = mock(TestMessageEnvelope.class);
+ TestMessageEnvelope.MessageType mockInnerTestMessage = mock(TestMessageEnvelope.MessageType.class);
+ when(xTestMsg.getKey()).thenReturn("test-msg-key");
+ when(xTestMsg.getMessage()).thenReturn(mockInnerTestMessage);
+ when(mockInnerTestMessage.getValue()).thenReturn("123456789");
+
+ Collection<TestOutputMessageEnvelope> cOutputMsg = ((StreamOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope>) mapOp).getTransformFn().apply(xTestMsg);
+ assertEquals(cOutputMsg.size(), 1);
+ TestOutputMessageEnvelope outputMessage = cOutputMsg.iterator().next();
+ assertEquals(outputMessage.getKey(), xTestMsg.getKey());
+ assertEquals(outputMessage.getMessage(), Integer.valueOf(xTestMsg.getMessage().getValue().length() + 1));
+ }
+
+ @Test
+ public void testFlatMap() {
+ MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph);
+ Set<TestOutputMessageEnvelope> flatOuts = new HashSet<TestOutputMessageEnvelope>() { {
+ this.add(mock(TestOutputMessageEnvelope.class));
+ this.add(mock(TestOutputMessageEnvelope.class));
+ this.add(mock(TestOutputMessageEnvelope.class));
+ } };
+ FlatMapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> xFlatMap = (TestMessageEnvelope message) -> flatOuts;
+ MessageStream<TestOutputMessageEnvelope> outputStream = inputStream.flatMap(xFlatMap);
+ Collection<OperatorSpec> subs = inputStream.getRegisteredOperatorSpecs();
+ assertEquals(subs.size(), 1);
+ OperatorSpec<TestOutputMessageEnvelope> flatMapOp = subs.iterator().next();
+ assertTrue(flatMapOp instanceof StreamOperatorSpec);
+ assertEquals(flatMapOp.getNextStream(), outputStream);
+ // assert that the transformation function is what we defined above
+ assertEquals(((StreamOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope>) flatMapOp).getTransformFn(), xFlatMap);
+ }
+
+ @Test
+ public void testFilter() {
+ MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph);
+ FilterFunction<TestMessageEnvelope> xFilter = (TestMessageEnvelope m) -> m.getMessage().getEventTime() > 123456L;
+ MessageStream<TestMessageEnvelope> outputStream = inputStream.filter(xFilter);
+ Collection<OperatorSpec> subs = inputStream.getRegisteredOperatorSpecs();
+ assertEquals(subs.size(), 1);
+ OperatorSpec<TestMessageEnvelope> filterOp = subs.iterator().next();
+ assertTrue(filterOp instanceof StreamOperatorSpec);
+ assertEquals(filterOp.getNextStream(), outputStream);
+ // assert that the transformation function is what we defined above
+ FlatMapFunction<TestMessageEnvelope, TestMessageEnvelope> txfmFn = ((StreamOperatorSpec<TestMessageEnvelope, TestMessageEnvelope>) filterOp).getTransformFn();
+ TestMessageEnvelope mockMsg = mock(TestMessageEnvelope.class);
+ TestMessageEnvelope.MessageType mockInnerTestMessage = mock(TestMessageEnvelope.MessageType.class);
+ when(mockMsg.getMessage()).thenReturn(mockInnerTestMessage);
+ when(mockInnerTestMessage.getEventTime()).thenReturn(11111L);
+ Collection<TestMessageEnvelope> output = txfmFn.apply(mockMsg);
+ assertTrue(output.isEmpty());
+ when(mockMsg.getMessage()).thenReturn(mockInnerTestMessage);
+ when(mockInnerTestMessage.getEventTime()).thenReturn(999999L);
+ output = txfmFn.apply(mockMsg);
+ assertEquals(output.size(), 1);
+ assertEquals(output.iterator().next(), mockMsg);
+ }
+
+ @Test
+ public void testSink() {
+ MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph);
+ SinkFunction<TestMessageEnvelope> xSink = (TestMessageEnvelope m, MessageCollector mc, TaskCoordinator tc) -> {
+ mc.send(new OutgoingMessageEnvelope(new SystemStream("test-sys", "test-stream"), m.getMessage()));
+ tc.commit(TaskCoordinator.RequestScope.CURRENT_TASK);
+ };
+ inputStream.sink(xSink);
+ Collection<OperatorSpec> subs = inputStream.getRegisteredOperatorSpecs();
+ assertEquals(subs.size(), 1);
+ OperatorSpec<TestMessageEnvelope> sinkOp = subs.iterator().next();
+ assertTrue(sinkOp instanceof SinkOperatorSpec);
+ assertEquals(((SinkOperatorSpec) sinkOp).getSinkFn(), xSink);
+ assertNull(((SinkOperatorSpec) sinkOp).getNextStream());
+ }
+
+ @Test
+ public void testJoin() {
+ MessageStreamImpl<TestMessageEnvelope> source1 = new MessageStreamImpl<>(mockGraph);
+ MessageStreamImpl<TestMessageEnvelope> source2 = new MessageStreamImpl<>(mockGraph);
+ JoinFunction<String, TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope> joiner =
+ new JoinFunction<String, TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope>() {
+ @Override
+ public TestOutputMessageEnvelope apply(TestMessageEnvelope m1, TestMessageEnvelope m2) {
+ return new TestOutputMessageEnvelope(m1.getKey(), m1.getMessage().getValue().length() + m2.getMessage().getValue().length());
+ }
+
+ @Override
+ public String getFirstKey(TestMessageEnvelope message) {
+ return message.getKey();
+ }
+
+ @Override
+ public String getSecondKey(TestMessageEnvelope message) {
+ return message.getKey();
+ }
+ };
+
+ MessageStream<TestOutputMessageEnvelope> joinOutput = source1.join(source2, joiner);
+ Collection<OperatorSpec> subs = source1.getRegisteredOperatorSpecs();
+ assertEquals(subs.size(), 1);
+ OperatorSpec<TestMessageEnvelope> joinOp1 = subs.iterator().next();
+ assertTrue(joinOp1 instanceof PartialJoinOperatorSpec);
+ assertEquals(((PartialJoinOperatorSpec) joinOp1).getNextStream(), joinOutput);
+ subs = source2.getRegisteredOperatorSpecs();
+ assertEquals(subs.size(), 1);
+ OperatorSpec<TestMessageEnvelope> joinOp2 = subs.iterator().next();
+ assertTrue(joinOp2 instanceof PartialJoinOperatorSpec);
+ assertEquals(((PartialJoinOperatorSpec) joinOp2).getNextStream(), joinOutput);
+ TestMessageEnvelope joinMsg1 = new TestMessageEnvelope("test-join-1", "join-msg-001", 11111L);
+ TestMessageEnvelope joinMsg2 = new TestMessageEnvelope("test-join-2", "join-msg-002", 22222L);
+ TestOutputMessageEnvelope xOut = (TestOutputMessageEnvelope) ((PartialJoinOperatorSpec) joinOp1).getTransformFn().apply(joinMsg1, joinMsg2);
+ assertEquals(xOut.getKey(), "test-join-1");
+ assertEquals(xOut.getMessage(), Integer.valueOf(24));
+ xOut = (TestOutputMessageEnvelope) ((PartialJoinOperatorSpec) joinOp2).getTransformFn().apply(joinMsg2, joinMsg1);
+ assertEquals(xOut.getKey(), "test-join-1");
+ assertEquals(xOut.getMessage(), Integer.valueOf(24));
+ }
+
+ @Test
+ public void testMerge() {
+ MessageStream<TestMessageEnvelope> merge1 = new MessageStreamImpl<>(mockGraph);
+ Collection<MessageStream<TestMessageEnvelope>> others = new ArrayList<MessageStream<TestMessageEnvelope>>() { {
+ this.add(new MessageStreamImpl<>(mockGraph));
+ this.add(new MessageStreamImpl<>(mockGraph));
+ } };
+ MessageStream<TestMessageEnvelope> mergeOutput = merge1.merge(others);
+ validateMergeOperator(merge1, mergeOutput);
+
+ others.forEach(merge -> validateMergeOperator(merge, mergeOutput));
+ }
+
+ private void validateMergeOperator(MessageStream<TestMessageEnvelope> mergeSource, MessageStream<TestMessageEnvelope> mergeOutput) {
+ Collection<OperatorSpec> subs = ((MessageStreamImpl<TestMessageEnvelope>) mergeSource).getRegisteredOperatorSpecs();
+ assertEquals(subs.size(), 1);
+ OperatorSpec<TestMessageEnvelope> mergeOp = subs.iterator().next();
+ assertTrue(mergeOp instanceof StreamOperatorSpec);
+ assertEquals(((StreamOperatorSpec) mergeOp).getNextStream(), mergeOutput);
+ TestMessageEnvelope mockMsg = mock(TestMessageEnvelope.class);
+ Collection<TestMessageEnvelope> outputs = ((StreamOperatorSpec<TestMessageEnvelope, TestMessageEnvelope>) mergeOp).getTransformFn().apply(mockMsg);
+ assertEquals(outputs.size(), 1);
+ assertEquals(outputs.iterator().next(), mockMsg);
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImplUtil.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImplUtil.java b/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImplUtil.java
new file mode 100644
index 0000000..c4e9f51
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImplUtil.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators;
+
+
+public class TestMessageStreamImplUtil {
+ public static <M> MessageStreamImpl<M> getMessageStreamImpl(StreamGraphImpl graph) {
+ return new MessageStreamImpl<M>(graph);
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-core/src/test/java/org/apache/samza/operators/data/JsonIncomingSystemMessageEnvelope.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/data/JsonIncomingSystemMessageEnvelope.java b/samza-core/src/test/java/org/apache/samza/operators/data/JsonIncomingSystemMessageEnvelope.java
new file mode 100644
index 0000000..9a425d1
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/operators/data/JsonIncomingSystemMessageEnvelope.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.data;
+
+import org.apache.samza.system.SystemStreamPartition;
+
+
+/**
+ * Example input {@link MessageEnvelope} w/ Json message and string as the key.
+ */
+
+public class JsonIncomingSystemMessageEnvelope<T> implements MessageEnvelope<String, T> {
+
+ private final String key;
+ private final T data;
+ private final Offset offset;
+ private final SystemStreamPartition partition;
+
+ public JsonIncomingSystemMessageEnvelope(String key, T data, Offset offset, SystemStreamPartition partition) {
+ this.key = key;
+ this.data = data;
+ this.offset = offset;
+ this.partition = partition;
+ }
+
+ @Override
+ public T getMessage() {
+ return this.data;
+ }
+
+ @Override
+ public String getKey() {
+ return this.key;
+ }
+
+ public Offset getOffset() {
+ return this.offset;
+ }
+
+ public SystemStreamPartition getSystemStreamPartition() {
+ return this.partition;
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
new file mode 100644
index 0000000..361972e
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl;
+
+import org.apache.samza.operators.TestMessageEnvelope;
+import org.apache.samza.operators.TestOutputMessageEnvelope;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskCoordinator;
+import org.hamcrest.core.IsEqual;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.argThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+
+public class TestOperatorImpl {
+
+ TestMessageEnvelope curInputMsg;
+ MessageCollector curCollector;
+ TaskCoordinator curCoordinator;
+
+ @Test
+ public void testSubscribers() {
+ this.curInputMsg = null;
+ this.curCollector = null;
+ this.curCoordinator = null;
+ OperatorImpl<TestMessageEnvelope, TestOutputMessageEnvelope> opImpl = new OperatorImpl<TestMessageEnvelope, TestOutputMessageEnvelope>() {
+ @Override
+ public void onNext(TestMessageEnvelope message, MessageCollector collector, TaskCoordinator coordinator) {
+ TestOperatorImpl.this.curInputMsg = message;
+ TestOperatorImpl.this.curCollector = collector;
+ TestOperatorImpl.this.curCoordinator = coordinator;
+ }
+ };
+ // verify registerNextOperator() added the mockSub and propagateResult() invoked the mockSub.onNext()
+ OperatorImpl mockSub = mock(OperatorImpl.class);
+ opImpl.registerNextOperator(mockSub);
+ TestOutputMessageEnvelope xOutput = mock(TestOutputMessageEnvelope.class);
+ MessageCollector mockCollector = mock(MessageCollector.class);
+ TaskCoordinator mockCoordinator = mock(TaskCoordinator.class);
+ opImpl.propagateResult(xOutput, mockCollector, mockCoordinator);
+ verify(mockSub, times(1)).onNext(
+ argThat(new IsEqual<>(xOutput)),
+ argThat(new IsEqual<>(mockCollector)),
+ argThat(new IsEqual<>(mockCoordinator))
+ );
+ // verify onNext() is invoked correctly
+ TestMessageEnvelope mockInput = mock(TestMessageEnvelope.class);
+ opImpl.onNext(mockInput, mockCollector, mockCoordinator);
+ assertEquals(mockInput, this.curInputMsg);
+ assertEquals(mockCollector, this.curCollector);
+ assertEquals(mockCoordinator, this.curCoordinator);
+ }
+}