You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2017/02/16 18:40:53 UTC
[10/14] samza git commit: SAMZA-1073: top-level fluent API
http://git-wip-us.apache.org/repos/asf/samza/blob/c249443b/samza-operator/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java
index abed03f..41d1778 100644
--- a/samza-operator/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java
+++ b/samza-operator/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java
@@ -18,21 +18,22 @@
*/
package org.apache.samza.operators.impl;
-import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.config.Config;
import org.apache.samza.operators.functions.SinkFunction;
import org.apache.samza.operators.spec.SinkOperatorSpec;
import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;
/**
* Implementation for {@link SinkOperatorSpec}
*/
-class SinkOperatorImpl<M extends MessageEnvelope> extends OperatorImpl<M, MessageEnvelope> {
+class SinkOperatorImpl<M> extends OperatorImpl<M, M> {
private final SinkFunction<M> sinkFn;
- SinkOperatorImpl(SinkOperatorSpec<M> sinkOp) {
+ SinkOperatorImpl(SinkOperatorSpec<M> sinkOp, Config config, TaskContext context) {
this.sinkFn = sinkOp.getSinkFn();
}
http://git-wip-us.apache.org/repos/asf/samza/blob/c249443b/samza-operator/src/main/java/org/apache/samza/operators/impl/StreamOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/StreamOperatorImpl.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/StreamOperatorImpl.java
index 3a5c56e..644de20 100644
--- a/samza-operator/src/main/java/org/apache/samza/operators/impl/StreamOperatorImpl.java
+++ b/samza-operator/src/main/java/org/apache/samza/operators/impl/StreamOperatorImpl.java
@@ -18,24 +18,26 @@
*/
package org.apache.samza.operators.impl;
-import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.MessageStreamImpl;
import org.apache.samza.operators.functions.FlatMapFunction;
import org.apache.samza.operators.spec.StreamOperatorSpec;
import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;
/**
- * A StreamOperator that accepts a 1:n transform function and applies it to each incoming {@link MessageEnvelope}.
+ * A StreamOperator that accepts a 1:n transform function and applies it to each incoming message.
*
- * @param <M> type of {@link MessageEnvelope} in the input stream
- * @param <RM> type of {@link MessageEnvelope} in the output stream
+ * @param <M> type of message in the input stream
+ * @param <RM> type of message in the output stream
*/
-class StreamOperatorImpl<M extends MessageEnvelope, RM extends MessageEnvelope> extends OperatorImpl<M, RM> {
+class StreamOperatorImpl<M, RM> extends OperatorImpl<M, RM> {
private final FlatMapFunction<M, RM> transformFn;
- StreamOperatorImpl(StreamOperatorSpec<M, RM> streamOperatorSpec) {
+ StreamOperatorImpl(StreamOperatorSpec<M, RM> streamOperatorSpec, MessageStreamImpl<M> source, Config config, TaskContext context) {
this.transformFn = streamOperatorSpec.getTransformFn();
}
http://git-wip-us.apache.org/repos/asf/samza/blob/c249443b/samza-operator/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
index a5b71a7..af00553 100644
--- a/samza-operator/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
+++ b/samza-operator/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
@@ -18,18 +18,21 @@
*/
package org.apache.samza.operators.impl;
-import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.MessageStreamImpl;
import org.apache.samza.operators.spec.WindowOperatorSpec;
import org.apache.samza.operators.windows.WindowPane;
import org.apache.samza.operators.windows.internal.WindowInternal;
import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;
-public class WindowOperatorImpl<M extends MessageEnvelope, K, WK, WV, WM extends WindowPane<WK, WV>> extends OperatorImpl<M, WM> {
+public class WindowOperatorImpl<M, WK, WV> extends OperatorImpl<M, WindowPane<WK, WV>> {
- private final WindowInternal<M, K, WV> window;
+ private final WindowInternal<M, WK, WV> window;
- public WindowOperatorImpl(WindowOperatorSpec spec) {
+ public WindowOperatorImpl(WindowOperatorSpec spec, MessageStreamImpl<M> source, Config config, TaskContext context) {
+ // TODO: source, config, and context are intended to initialize the window kv-store; they are not used yet
window = spec.getWindow();
}
http://git-wip-us.apache.org/repos/asf/samza/blob/c249443b/samza-operator/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java b/samza-operator/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
index 8b75cdc..1444662 100644
--- a/samza-operator/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
+++ b/samza-operator/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
@@ -18,20 +18,45 @@
*/
package org.apache.samza.operators.spec;
-import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.MessageStreamImpl;
+import org.apache.samza.task.TaskContext;
/**
* A stateless serializable stream operator specification that holds all the information required
- * to transform the input {@link MessageStream} and produce the output {@link MessageStream}.
+ * to transform the input {@link MessageStreamImpl} and produce the output {@link MessageStreamImpl}.
+ *
+ * @param <OM> the type of output message from the operator
*/
-public interface OperatorSpec<OM extends MessageEnvelope> {
+@InterfaceStability.Unstable
+public interface OperatorSpec<OM> {
+
+ enum OpCode {
+ MAP,
+ FLAT_MAP,
+ FILTER,
+ SINK,
+ SEND_TO,
+ JOIN,
+ WINDOW,
+ MERGE,
+ PARTITION_BY
+ }
+
/**
- * Get the output stream containing transformed {@link MessageEnvelope} produced by this operator.
- * @return the output stream containing transformed {@link MessageEnvelope} produced by this operator.
+ * Get the output stream containing transformed messages produced by this operator.
+ * @return the output stream containing transformed messages produced by this operator.
*/
- MessageStream<OM> getOutputStream();
+ MessageStreamImpl<OM> getNextStream();
+ /**
+ * Init method to initialize the context for this {@link OperatorSpec}. The default implementation is NO-OP.
+ *
+ * @param config the {@link Config} object for this task
+ * @param context the {@link TaskContext} object for this task
+ */
+ default void init(Config config, TaskContext context) { }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/c249443b/samza-operator/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java b/samza-operator/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
index fc25929..d626852 100644
--- a/samza-operator/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
+++ b/samza-operator/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
@@ -19,16 +19,21 @@
package org.apache.samza.operators.spec;
-import org.apache.samza.operators.data.MessageEnvelope;
+import java.util.Collection;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.MessageStreamImpl;
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.StreamGraphImpl;
+import org.apache.samza.operators.functions.FilterFunction;
import org.apache.samza.operators.functions.FlatMapFunction;
+import org.apache.samza.operators.functions.MapFunction;
+import org.apache.samza.operators.functions.PartialJoinFunction;
import org.apache.samza.operators.functions.SinkFunction;
-import org.apache.samza.operators.MessageStreamImpl;
import org.apache.samza.operators.windows.WindowPane;
import org.apache.samza.operators.windows.internal.WindowInternal;
import java.util.ArrayList;
-import java.util.UUID;
-import java.util.function.BiFunction;
+import org.apache.samza.task.TaskContext;
/**
@@ -38,80 +43,168 @@ public class OperatorSpecs {
private OperatorSpecs() {}
- private static String getOperatorId() {
- // TODO: need to change the IDs to be a consistent, durable IDs that can be recovered across container and job restarts
- return UUID.randomUUID().toString();
+ /**
+ * Creates a {@link StreamOperatorSpec} for {@link MapFunction}
+ *
+ * @param mapFn the map function
+ * @param graph the {@link StreamGraphImpl} object
+ * @param output the output {@link MessageStreamImpl} object
+ * @param <M> type of input message
+ * @param <OM> type of output message
+ * @return the {@link StreamOperatorSpec}
+ */
+ public static <M, OM> StreamOperatorSpec<M, OM> createMapOperatorSpec(MapFunction<M, OM> mapFn, StreamGraphImpl graph, MessageStreamImpl<OM> output) {
+ return new StreamOperatorSpec<>(new FlatMapFunction<M, OM>() {
+ @Override
+ public Collection<OM> apply(M message) {
+ return new ArrayList<OM>() {
+ {
+ OM r = mapFn.apply(message);
+ if (r != null) {
+ this.add(r);
+ }
+ }
+ };
+ }
+
+ @Override
+ public void init(Config config, TaskContext context) {
+ mapFn.init(config, context);
+ }
+ }, output, OperatorSpec.OpCode.MAP, graph.getNextOpId());
+ }
+
+ /**
+ * Creates a {@link StreamOperatorSpec} for {@link FilterFunction}
+ *
+ * @param filterFn the filter function deciding which messages are kept
+ * @param graph the {@link StreamGraphImpl} object
+ * @param output the output {@link MessageStreamImpl} object
+ * @param <M> type of input message
+ * @return the {@link StreamOperatorSpec}
+ */
+ public static <M> StreamOperatorSpec<M, M> createFilterOperatorSpec(FilterFunction<M> filterFn, StreamGraphImpl graph, MessageStreamImpl<M> output) {
+ return new StreamOperatorSpec<>(new FlatMapFunction<M, M>() {
+ @Override
+ public Collection<M> apply(M message) {
+ return new ArrayList<M>() {
+ {
+ if (filterFn.apply(message)) {
+ this.add(message);
+ }
+ }
+ };
+ }
+
+ @Override
+ public void init(Config config, TaskContext context) {
+ filterFn.init(config, context);
+ }
+ }, output, OperatorSpec.OpCode.FILTER, graph.getNextOpId());
}
/**
* Creates a {@link StreamOperatorSpec}.
*
* @param transformFn the transformation function
- * @param <M> type of input {@link MessageEnvelope}
- * @param <OM> type of output {@link MessageEnvelope}
+ * @param graph the {@link StreamGraphImpl} object
+ * @param output the output {@link MessageStreamImpl} object
+ * @param <M> type of input message
+ * @param <OM> type of output message
* @return the {@link StreamOperatorSpec}
*/
- public static <M extends MessageEnvelope, OM extends MessageEnvelope> StreamOperatorSpec<M, OM> createStreamOperatorSpec(
- FlatMapFunction<M, OM> transformFn) {
- return new StreamOperatorSpec<>(transformFn);
+ public static <M, OM> StreamOperatorSpec<M, OM> createStreamOperatorSpec(
+ FlatMapFunction<M, OM> transformFn, StreamGraphImpl graph, MessageStreamImpl<OM> output) {
+ return new StreamOperatorSpec<>(transformFn, output, OperatorSpec.OpCode.FLAT_MAP, graph.getNextOpId());
+ }
+
+ /**
+ * Creates a {@link SinkOperatorSpec}.
+ *
+ * @param sinkFn the sink function
+ * @param <M> type of input message
+ * @param graph the {@link StreamGraphImpl} object
+ * @return the {@link SinkOperatorSpec}
+ */
+ public static <M> SinkOperatorSpec<M> createSinkOperatorSpec(SinkFunction<M> sinkFn, StreamGraphImpl graph) {
+ return new SinkOperatorSpec<>(sinkFn, OperatorSpec.OpCode.SINK, graph.getNextOpId());
+ }
+
+ /**
+ * Creates a {@link SinkOperatorSpec}.
+ *
+ * @param sinkFn the sink function
+ * @param graph the {@link StreamGraphImpl} object
+ * @param stream the {@link OutputStream} where the message is sent to
+ * @param <M> type of input message
+ * @return the {@link SinkOperatorSpec}
+ */
+ public static <M> SinkOperatorSpec<M> createSendToOperatorSpec(SinkFunction<M> sinkFn, StreamGraphImpl graph, OutputStream<M> stream) {
+ return new SinkOperatorSpec<>(sinkFn, OperatorSpec.OpCode.SEND_TO, graph.getNextOpId(), stream);
}
/**
* Creates a {@link SinkOperatorSpec}.
*
* @param sinkFn the sink function
- * @param <M> type of input {@link MessageEnvelope}
+ * @param graph the {@link StreamGraphImpl} object
+ * @param stream the {@link OutputStream} where the message is sent to
+ * @param <M> type of input message
* @return the {@link SinkOperatorSpec}
*/
- public static <M extends MessageEnvelope> SinkOperatorSpec<M> createSinkOperatorSpec(SinkFunction<M> sinkFn) {
- return new SinkOperatorSpec<>(sinkFn);
+ public static <M> SinkOperatorSpec<M> createPartitionOperatorSpec(SinkFunction<M> sinkFn, StreamGraphImpl graph, OutputStream<M> stream) {
+ return new SinkOperatorSpec<>(sinkFn, OperatorSpec.OpCode.PARTITION_BY, graph.getNextOpId(), stream);
}
/**
* Creates a {@link WindowOperatorSpec}.
*
* @param window the description of the window.
- * @param <M> the type of input {@link MessageEnvelope}
- * @param <K> the type of key in the {@link MessageEnvelope} in this {@link org.apache.samza.operators.MessageStream}. If a key is specified,
- * results are emitted per-key
+ * @param graph the {@link StreamGraphImpl} object
+ * @param wndOutput the window output {@link MessageStreamImpl} object
+ * @param <M> the type of input message
* @param <WK> the type of key in the {@link WindowPane}
* @param <WV> the type of value in the window
- * @param <WM> the type of output {@link WindowPane}
* @return the {@link WindowOperatorSpec}
*/
- public static <M extends MessageEnvelope, K, WK, WV, WM extends WindowPane<WK, WV>> WindowOperatorSpec<M, K, WK, WV, WM> createWindowOperatorSpec(WindowInternal<M, K, WV> window) {
- return new WindowOperatorSpec<>(window, OperatorSpecs.getOperatorId());
+ public static <M, WK, WV> WindowOperatorSpec<M, WK, WV> createWindowOperatorSpec(
+ WindowInternal<M, WK, WV> window, StreamGraphImpl graph, MessageStreamImpl<WindowPane<WK, WV>> wndOutput) {
+ return new WindowOperatorSpec<>(window, wndOutput, graph.getNextOpId());
}
/**
* Creates a {@link PartialJoinOperatorSpec}.
*
* @param partialJoinFn the join function
+ * @param graph the {@link StreamGraphImpl} object
* @param joinOutput the output {@link MessageStreamImpl}
- * @param <M> type of input {@link MessageEnvelope}
+ * @param <M> type of input message
* @param <K> type of join key
- * @param <JM> the type of {@link MessageEnvelope} in the other join stream
- * @param <OM> the type of {@link MessageEnvelope} in the join output
+ * @param <JM> the type of message in the other join stream
+ * @param <OM> the type of message in the join output
* @return the {@link PartialJoinOperatorSpec}
*/
- public static <M extends MessageEnvelope<K, ?>, K, JM extends MessageEnvelope<K, ?>, OM extends MessageEnvelope> PartialJoinOperatorSpec<M, K, JM, OM> createPartialJoinOperatorSpec(
- BiFunction<M, JM, OM> partialJoinFn, MessageStreamImpl<OM> joinOutput) {
- return new PartialJoinOperatorSpec<>(partialJoinFn, joinOutput, OperatorSpecs.getOperatorId());
+ public static <M, K, JM, OM> PartialJoinOperatorSpec<M, K, JM, OM> createPartialJoinOperatorSpec(
+ PartialJoinFunction<K, M, JM, OM> partialJoinFn, StreamGraphImpl graph, MessageStreamImpl<OM> joinOutput) {
+ return new PartialJoinOperatorSpec<>(partialJoinFn, joinOutput, graph.getNextOpId());
}
/**
* Creates a {@link StreamOperatorSpec} with a merger function.
*
+ * @param graph the {@link StreamGraphImpl} object
* @param mergeOutput the output {@link MessageStreamImpl} from the merger
- * @param <M> the type of input {@link MessageEnvelope}
+ * @param <M> the type of input message
* @return the {@link StreamOperatorSpec} for the merge
*/
- public static <M extends MessageEnvelope> StreamOperatorSpec<M, M> createMergeOperatorSpec(MessageStreamImpl<M> mergeOutput) {
- return new StreamOperatorSpec<M, M>(t ->
- new ArrayList<M>() { {
- this.add(t);
- } },
- mergeOutput);
+ public static <M> StreamOperatorSpec<M, M> createMergeOperatorSpec(StreamGraphImpl graph, MessageStreamImpl<M> mergeOutput) {
+ return new StreamOperatorSpec<M, M>(message ->
+ new ArrayList<M>() {
+ {
+ this.add(message);
+ }
+ },
+ mergeOutput, OperatorSpec.OpCode.MERGE, graph.getNextOpId());
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/c249443b/samza-operator/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java b/samza-operator/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java
index e6d77f6..e057c2b 100644
--- a/samza-operator/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java
+++ b/samza-operator/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java
@@ -18,63 +18,69 @@
*/
package org.apache.samza.operators.spec;
-import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.config.Config;
import org.apache.samza.operators.MessageStreamImpl;
-
-import java.util.function.BiFunction;
+import org.apache.samza.operators.functions.PartialJoinFunction;
+import org.apache.samza.task.TaskContext;
/**
- * Spec for the partial join operator that takes {@link MessageEnvelope}s from one input stream, joins with buffered
- * {@link MessageEnvelope}s from another stream, and produces join results to an output {@link MessageStreamImpl}.
+ * Spec for the partial join operator that takes messages from one input stream, joins with buffered
+ * messages from another stream, and produces join results to an output {@link MessageStreamImpl}.
*
- * @param <M> the type of input {@link MessageEnvelope}
+ * @param <M> the type of input message
* @param <K> the type of join key
- * @param <JM> the type of {@link MessageEnvelope} in the other join stream
- * @param <RM> the type of {@link MessageEnvelope} in the join output stream
+ * @param <JM> the type of message in the other join stream
+ * @param <RM> the type of message in the join output stream
*/
-public class PartialJoinOperatorSpec<M extends MessageEnvelope<K, ?>, K, JM extends MessageEnvelope<K, ?>, RM extends MessageEnvelope>
- implements OperatorSpec<RM> {
+public class PartialJoinOperatorSpec<M, K, JM, RM> implements OperatorSpec<RM> {
private final MessageStreamImpl<RM> joinOutput;
/**
- * The transformation function of {@link PartialJoinOperatorSpec} that takes an input {@link MessageEnvelope} of
- * type {@code M}, joins with a stream of buffered {@link MessageEnvelope}s of type {@code JM} from another stream,
- * and generates a joined result {@link MessageEnvelope} of type {@code RM}.
+ * The transformation function of {@link PartialJoinOperatorSpec} that takes an input message of
+ * type {@code M}, joins with a stream of buffered messages of type {@code JM} from another stream,
+ * and generates a joined result message of type {@code RM}.
*/
- private final BiFunction<M, JM, RM> transformFn;
+ private final PartialJoinFunction<K, M, JM, RM> transformFn;
/**
* The unique ID for this operator.
*/
- private final String operatorId;
+ private final int opId;
/**
* Default constructor for a {@link PartialJoinOperatorSpec}.
*
- * @param partialJoinFn partial join function that take type {@code M} of input {@link MessageEnvelope} and join
- * w/ type {@code JM} of buffered {@link MessageEnvelope} from another stream
+ * @param partialJoinFn partial join function that take type {@code M} of input message and join
+ * w/ type {@code JM} of buffered message from another stream
* @param joinOutput the output {@link MessageStreamImpl} of the join results
*/
- PartialJoinOperatorSpec(BiFunction<M, JM, RM> partialJoinFn, MessageStreamImpl<RM> joinOutput, String operatorId) {
+ PartialJoinOperatorSpec(PartialJoinFunction<K, M, JM, RM> partialJoinFn, MessageStreamImpl<RM> joinOutput, int opId) {
this.joinOutput = joinOutput;
this.transformFn = partialJoinFn;
- this.operatorId = operatorId;
+ this.opId = opId;
}
@Override
- public String toString() {
- return this.operatorId;
- }
-
- @Override
- public MessageStreamImpl<RM> getOutputStream() {
+ public MessageStreamImpl<RM> getNextStream() {
return this.joinOutput;
}
- public BiFunction<M, JM, RM> getTransformFn() {
+ public PartialJoinFunction<K, M, JM, RM> getTransformFn() {
return this.transformFn;
}
+
+ public OperatorSpec.OpCode getOpCode() {
+ return OpCode.JOIN;
+ }
+
+ public int getOpId() {
+ return this.opId;
+ }
+
+ @Override public void init(Config config, TaskContext context) {
+ this.transformFn.init(config, context);
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/c249443b/samza-operator/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java b/samza-operator/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java
index 4348bc0..ba30d67 100644
--- a/samza-operator/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java
+++ b/samza-operator/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java
@@ -18,18 +18,30 @@
*/
package org.apache.samza.operators.spec;
-import org.apache.samza.operators.data.MessageEnvelope;
-import org.apache.samza.operators.functions.SinkFunction;
+import org.apache.samza.config.Config;
import org.apache.samza.operators.MessageStreamImpl;
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.functions.SinkFunction;
+import org.apache.samza.task.TaskContext;
/**
* The spec for a sink operator that accepts user-defined logic to output a {@link MessageStreamImpl} to an external
* system. This is a terminal operator and does not allow further operator chaining.
*
- * @param <M> the type of input {@link MessageEnvelope}
+ * @param <M> the type of input message
*/
-public class SinkOperatorSpec<M extends MessageEnvelope> implements OperatorSpec {
+public class SinkOperatorSpec<M> implements OperatorSpec {
+
+ /**
+ * {@link OpCode} for this {@link SinkOperatorSpec}
+ */
+ private final OperatorSpec.OpCode opCode;
+
+ /**
+ * The unique ID for this operator.
+ */
+ private final int opId;
/**
* The user-defined sink function
@@ -37,14 +49,40 @@ public class SinkOperatorSpec<M extends MessageEnvelope> implements OperatorSpec
private final SinkFunction<M> sinkFn;
/**
- * Default constructor for a {@link SinkOperatorSpec}.
+ * Optional {@link OutputStream} the sink writes to; {@code null} when the sink sends output elsewhere (e.g. a remote database)
+ */
+ private final OutputStream<M> outStream;
+
+ /**
+ * Default constructor for a {@link SinkOperatorSpec} w/o an output stream. (e.g. output is sent to remote database)
+ *
+ * @param sinkFn a user defined {@link SinkFunction} that will be called with the output message,
+ * the output {@link org.apache.samza.task.MessageCollector} and the
+ * {@link org.apache.samza.task.TaskCoordinator}.
+ * @param opCode the specific {@link OpCode} for this {@link SinkOperatorSpec}. It could be {@link OpCode#SINK}, {@link OpCode#SEND_TO},
+ * or {@link OpCode#PARTITION_BY}
+ * @param opId the unique id of this {@link SinkOperatorSpec} in the {@link org.apache.samza.operators.StreamGraph}
+ */
+ SinkOperatorSpec(SinkFunction<M> sinkFn, OperatorSpec.OpCode opCode, int opId) {
+ this(sinkFn, opCode, opId, null);
+ }
+
+ /**
+ * Default constructor for a {@link SinkOperatorSpec} that sends the output to an {@link OutputStream}
*
- * @param sinkFn a user defined {@link SinkFunction} that will be called with the output {@link MessageEnvelope},
+ * @param sinkFn a user defined {@link SinkFunction} that will be called with the output message,
* the output {@link org.apache.samza.task.MessageCollector} and the
* {@link org.apache.samza.task.TaskCoordinator}.
+ * @param opCode the specific {@link OpCode} for this {@link SinkOperatorSpec}. It could be {@link OpCode#SINK}, {@link OpCode#SEND_TO},
+ * or {@link OpCode#PARTITION_BY}
+ * @param opId the unique id of this {@link SinkOperatorSpec} in the {@link org.apache.samza.operators.StreamGraph}
+ * @param outStream the {@link OutputStream} for this {@link SinkOperatorSpec}
*/
- SinkOperatorSpec(SinkFunction<M> sinkFn) {
+ SinkOperatorSpec(SinkFunction<M> sinkFn, OperatorSpec.OpCode opCode, int opId, OutputStream<M> outStream) {
this.sinkFn = sinkFn;
+ this.opCode = opCode;
+ this.opId = opId;
+ this.outStream = outStream;
}
/**
@@ -52,11 +90,27 @@ public class SinkOperatorSpec<M extends MessageEnvelope> implements OperatorSpec
* @return null
*/
@Override
- public MessageStreamImpl getOutputStream() {
+ public MessageStreamImpl<M> getNextStream() {
return null;
}
public SinkFunction<M> getSinkFn() {
return this.sinkFn;
}
+
+ public OperatorSpec.OpCode getOpCode() {
+ return this.opCode;
+ }
+
+ public int getOpId() {
+ return this.opId;
+ }
+
+ public OutputStream<M> getOutStream() {
+ return this.outStream;
+ }
+
+ @Override public void init(Config config, TaskContext context) {
+ this.sinkFn.init(config, context);
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/c249443b/samza-operator/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java b/samza-operator/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java
index ed18da4..d7813f7 100644
--- a/samza-operator/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java
+++ b/samza-operator/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java
@@ -18,50 +18,74 @@
*/
package org.apache.samza.operators.spec;
-import org.apache.samza.operators.data.MessageEnvelope;
-import org.apache.samza.operators.functions.FlatMapFunction;
+import org.apache.samza.config.Config;
import org.apache.samza.operators.MessageStreamImpl;
+import org.apache.samza.operators.functions.FlatMapFunction;
+import org.apache.samza.task.TaskContext;
/**
- * The spec for a linear stream operator that outputs 0 or more {@link MessageEnvelope}s for each input {@link MessageEnvelope}.
+ * The spec for a linear stream operator that outputs 0 or more messages for each input message.
*
- * @param <M> the type of input {@link MessageEnvelope}
- * @param <OM> the type of output {@link MessageEnvelope}
+ * @param <M> the type of input message
+ * @param <OM> the type of output message
*/
-public class StreamOperatorSpec<M extends MessageEnvelope, OM extends MessageEnvelope> implements OperatorSpec<OM> {
+public class StreamOperatorSpec<M, OM> implements OperatorSpec<OM> {
- private final MessageStreamImpl<OM> outputStream;
+ /**
+ * {@link OpCode} for this {@link StreamOperatorSpec}
+ */
+ private final OperatorSpec.OpCode opCode;
- private final FlatMapFunction<M, OM> transformFn;
+ /**
+ * The unique ID for this operator.
+ */
+ private final int opId;
/**
- * Default constructor for a {@link StreamOperatorSpec}.
- *
- * @param transformFn the transformation function that transforms each input {@link MessageEnvelope} into a collection
- * of output {@link MessageEnvelope}s
+ * The output {@link MessageStreamImpl} from this {@link StreamOperatorSpec}
*/
- StreamOperatorSpec(FlatMapFunction<M, OM> transformFn) {
- this(transformFn, new MessageStreamImpl<>());
- }
+ private final MessageStreamImpl<OM> outputStream;
+
+ /**
+ * Transformation function applied in this {@link StreamOperatorSpec}
+ */
+ private final FlatMapFunction<M, OM> transformFn;
/**
* Constructor for a {@link StreamOperatorSpec} that accepts an output {@link MessageStreamImpl}.
*
* @param transformFn the transformation function
* @param outputStream the output {@link MessageStreamImpl}
+ * @param opCode the {@link OpCode} for this {@link StreamOperatorSpec}
+ * @param opId the unique id for this {@link StreamOperatorSpec} in a {@link org.apache.samza.operators.StreamGraph}
*/
- StreamOperatorSpec(FlatMapFunction<M, OM> transformFn, MessageStreamImpl<OM> outputStream) {
+ StreamOperatorSpec(FlatMapFunction<M, OM> transformFn, MessageStreamImpl outputStream, OperatorSpec.OpCode opCode, int opId) {
this.outputStream = outputStream;
this.transformFn = transformFn;
+ this.opCode = opCode;
+ this.opId = opId;
}
@Override
- public MessageStreamImpl<OM> getOutputStream() {
+ public MessageStreamImpl<OM> getNextStream() {
return this.outputStream;
}
public FlatMapFunction<M, OM> getTransformFn() {
return this.transformFn;
}
+
+ public OperatorSpec.OpCode getOpCode() {
+ return this.opCode;
+ }
+
+ public int getOpId() {
+ return this.opId;
+ }
+
+ @Override
+ public void init(Config config, TaskContext context) {
+ this.transformFn.init(config, context);
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/c249443b/samza-operator/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java b/samza-operator/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
index cdc02a8..46417ed 100644
--- a/samza-operator/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
+++ b/samza-operator/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
@@ -19,29 +19,42 @@
package org.apache.samza.operators.spec;
-import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.MessageStreamImpl;
-import org.apache.samza.operators.data.MessageEnvelope;
import org.apache.samza.operators.windows.WindowPane;
import org.apache.samza.operators.windows.internal.WindowInternal;
-public class WindowOperatorSpec<M extends MessageEnvelope, K, WK, WV, WM extends WindowPane<WK, WV>> implements OperatorSpec<WM> {
- private final WindowInternal window;
+/**
+ * Default window operator spec object
+ *
+ * @param <M> the type of input message to the window
+ * @param <WK> the type of key of the window
+ * @param <WV> the type of aggregated value in the window output {@link WindowPane}
+ */
+public class WindowOperatorSpec<M, WK, WV> implements OperatorSpec<WindowPane<WK, WV>> {
+
+ private final WindowInternal<M, WK, WV> window;
- private final MessageStreamImpl<WM> outputStream;
+ private final MessageStreamImpl<WindowPane<WK, WV>> outputStream;
- private final String operatorId;
+ private final int opId;
- public WindowOperatorSpec(WindowInternal window, String operatorId) {
+ /**
+ * Constructor for {@link WindowOperatorSpec}.
+ *
+ * @param window the window function
+ * @param outputStream the output {@link MessageStreamImpl} from this {@link WindowOperatorSpec}
+ * @param opId auto-generated unique ID of this operator
+ */
+ WindowOperatorSpec(WindowInternal<M, WK, WV> window, MessageStreamImpl<WindowPane<WK, WV>> outputStream, int opId) {
+ this.outputStream = outputStream;
this.window = window;
- this.outputStream = new MessageStreamImpl<>();
- this.operatorId = operatorId;
+ this.opId = opId;
}
@Override
- public MessageStream<WM> getOutputStream() {
+ public MessageStreamImpl<WindowPane<WK, WV>> getNextStream() {
return this.outputStream;
}
@@ -49,7 +62,11 @@ public class WindowOperatorSpec<M extends MessageEnvelope, K, WK, WV, WM extends
return window;
}
- public String getOperatorId() {
- return operatorId;
+ public OpCode getOpCode() {
+ return OpCode.WINDOW;
+ }
+
+ public int getOpId() {
+ return this.opId;
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/c249443b/samza-operator/src/main/java/org/apache/samza/operators/spec/WindowState.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/spec/WindowState.java b/samza-operator/src/main/java/org/apache/samza/operators/spec/WindowState.java
index e9af043..53bca2e 100644
--- a/samza-operator/src/main/java/org/apache/samza/operators/spec/WindowState.java
+++ b/samza-operator/src/main/java/org/apache/samza/operators/spec/WindowState.java
@@ -30,20 +30,16 @@ import org.apache.samza.annotation.InterfaceStability;
@InterfaceStability.Unstable
public interface WindowState<WV> {
/**
- * Method to get the system time when the first {@link org.apache.samza.operators.data.MessageEnvelope}
- * in the window is received
+ * Method to get the system time when the first message in the window is received
*
- * @return nano-second of system time for the first {@link org.apache.samza.operators.data.MessageEnvelope}
- * received in the window
+ * @return nano-second of system time for the first message received in the window
*/
long getFirstMessageTimeNs();
/**
- * Method to get the system time when the last {@link org.apache.samza.operators.data.MessageEnvelope}
- * in the window is received
+ * Method to get the system time when the last message in the window is received
*
- * @return nano-second of system time for the last {@link org.apache.samza.operators.data.MessageEnvelope}
- * received in the window
+ * @return nano-second of system time for the last message received in the window
*/
long getLastMessageTimeNs();
@@ -62,9 +58,9 @@ public interface WindowState<WV> {
long getLatestEventTimeNs();
/**
- * Method to get the total number of {@link org.apache.samza.operators.data.MessageEnvelope}s received in the window
+ * Method to get the total number of messages received in the window
*
- * @return number of {@link org.apache.samza.operators.data.MessageEnvelope}s in the window
+ * @return number of messages in the window
*/
long getNumberMessages();
http://git-wip-us.apache.org/repos/asf/samza/blob/c249443b/samza-operator/src/main/java/org/apache/samza/system/RemoteExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/system/RemoteExecutionEnvironment.java b/samza-operator/src/main/java/org/apache/samza/system/RemoteExecutionEnvironment.java
new file mode 100644
index 0000000..fafa2cb
--- /dev/null
+++ b/samza-operator/src/main/java/org/apache/samza/system/RemoteExecutionEnvironment.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.system;
+
+import org.apache.samza.operators.StreamGraphBuilder;
+import org.apache.samza.config.Config;
+
+/**
+ * {@link ExecutionEnvironment} intended to run applications on a remote cluster (YARN).
+ * <p>
+ * Currently a placeholder: {@link #run(StreamGraphBuilder, Config)} does not perform any work yet.
+ */
+public class RemoteExecutionEnvironment implements ExecutionEnvironment {
+
+  /**
+   * Intended to launch {@code app} as a remote job; currently a no-op.
+   *
+   * @param app the application that defines the stream graph
+   * @param config the job configuration
+   */
+  @Override
+  public void run(StreamGraphBuilder app, Config config) {
+    // TODO: describe the ProcessContext that is going to create a sub-DAG of the graph
+    // TODO: actually instantiate the tasks and run the job, i.e.
+    //   1. create all input/output/intermediate topics
+    //   2. create the single job configuration
+    //   3. execute JobRunner to submit the single job for the whole graph
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/c249443b/samza-operator/src/main/java/org/apache/samza/system/StandaloneExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/system/StandaloneExecutionEnvironment.java b/samza-operator/src/main/java/org/apache/samza/system/StandaloneExecutionEnvironment.java
new file mode 100644
index 0000000..f0f6ef2
--- /dev/null
+++ b/samza-operator/src/main/java/org/apache/samza/system/StandaloneExecutionEnvironment.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system;
+
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.StreamGraphBuilder;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.StreamGraphImpl;
+
+
+/**
+ * {@link ExecutionEnvironment} intended to run applications in a standalone (non-cluster) deployment.
+ * <p>
+ * Currently a placeholder: {@link #run(StreamGraphBuilder, Config)} does not perform any work yet.
+ */
+public class StandaloneExecutionEnvironment implements ExecutionEnvironment {
+
+  /**
+   * Builds the logical {@link StreamGraph} of {@code app} by letting it populate a fresh graph.
+   * <p>
+   * TODO: may want to move this to a common base class for all {@link ExecutionEnvironment}s.
+   *
+   * @param app the application that defines the graph
+   * @param config the configuration handed to the application while it builds the graph
+   * @return the populated graph
+   */
+  StreamGraph createGraph(StreamGraphBuilder app, Config config) {
+    final StreamGraphImpl logicalGraph = new StreamGraphImpl();
+    app.init(logicalGraph, config);
+    return logicalGraph;
+  }
+
+  @Override
+  public void run(StreamGraphBuilder app, Config config) {
+    // Planned steps (none implemented yet):
+    //   1. get the logical graph for optimization, e.g. this.createGraph(app, config)
+    //   2. apply potential optimizations
+    //   3. create a new instance of StreamGraphBuilder that generates the optimized graph
+    //   4. create all input/output/intermediate topics
+    //   5. create the configuration for StreamProcessor
+    //   6. start the StreamProcessor with the optimized StreamGraphBuilder
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/c249443b/samza-operator/src/main/java/org/apache/samza/task/StreamOperatorTask.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/task/StreamOperatorTask.java b/samza-operator/src/main/java/org/apache/samza/task/StreamOperatorTask.java
new file mode 100644
index 0000000..b007e3c
--- /dev/null
+++ b/samza-operator/src/main/java/org/apache/samza/task/StreamOperatorTask.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.task;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.ContextManager;
+import org.apache.samza.operators.MessageStreamImpl;
+import org.apache.samza.operators.StreamGraphBuilder;
+import org.apache.samza.operators.StreamGraphImpl;
+import org.apache.samza.operators.data.InputMessageEnvelope;
+import org.apache.samza.operators.impl.OperatorGraph;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemStream;
+
+import java.util.HashMap;
+import java.util.Map;
+
+
+/**
+ * Executes the logical sub-DAG of a user application within a single task.
+ * <p>
+ * A {@link StreamTask} implementation that receives {@link IncomingMessageEnvelope}s, wraps them in
+ * {@link InputMessageEnvelope}s and propagates them through the user's stream transformations defined in
+ * {@link StreamGraphImpl} using the {@link org.apache.samza.operators.MessageStream} APIs.
+ * <p>
+ * This class brings all the operator API implementation components together and feeds the
+ * {@link InputMessageEnvelope}s into the transformation chains.
+ * <p>
+ * It accepts an instance of the user-implemented factory {@link StreamGraphBuilder} as a constructor parameter.
+ * When its own {@link #init(Config, TaskContext)} method is called during startup, it instantiates a
+ * {@link StreamGraphImpl}, lets the {@link StreamGraphBuilder} populate it, calls
+ * {@link StreamGraphImpl#getContextManager()} to initialize the task-wide context for the graph, and looks up the
+ * {@link MessageStreamImpl} for the {@link SystemStream} of each of its input
+ * {@link org.apache.samza.system.SystemStreamPartition}s. Each input {@link MessageStreamImpl}
+ * corresponds to either an input stream or an intermediate stream in the {@link StreamGraphImpl}.
+ * <p>
+ * Then, this task calls {@link OperatorGraph#init(Map, Config, TaskContext)} once with the map of input
+ * {@link MessageStreamImpl}s. This is expected to instantiate the {@link org.apache.samza.operators.impl.OperatorImpl}
+ * DAG corresponding to the aforementioned {@link org.apache.samza.operators.spec.OperatorSpec} DAG, with the
+ * {@link OperatorGraph} keeping the root node for each input {@link SystemStream}.
+ * <p>
+ * With the root of the DAG for each input {@link SystemStream} available, the task passes the message envelopes
+ * received in {@link StreamTask#process(IncomingMessageEnvelope, MessageCollector, TaskCoordinator)}
+ * along to the appropriate root node. From then on, each {@link org.apache.samza.operators.impl.OperatorImpl}
+ * propagates its transformed output to the next set of {@link org.apache.samza.operators.impl.OperatorImpl}s.
+ */
+public final class StreamOperatorTask implements StreamTask, InitableTask, WindowableTask, ClosableTask {
+
+  /**
+   * Provides, for each input {@link SystemStream}, the root node of its operator chain DAG.
+   */
+  private final OperatorGraph operatorGraph = new OperatorGraph();
+
+  private final StreamGraphBuilder graphBuilder;
+
+  /** Set in {@link #init(Config, TaskContext)}; {@code null} until then. */
+  private ContextManager contextManager;
+
+  /**
+   * @param graphBuilder the user-supplied builder that defines the application's stream graph
+   */
+  public StreamOperatorTask(StreamGraphBuilder graphBuilder) {
+    this.graphBuilder = graphBuilder;
+  }
+
+  @Override
+  public final void init(Config config, TaskContext context) throws Exception {
+    // create the StreamGraphImpl object and initialize the app-specific logical DAG within the task
+    StreamGraphImpl streams = new StreamGraphImpl();
+    this.graphBuilder.init(streams, config);
+    // get the context manager of the StreamGraph and initialize the task-specific context
+    this.contextManager = streams.getContextManager();
+
+    // map each physical input SystemStream to its logical MessageStream; several partitions may
+    // share one SystemStream, so the lookup is done only once per SystemStream
+    Map<SystemStream, MessageStreamImpl> inputBySystemStream = new HashMap<>();
+    context.getSystemStreamPartitions().forEach(ssp -> {
+      SystemStream systemStream = ssp.getSystemStream();
+      if (!inputBySystemStream.containsKey(systemStream)) {
+        inputBySystemStream.put(systemStream, streams.getInputStream(systemStream));
+      }
+    });
+    this.operatorGraph.init(inputBySystemStream, config, this.contextManager.initTaskContext(config, context));
+  }
+
+  @Override
+  public final void process(IncomingMessageEnvelope ime, MessageCollector collector, TaskCoordinator coordinator) {
+    // route the message to the root operator of the DAG for its input SystemStream
+    this.operatorGraph.get(ime.getSystemStreamPartition().getSystemStream())
+        .onNext(new InputMessageEnvelope(ime), collector, coordinator);
+  }
+
+  @Override
+  public final void window(MessageCollector collector, TaskCoordinator coordinator) throws Exception {
+    // TODO: invoke timer based triggers
+  }
+
+  @Override
+  public void close() throws Exception {
+    // if init() never set the context manager there is no task context to finalize
+    if (this.contextManager != null) {
+      this.contextManager.finalizeTaskContext();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/c249443b/samza-operator/src/test/java/org/apache/samza/example/KeyValueStoreExample.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/example/KeyValueStoreExample.java b/samza-operator/src/test/java/org/apache/samza/example/KeyValueStoreExample.java
new file mode 100644
index 0000000..85ebc6c
--- /dev/null
+++ b/samza-operator/src/test/java/org/apache/samza/example/KeyValueStoreExample.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.example;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.samza.operators.*;
+import org.apache.samza.operators.StreamGraphBuilder;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.operators.functions.FlatMapFunction;
+import org.apache.samza.serializers.JsonSerde;
+import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.storage.kv.KeyValueStore;
+import org.apache.samza.system.ExecutionEnvironment;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.task.TaskContext;
+import org.apache.samza.util.CommandLine;
+
+import java.util.Properties;
+
+
+/**
+ * Example code using a {@link KeyValueStore} to implement an event-time window counter.
+ */
+public class KeyValueStoreExample implements StreamGraphBuilder {
+
+  /**
+   * Defines the stream graph: page views are repartitioned by member id, counted per member in
+   * 5-minute event-time windows kept in a {@link KeyValueStore}, and the counters are sent to the output stream.
+   * <p>
+   * Also used by the remote execution environment to launch the job in a remote program. The remote program
+   * should follow an invoking context similar to the standalone one:
+   * <pre>{@code
+   * public static void main(String args[]) throws Exception {
+   *   CommandLine cmdLine = new CommandLine();
+   *   Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
+   *   ExecutionEnvironment remoteEnv = ExecutionEnvironment.fromConfig(config);
+   *   remoteEnv.run(new KeyValueStoreExample(), config);
+   * }
+   * }</pre>
+   */
+  @Override
+  public void init(StreamGraph graph, Config config) {
+    MessageStream<PageViewEvent> pageViewEvents =
+        graph.createInStream(input1, new StringSerde("UTF-8"), new JsonSerde<>());
+    OutputStream<StatsOutput> pageViewPerMemberCounters =
+        graph.createOutStream(output, new StringSerde("UTF-8"), new JsonSerde<StatsOutput>());
+
+    pageViewEvents
+        .partitionBy(m -> m.getMessage().memberId)
+        .flatMap(new MyStatsCounter())
+        .sendTo(pageViewPerMemberCounters);
+  }
+
+  // standalone local program model
+  public static void main(String[] args) throws Exception {
+    CommandLine cmdLine = new CommandLine();
+    Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
+    ExecutionEnvironment standaloneEnv = ExecutionEnvironment.getLocalEnvironment(config);
+    standaloneEnv.run(new KeyValueStoreExample(), config);
+  }
+
+  /**
+   * Counts page views per member and 5-minute window, emitting the accumulated count for a window
+   * at most once every {@code timeoutMs} milliseconds (measured in processing time).
+   */
+  class MyStatsCounter implements FlatMapFunction<PageViewEvent, StatsOutput> {
+    private final int timeoutMs = 10 * 60 * 1000;
+
+    KeyValueStore<String, StatsWindowState> statsStore;
+
+    class StatsWindowState {
+      int lastCount = 0;
+      long timeAtLastOutput = 0;
+      int newCount = 0;
+    }
+
+    @Override
+    public Collection<StatsOutput> apply(PageViewEvent message) {
+      List<StatsOutput> outputStats = new ArrayList<>();
+      // start of the 5-minute window (in minutes since epoch) that contains the event timestamp
+      long wndTimestamp = TimeUnit.MILLISECONDS.toMinutes(message.getMessage().timestamp) / 5 * 5;
+      String wndKey = String.format("%s-%d", message.getMessage().memberId, wndTimestamp);
+      StatsWindowState curState = this.statsStore.get(wndKey);
+      if (curState == null) {
+        // first event for this member/window: the store has no entry yet, so start from an empty state
+        curState = new StatsWindowState();
+      }
+      curState.newCount++;
+      long curTimeMs = System.currentTimeMillis();
+      if (curState.newCount > 0 && curState.timeAtLastOutput + timeoutMs < curTimeMs) {
+        curState.timeAtLastOutput = curTimeMs;
+        curState.lastCount += curState.newCount;
+        curState.newCount = 0;
+        outputStats.add(new StatsOutput(message.getMessage().memberId, wndTimestamp, curState.lastCount));
+      }
+      // persist the updated counters, whether or not an output was generated above
+      this.statsStore.put(wndKey, curState);
+      return outputStats;
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public void init(Config config, TaskContext context) {
+      // unchecked cast: the key/value types of the store are not verified here
+      this.statsStore = (KeyValueStore<String, StatsWindowState>) context.getStore("my-stats-wnd-store");
+    }
+  }
+
+  StreamSpec input1 = new StreamSpec() {
+    @Override
+    public SystemStream getSystemStream() {
+      return new SystemStream("kafka", "PageViewEvent");
+    }
+
+    @Override
+    public Properties getProperties() {
+      return null;
+    }
+  };
+
+  StreamSpec output = new StreamSpec() {
+    @Override
+    public SystemStream getSystemStream() {
+      return new SystemStream("kafka", "PageViewPerMember5min");
+    }
+
+    @Override
+    public Properties getProperties() {
+      return null;
+    }
+  };
+
+  class PageViewEvent implements MessageEnvelope<String, PageViewEvent> {
+    String pageId;
+    String memberId;
+    long timestamp;
+
+    PageViewEvent(String pageId, String memberId, long timestamp) {
+      this.pageId = pageId;
+      this.memberId = memberId;
+      this.timestamp = timestamp;
+    }
+
+    @Override
+    public String getKey() {
+      return this.pageId;
+    }
+
+    @Override
+    public PageViewEvent getMessage() {
+      return this;
+    }
+  }
+
+  class StatsOutput implements MessageEnvelope<String, StatsOutput> {
+    private String memberId;
+    private long timestamp;
+    private Integer count;
+
+    StatsOutput(String key, long timestamp, Integer count) {
+      this.memberId = key;
+      this.timestamp = timestamp;
+      this.count = count;
+    }
+
+    @Override
+    public String getKey() {
+      return this.memberId;
+    }
+
+    @Override
+    public StatsOutput getMessage() {
+      return this;
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/c249443b/samza-operator/src/test/java/org/apache/samza/example/NoContextStreamExample.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/example/NoContextStreamExample.java b/samza-operator/src/test/java/org/apache/samza/example/NoContextStreamExample.java
new file mode 100644
index 0000000..c6d2e6e
--- /dev/null
+++ b/samza-operator/src/test/java/org/apache/samza/example/NoContextStreamExample.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.example;
+
+import org.apache.samza.operators.*;
+import org.apache.samza.operators.StreamGraphBuilder;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.data.InputMessageEnvelope;
+import org.apache.samza.operators.data.JsonIncomingSystemMessageEnvelope;
+import org.apache.samza.operators.data.Offset;
+import org.apache.samza.operators.functions.JoinFunction;
+import org.apache.samza.serializers.JsonSerde;
+import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.system.ExecutionEnvironment;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.CommandLine;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+
+/**
+ * Example {@link StreamGraphBuilder} that joins two input streams by key and writes the
+ * joined messages to an output stream, exercising the fluent API methods.
+ */
+public class NoContextStreamExample implements StreamGraphBuilder {
+
+  /**
+   * Creates a {@link StreamSpec} for the named stream on the "kafka" system, without extra properties.
+   */
+  private static StreamSpec kafkaStream(String streamName) {
+    return new StreamSpec() {
+      @Override
+      public SystemStream getSystemStream() {
+        return new SystemStream("kafka", streamName);
+      }
+
+      @Override
+      public Properties getProperties() {
+        return null;
+      }
+    };
+  }
+
+  StreamSpec input1 = kafkaStream("input1");
+
+  StreamSpec input2 = kafkaStream("input2");
+
+  StreamSpec output = kafkaStream("output");
+
+  class MessageType {
+    String joinKey;
+    List<String> joinFields = new ArrayList<>();
+  }
+
+  class JsonMessageEnvelope extends JsonIncomingSystemMessageEnvelope<MessageType> {
+    JsonMessageEnvelope(String key, MessageType data, Offset offset, SystemStreamPartition partition) {
+      super(key, data, offset, partition);
+    }
+  }
+
+  /**
+   * Re-wraps a raw input envelope as a {@link JsonMessageEnvelope} keyed by its payload's join key.
+   */
+  private JsonMessageEnvelope getInputMessage(InputMessageEnvelope ism) {
+    MessageType payload = (MessageType) ism.getMessage();
+    return new JsonMessageEnvelope(payload.joinKey, payload, ism.getOffset(), ism.getSystemStreamPartition());
+  }
+
+  /**
+   * Joins two messages with equal keys by concatenating their join fields (first message's fields first).
+   */
+  class MyJoinFunction implements JoinFunction<String, JsonMessageEnvelope, JsonMessageEnvelope, JsonIncomingSystemMessageEnvelope<MessageType>> {
+
+    @Override
+    public JsonIncomingSystemMessageEnvelope<MessageType> apply(JsonMessageEnvelope m1, JsonMessageEnvelope m2) {
+      MessageType joined = new MessageType();
+      joined.joinKey = m1.getKey();
+      joined.joinFields.addAll(m1.getMessage().joinFields);
+      joined.joinFields.addAll(m2.getMessage().joinFields);
+      return new JsonMessageEnvelope(m1.getMessage().joinKey, joined, null, null);
+    }
+
+    @Override
+    public String getFirstKey(JsonMessageEnvelope message) {
+      return message.getKey();
+    }
+
+    @Override
+    public String getSecondKey(JsonMessageEnvelope message) {
+      return message.getKey();
+    }
+  }
+
+  /**
+   * Builds the join graph. Also used by the remote execution environment to launch the job in a
+   * remote program, which should follow an invoking context similar to the standalone one:
+   * <pre>{@code
+   * public static void main(String args[]) throws Exception {
+   *   CommandLine cmdLine = new CommandLine();
+   *   Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
+   *   ExecutionEnvironment remoteEnv = ExecutionEnvironment.fromConfig(config);
+   *   remoteEnv.run(new NoContextStreamExample(), config);
+   * }
+   * }</pre>
+   */
+  @Override
+  public void init(StreamGraph graph, Config config) {
+    MessageStream<InputMessageEnvelope> inputSource1 =
+        graph.<Object, Object, InputMessageEnvelope>createInStream(input1, null, null);
+    MessageStream<InputMessageEnvelope> inputSource2 =
+        graph.<Object, Object, InputMessageEnvelope>createInStream(input2, null, null);
+    OutputStream<JsonIncomingSystemMessageEnvelope<MessageType>> outStream =
+        graph.createOutStream(output, new StringSerde("UTF-8"), new JsonSerde<>());
+
+    MessageStream<JsonMessageEnvelope> left = inputSource1.map(this::getInputMessage);
+    MessageStream<JsonMessageEnvelope> right = inputSource2.map(this::getInputMessage);
+    left.join(right, new MyJoinFunction()).sendTo(outStream);
+  }
+
+  // standalone local program model
+  public static void main(String[] args) throws Exception {
+    CommandLine cmdLine = new CommandLine();
+    Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
+    ExecutionEnvironment standaloneEnv = ExecutionEnvironment.getLocalEnvironment(config);
+    standaloneEnv.run(new NoContextStreamExample(), config);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/c249443b/samza-operator/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java b/samza-operator/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java
new file mode 100644
index 0000000..0477066
--- /dev/null
+++ b/samza-operator/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.example;
+
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.StreamGraphBuilder;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.StreamSpec;
+import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.operators.functions.JoinFunction;
+import org.apache.samza.serializers.JsonSerde;
+import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.system.ExecutionEnvironment;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.util.CommandLine;
+
+import java.util.Properties;
+
+
+/**
+ * Simple 2-way stream-to-stream join example: orders are joined with shipments by order id
+ * and the resulting fulfilled orders are sent to an output stream.
+ */
+public class OrderShipmentJoinExample implements StreamGraphBuilder {
+
+  /**
+   * Builds the join graph. Also used by the remote execution environment to launch the job in a
+   * remote program, which should follow an invoking context similar to the standalone one:
+   * <pre>{@code
+   * public static void main(String args[]) throws Exception {
+   *   CommandLine cmdLine = new CommandLine();
+   *   Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
+   *   ExecutionEnvironment remoteEnv = ExecutionEnvironment.fromConfig(config);
+   *   remoteEnv.run(new OrderShipmentJoinExample(), config);
+   * }
+   * }</pre>
+   */
+  @Override
+  public void init(StreamGraph graph, Config config) {
+    MessageStream<OrderRecord> orders = graph.createInStream(input1, new StringSerde("UTF-8"), new JsonSerde<>());
+    MessageStream<ShipmentRecord> shipments = graph.createInStream(input2, new StringSerde("UTF-8"), new JsonSerde<>());
+    OutputStream<FulFilledOrderRecord> fulfilledOrders =
+        graph.createOutStream(output, new StringSerde("UTF-8"), new JsonSerde<>());
+
+    orders.join(shipments, new MyJoinFunction()).sendTo(fulfilledOrders);
+  }
+
+  // standalone local program model
+  public static void main(String[] args) throws Exception {
+    CommandLine cmdLine = new CommandLine();
+    Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
+    ExecutionEnvironment standaloneEnv = ExecutionEnvironment.getLocalEnvironment(config);
+    standaloneEnv.run(new OrderShipmentJoinExample(), config);
+  }
+
+  StreamSpec input1 = new StreamSpec() {
+    @Override
+    public SystemStream getSystemStream() {
+      return new SystemStream("kafka", "Orders");
+    }
+
+    @Override
+    public Properties getProperties() {
+      return null;
+    }
+  };
+
+  StreamSpec input2 = new StreamSpec() {
+    @Override
+    public SystemStream getSystemStream() {
+      return new SystemStream("kafka", "Shipment");
+    }
+
+    @Override
+    public Properties getProperties() {
+      return null;
+    }
+  };
+
+  StreamSpec output = new StreamSpec() {
+    @Override
+    public SystemStream getSystemStream() {
+      return new SystemStream("kafka", "FulfilledOrders");
+    }
+
+    @Override
+    public Properties getProperties() {
+      return null;
+    }
+  };
+
+  class OrderRecord implements MessageEnvelope<String, OrderRecord> {
+    String orderId;
+    long orderTimeMs;
+
+    OrderRecord(String orderId, long timeMs) {
+      this.orderId = orderId;
+      this.orderTimeMs = timeMs;
+    }
+
+    @Override
+    public String getKey() {
+      return this.orderId;
+    }
+
+    @Override
+    public OrderRecord getMessage() {
+      return this;
+    }
+  }
+
+  class ShipmentRecord implements MessageEnvelope<String, ShipmentRecord> {
+    String orderId;
+    long shipTimeMs;
+
+    ShipmentRecord(String orderId, long timeMs) {
+      this.orderId = orderId;
+      this.shipTimeMs = timeMs;
+    }
+
+    @Override
+    public String getKey() {
+      return this.orderId;
+    }
+
+    @Override
+    public ShipmentRecord getMessage() {
+      return this;
+    }
+  }
+
+  class FulFilledOrderRecord implements MessageEnvelope<String, FulFilledOrderRecord> {
+    String orderId;
+    long orderTimeMs;
+    long shipTimeMs;
+
+    FulFilledOrderRecord(String orderId, long orderTimeMs, long shipTimeMs) {
+      this.orderId = orderId;
+      this.orderTimeMs = orderTimeMs;
+      this.shipTimeMs = shipTimeMs;
+    }
+
+    @Override
+    public String getKey() {
+      return this.orderId;
+    }
+
+    @Override
+    public FulFilledOrderRecord getMessage() {
+      return this;
+    }
+  }
+
+  /**
+   * Combines a matching order and shipment into a fulfilled-order record keyed by the order id.
+   */
+  FulFilledOrderRecord myJoinResult(OrderRecord m1, ShipmentRecord m2) {
+    return new FulFilledOrderRecord(m1.getKey(), m1.orderTimeMs, m2.shipTimeMs);
+  }
+
+  /**
+   * Joins orders and shipments on their order id.
+   */
+  class MyJoinFunction implements JoinFunction<String, OrderRecord, ShipmentRecord, FulFilledOrderRecord> {
+
+    @Override
+    public FulFilledOrderRecord apply(OrderRecord message, ShipmentRecord otherMessage) {
+      return OrderShipmentJoinExample.this.myJoinResult(message, otherMessage);
+    }
+
+    @Override
+    public String getFirstKey(OrderRecord message) {
+      return message.getKey();
+    }
+
+    @Override
+    public String getSecondKey(ShipmentRecord message) {
+      return message.getKey();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/c249443b/samza-operator/src/test/java/org/apache/samza/example/PageViewCounterExample.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/example/PageViewCounterExample.java b/samza-operator/src/test/java/org/apache/samza/example/PageViewCounterExample.java
new file mode 100644
index 0000000..f7d8bda
--- /dev/null
+++ b/samza-operator/src/test/java/org/apache/samza/example/PageViewCounterExample.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.example;
+
+import org.apache.samza.operators.*;
+import org.apache.samza.operators.StreamGraphBuilder;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.operators.triggers.Triggers;
+import org.apache.samza.operators.windows.AccumulationMode;
+import org.apache.samza.operators.windows.WindowPane;
+import org.apache.samza.operators.windows.Windows;
+import org.apache.samza.serializers.JsonSerde;
+import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.system.ExecutionEnvironment;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.util.CommandLine;
+
+import java.time.Duration;
+import java.util.Properties;
+
+
+/**
+ * Example {@link StreamGraphBuilder} implementing a window-based counter: page views are counted per
+ * member id in 10-second keyed tumbling windows, with a repeating early trigger every 5 messages and
+ * DISCARDING accumulation mode.
+ */
+public class PageViewCounterExample implements StreamGraphBuilder {
+
+  StreamSpec input1 = kafkaStream("PageViewEvent");
+
+  // NOTE(review): the topic name says "5min" but the window below is 10 seconds; confirm intent.
+  StreamSpec output = kafkaStream("PageViewPerMember5min");
+
+  /**
+   * Standalone entry point: loads config from the command line and runs this graph in the local
+   * execution environment.
+   */
+  public static void main(String[] args) {
+    CommandLine commandLine = new CommandLine();
+    Config jobConfig = commandLine.loadConfig(commandLine.parser().parse(args));
+    ExecutionEnvironment localEnv = ExecutionEnvironment.getLocalEnvironment(jobConfig);
+    localEnv.run(new PageViewCounterExample(), jobConfig);
+  }
+
+  @Override public void init(StreamGraph graph, Config config) {
+    MessageStream<PageViewEvent> pageViews =
+        graph.createInStream(input1, new StringSerde("UTF-8"), new JsonSerde<>());
+    OutputStream<MyStreamOutput> perMemberCounts =
+        graph.createOutStream(output, new StringSerde("UTF-8"), new JsonSerde<>());
+
+    pageViews
+        .window(Windows.<PageViewEvent, String, Integer>keyedTumblingWindow(
+                event -> event.getMessage().memberId, Duration.ofSeconds(10), (event, count) -> count + 1)
+            .setEarlyTrigger(Triggers.repeat(Triggers.count(5)))
+            .setAccumulationMode(AccumulationMode.DISCARDING))
+        .map(MyStreamOutput::new)
+        .sendTo(perMemberCounts);
+  }
+
+  /**
+   * Creates a spec for the given topic on the "kafka" system with no extra properties.
+   */
+  private static StreamSpec kafkaStream(String topic) {
+    return new StreamSpec() {
+      @Override public SystemStream getSystemStream() {
+        return new SystemStream("kafka", topic);
+      }
+
+      @Override public Properties getProperties() {
+        return null;
+      }
+    };
+  }
+
+  /** A single page view; its message key is the page id and it is its own payload. */
+  class PageViewEvent implements MessageEnvelope<String, PageViewEvent> {
+    String pageId;
+    String memberId;
+    long timestamp;
+
+    PageViewEvent(String viewedPageId, String viewerMemberId, long viewedAt) {
+      this.pageId = viewedPageId;
+      this.memberId = viewerMemberId;
+      this.timestamp = viewedAt;
+    }
+
+    @Override
+    public PageViewEvent getMessage() {
+      return this;
+    }
+
+    @Override
+    public String getKey() {
+      return this.pageId;
+    }
+  }
+
+  /** Per-member count taken from one window pane; keyed by member id. */
+  class MyStreamOutput implements MessageEnvelope<String, MyStreamOutput> {
+    String memberId;
+    long timestamp;
+    int count;
+
+    MyStreamOutput(WindowPane<String, Integer> pane) {
+      this.memberId = pane.getKey().getKey();
+      this.timestamp = Long.valueOf(pane.getKey().getPaneId());
+      this.count = pane.getMessage();
+    }
+
+    @Override
+    public MyStreamOutput getMessage() {
+      return this;
+    }
+
+    @Override
+    public String getKey() {
+      return this.memberId;
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/c249443b/samza-operator/src/test/java/org/apache/samza/example/RepartitionExample.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/example/RepartitionExample.java b/samza-operator/src/test/java/org/apache/samza/example/RepartitionExample.java
new file mode 100644
index 0000000..6994ac4
--- /dev/null
+++ b/samza-operator/src/test/java/org/apache/samza/example/RepartitionExample.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.example;
+
+import org.apache.samza.operators.*;
+import org.apache.samza.operators.StreamGraphBuilder;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.operators.windows.WindowPane;
+import org.apache.samza.operators.windows.Windows;
+import org.apache.samza.serializers.JsonSerde;
+import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.system.ExecutionEnvironment;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.util.CommandLine;
+
+import java.time.Duration;
+import java.util.*;
+
+
+/**
+ * Example {@link StreamGraphBuilder} code to test the API methods with re-partition operator.
+ *
+ * <p>Page-view events are re-partitioned by member id and then counted per member id in 5-minute
+ * keyed tumbling windows; each window pane is converted to a {@link MyStreamOutput} and sent to the
+ * output stream.
+ */
+public class RepartitionExample implements StreamGraphBuilder {
+
+  /**
+   * Builds the re-partition + window graph.
+   *
+   * <p>A remote execution environment launches the job from a remote program, which should follow
+   * the same invocation pattern as the standalone {@link #main(String[])}:
+   *
+   * <pre>{@code
+   * public static void main(String[] args) throws Exception {
+   *   CommandLine cmdLine = new CommandLine();
+   *   Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
+   *   ExecutionEnvironment remoteEnv = ExecutionEnvironment.getRemoteEnvironment(config);
+   *   remoteEnv.run(new RepartitionExample(), config);
+   * }
+   * }</pre>
+   *
+   * @param graph  the graph to add the input/output streams and operators to
+   * @param config the job configuration (unused by this example)
+   */
+  @Override public void init(StreamGraph graph, Config config) {
+
+    MessageStream<PageViewEvent> pageViewEvents = graph.createInStream(input1, new StringSerde("UTF-8"), new JsonSerde<>());
+    OutputStream<MyStreamOutput> pageViewPerMemberCounters = graph.createOutStream(output, new StringSerde("UTF-8"), new JsonSerde<>());
+
+    pageViewEvents
+        .partitionBy(m -> m.getMessage().memberId)
+        .window(Windows.<PageViewEvent, String, Integer>keyedTumblingWindow(
+            msg -> msg.getMessage().memberId, Duration.ofMinutes(5), (m, c) -> c + 1))
+        .map(MyStreamOutput::new)
+        .sendTo(pageViewPerMemberCounters);
+
+  }
+
+  /**
+   * Standalone (local) program model: loads config from the command line and runs this graph in
+   * the local execution environment.
+   */
+  public static void main(String[] args) throws Exception {
+    CommandLine cmdLine = new CommandLine();
+    Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
+    ExecutionEnvironment standaloneEnv = ExecutionEnvironment.getLocalEnvironment(config);
+    standaloneEnv.run(new RepartitionExample(), config);
+  }
+
+  StreamSpec input1 = new StreamSpec() {
+    @Override public SystemStream getSystemStream() {
+      return new SystemStream("kafka", "PageViewEvent");
+    }
+
+    @Override public Properties getProperties() {
+      return null;
+    }
+  };
+
+  StreamSpec output = new StreamSpec() {
+    @Override public SystemStream getSystemStream() {
+      return new SystemStream("kafka", "PageViewPerMember5min");
+    }
+
+    @Override public Properties getProperties() {
+      return null;
+    }
+  };
+
+  /** A single page view; its message key is the page id and it is its own payload. */
+  class PageViewEvent implements MessageEnvelope<String, PageViewEvent> {
+    String pageId;
+    String memberId;
+    long timestamp;
+
+    PageViewEvent(String pageId, String memberId, long timestamp) {
+      this.pageId = pageId;
+      this.memberId = memberId;
+      this.timestamp = timestamp;
+    }
+
+    @Override
+    public String getKey() {
+      return this.pageId;
+    }
+
+    @Override
+    public PageViewEvent getMessage() {
+      return this;
+    }
+  }
+
+  /** Per-member count taken from one window pane; keyed by member id. */
+  class MyStreamOutput implements MessageEnvelope<String, MyStreamOutput> {
+    String memberId;
+    long timestamp;
+    int count;
+
+    MyStreamOutput(WindowPane<String, Integer> m) {
+      this.memberId = m.getKey().getKey();
+      this.timestamp = Long.valueOf(m.getKey().getPaneId());
+      this.count = m.getMessage();
+    }
+
+    @Override
+    public String getKey() {
+      return this.memberId;
+    }
+
+    @Override
+    public MyStreamOutput getMessage() {
+      return this;
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/c249443b/samza-operator/src/test/java/org/apache/samza/example/TestBasicStreamGraphs.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/example/TestBasicStreamGraphs.java b/samza-operator/src/test/java/org/apache/samza/example/TestBasicStreamGraphs.java
new file mode 100644
index 0000000..8ecd44f
--- /dev/null
+++ b/samza-operator/src/test/java/org/apache/samza/example/TestBasicStreamGraphs.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.example;
+
+import java.lang.reflect.Field;
+import org.apache.samza.Partition;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.impl.OperatorGraph;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.task.StreamOperatorTask;
+import org.apache.samza.task.TaskContext;
+import org.junit.Test;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+
+/**
+ * Unit test for {@link StreamOperatorTask}: after {@code init}, the task's operator graph must have
+ * an entry for the system stream of every input partition, for each example graph.
+ */
+public class TestBasicStreamGraphs {
+
+  private final Set<SystemStreamPartition> inputPartitions = createInputPartitions();
+
+  /**
+   * Creates four partitions on "my-system": topic "my-topic{i}" with partition id {@code i}, for
+   * {@code i} in 0..3. A plain static factory avoids a double-brace-initialized anonymous
+   * {@code HashSet} subclass that would capture the test instance.
+   */
+  private static Set<SystemStreamPartition> createInputPartitions() {
+    Set<SystemStreamPartition> partitions = new HashSet<>();
+    for (int i = 0; i < 4; i++) {
+      partitions.add(new SystemStreamPartition("my-system", String.format("my-topic%d", i), new Partition(i)));
+    }
+    return partitions;
+  }
+
+  @Test
+  public void testUserTask() throws Exception {
+    assertOperatorGraphCoversInputs(new StreamOperatorTask(new TestWindowExample(this.inputPartitions)));
+  }
+
+  @Test
+  public void testSplitTask() throws Exception {
+    assertOperatorGraphCoversInputs(new StreamOperatorTask(new TestBroadcastExample(this.inputPartitions)));
+  }
+
+  @Test
+  public void testJoinTask() throws Exception {
+    assertOperatorGraphCoversInputs(new StreamOperatorTask(new TestJoinExample(this.inputPartitions)));
+  }
+
+  /**
+   * Initializes {@code adaptorTask} with mocked config/context and asserts that its private
+   * {@code operatorGraph} has an entry for every input partition's system stream.
+   *
+   * @param adaptorTask the task under test, not yet initialized
+   * @throws Exception if {@code init} fails or the field cannot be read reflectively
+   */
+  private void assertOperatorGraphCoversInputs(StreamOperatorTask adaptorTask) throws Exception {
+    Config mockConfig = mock(Config.class);
+    TaskContext mockContext = mock(TaskContext.class);
+    when(mockContext.getSystemStreamPartitions()).thenReturn(this.inputPartitions);
+
+    adaptorTask.init(mockConfig, mockContext);
+
+    // Read the field only after init so the check holds whether the graph is assigned in the
+    // constructor or during init().
+    Field operatorGraphField = StreamOperatorTask.class.getDeclaredField("operatorGraph");
+    operatorGraphField.setAccessible(true);
+    OperatorGraph opGraph = (OperatorGraph) operatorGraphField.get(adaptorTask);
+    assertNotNull(opGraph);
+
+    for (SystemStreamPartition partition : this.inputPartitions) {
+      assertNotNull(opGraph.get(partition.getSystemStream()));
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/c249443b/samza-operator/src/test/java/org/apache/samza/example/TestBroadcastExample.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/example/TestBroadcastExample.java b/samza-operator/src/test/java/org/apache/samza/example/TestBroadcastExample.java
new file mode 100644
index 0000000..d22324b
--- /dev/null
+++ b/samza-operator/src/test/java/org/apache/samza/example/TestBroadcastExample.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.example;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.StreamSpec;
+import org.apache.samza.operators.data.InputMessageEnvelope;
+import org.apache.samza.operators.data.JsonIncomingSystemMessageEnvelope;
+import org.apache.samza.operators.data.Offset;
+import org.apache.samza.operators.triggers.Triggers;
+import org.apache.samza.operators.windows.Windows;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
+
+import java.time.Duration;
+import java.util.function.BiFunction;
+import java.util.Properties;
+import java.util.Set;
+
+
+/**
+ * Example implementation of split stream tasks: every input system stream is mapped to
+ * {@link JsonMessageEnvelope}s and fanned out to three filtered branches, each aggregated in its own
+ * 100 ms tumbling window with a count/time late trigger.
+ *
+ * <p>NOTE(review): the window results are not sent to any output stream; this graph only exercises
+ * graph construction.
+ */
+public class TestBroadcastExample extends TestExampleBase {
+
+  TestBroadcastExample(Set<SystemStreamPartition> inputs) {
+    super(inputs);
+  }
+
+  class MessageType {
+    String field1;
+    String field2;
+    String field3;
+    String field4;
+    String parKey;
+    private long timestamp;
+
+    public long getTimestamp() {
+      return this.timestamp;
+    }
+  }
+
+  class JsonMessageEnvelope extends JsonIncomingSystemMessageEnvelope<MessageType> {
+    JsonMessageEnvelope(String key, MessageType data, Offset offset, SystemStreamPartition partition) {
+      super(key, data, offset, partition);
+    }
+  }
+
+  @Override
+  public void init(StreamGraph graph, Config config) {
+    BiFunction<JsonMessageEnvelope, Integer, Integer> sumAggregator = (m, c) -> c + 1;
+    inputs.keySet().forEach(systemStream -> {
+      MessageStream<JsonMessageEnvelope> inputStream = graph.<Object, Object, InputMessageEnvelope>createInStream(new StreamSpec() {
+        @Override public SystemStream getSystemStream() {
+          return systemStream;
+        }
+
+        @Override public Properties getProperties() {
+          return null;
+        }
+      }, null, null).map(this::getInputMessage);
+
+      inputStream.filter(this::myFilter1).window(Windows.tumblingWindow(Duration.ofMillis(100), sumAggregator)
+          .setLateTrigger(Triggers.any(Triggers.count(30000), Triggers.timeSinceFirstMessage(Duration.ofMillis(10)))));
+
+      inputStream.filter(this::myFilter2).window(Windows.tumblingWindow(Duration.ofMillis(100), sumAggregator)
+          .setLateTrigger(Triggers.any(Triggers.count(30000), Triggers.timeSinceFirstMessage(Duration.ofMillis(10)))));
+
+      inputStream.filter(this::myFilter3).window(Windows.tumblingWindow(Duration.ofMillis(100), sumAggregator)
+          .setLateTrigger(Triggers.any(Triggers.count(30000), Triggers.timeSinceFirstMessage(Duration.ofMillis(10)))));
+
+    });
+  }
+
+  /** Unwraps the raw input envelope; assumes its payload is a {@link JsonMessageEnvelope}. */
+  JsonMessageEnvelope getInputMessage(InputMessageEnvelope m1) {
+    return (JsonMessageEnvelope) m1.getMessage();
+  }
+
+  boolean myFilter1(JsonMessageEnvelope m1) {
+    // Do user defined processing here
+    return hasParKey(m1, "key1");
+  }
+
+  boolean myFilter2(JsonMessageEnvelope m1) {
+    // Do user defined processing here
+    return hasParKey(m1, "key2");
+  }
+
+  boolean myFilter3(JsonMessageEnvelope m1) {
+    // Do user defined processing here
+    return hasParKey(m1, "key3");
+  }
+
+  /**
+   * Null-safe test of the payload's {@code parKey} field.
+   *
+   * @param envelope    the message to inspect
+   * @param expectedKey the value {@code parKey} must equal
+   * @return {@code true} iff the payload is present and its {@code parKey} equals {@code expectedKey};
+   *     a missing payload or {@code parKey} yields {@code false} instead of a NullPointerException
+   */
+  private boolean hasParKey(JsonMessageEnvelope envelope, String expectedKey) {
+    MessageType message = envelope.getMessage();
+    return message != null && expectedKey.equals(message.parKey);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/c249443b/samza-operator/src/test/java/org/apache/samza/example/TestExampleBase.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/example/TestExampleBase.java b/samza-operator/src/test/java/org/apache/samza/example/TestExampleBase.java
new file mode 100644
index 0000000..c4df9d4
--- /dev/null
+++ b/samza-operator/src/test/java/org/apache/samza/example/TestExampleBase.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.example;
+
+import org.apache.samza.operators.StreamGraphBuilder;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Base class for test examples. Groups the supplied input partitions by their {@link SystemStream};
+ * subclasses read {@link #inputs}, e.g. to create one input stream per system stream.
+ */
+public abstract class TestExampleBase implements StreamGraphBuilder {
+
+  /** Input partitions grouped by the system stream they belong to. */
+  protected final Map<SystemStream, Set<SystemStreamPartition>> inputs;
+
+  /**
+   * @param inputs the input partitions, grouped here by {@link SystemStreamPartition#getSystemStream()}
+   */
+  TestExampleBase(Set<SystemStreamPartition> inputs) {
+    this.inputs = new HashMap<>();
+    for (SystemStreamPartition input : inputs) {
+      // computeIfAbsent allocates a set only the first time a stream is seen and needs one lookup;
+      // putIfAbsent(k, new HashSet<>()) + get(k) allocated a throwaway set on every iteration.
+      this.inputs.computeIfAbsent(input.getSystemStream(), stream -> new HashSet<>()).add(input);
+    }
+  }
+
+}