You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2017/04/05 23:42:31 UTC
[3/4] samza git commit: SAMZA-1094 SAMZA-1101 SAMZA-1159;
Remove MessageEnvelope from public operator APIs. : Delay the
creation of SinkFunction for output streams. : Move StreamSpec from a public
API to an internal class.
http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
index c00f470..e2c4b9a 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
@@ -19,21 +19,20 @@
package org.apache.samza.operators.spec;
-import java.util.Collection;
import org.apache.samza.config.Config;
import org.apache.samza.operators.MessageStreamImpl;
-import org.apache.samza.operators.OutputStream;
-import org.apache.samza.operators.StreamGraphImpl;
import org.apache.samza.operators.functions.FilterFunction;
import org.apache.samza.operators.functions.FlatMapFunction;
import org.apache.samza.operators.functions.MapFunction;
import org.apache.samza.operators.functions.PartialJoinFunction;
import org.apache.samza.operators.functions.SinkFunction;
+import org.apache.samza.operators.stream.OutputStreamInternal;
import org.apache.samza.operators.windows.WindowPane;
import org.apache.samza.operators.windows.internal.WindowInternal;
+import org.apache.samza.task.TaskContext;
import java.util.ArrayList;
-import org.apache.samza.task.TaskContext;
+import java.util.Collection;
/**
@@ -47,13 +46,14 @@ public class OperatorSpecs {
* Creates a {@link StreamOperatorSpec} for {@link MapFunction}
*
* @param mapFn the map function
- * @param graph the {@link StreamGraphImpl} object
- * @param output the output {@link MessageStreamImpl} object
+ * @param nextStream the output {@link MessageStreamImpl} to send messages to
+ * @param opId the unique ID of the operator
* @param <M> type of input message
* @param <OM> type of output message
* @return the {@link StreamOperatorSpec}
*/
- public static <M, OM> StreamOperatorSpec<M, OM> createMapOperatorSpec(MapFunction<M, OM> mapFn, StreamGraphImpl graph, MessageStreamImpl<OM> output) {
+ public static <M, OM> StreamOperatorSpec<M, OM> createMapOperatorSpec(
+ MapFunction<M, OM> mapFn, MessageStreamImpl<OM> nextStream, int opId) {
return new StreamOperatorSpec<>(new FlatMapFunction<M, OM>() {
@Override
public Collection<OM> apply(M message) {
@@ -71,19 +71,20 @@ public class OperatorSpecs {
public void init(Config config, TaskContext context) {
mapFn.init(config, context);
}
- }, output, OperatorSpec.OpCode.MAP, graph.getNextOpId());
+ }, nextStream, OperatorSpec.OpCode.MAP, opId);
}
/**
* Creates a {@link StreamOperatorSpec} for {@link FilterFunction}
*
* @param filterFn the transformation function
- * @param graph the {@link StreamGraphImpl} object
- * @param output the output {@link MessageStreamImpl} object
+ * @param nextStream the output {@link MessageStreamImpl} to send messages to
+ * @param opId the unique ID of the operator
* @param <M> type of input message
* @return the {@link StreamOperatorSpec}
*/
- public static <M> StreamOperatorSpec<M, M> createFilterOperatorSpec(FilterFunction<M> filterFn, StreamGraphImpl graph, MessageStreamImpl<M> output) {
+ public static <M> StreamOperatorSpec<M, M> createFilterOperatorSpec(
+ FilterFunction<M> filterFn, MessageStreamImpl<M> nextStream, int opId) {
return new StreamOperatorSpec<>(new FlatMapFunction<M, M>() {
@Override
public Collection<M> apply(M message) {
@@ -100,77 +101,81 @@ public class OperatorSpecs {
public void init(Config config, TaskContext context) {
filterFn.init(config, context);
}
- }, output, OperatorSpec.OpCode.FILTER, graph.getNextOpId());
+ }, nextStream, OperatorSpec.OpCode.FILTER, opId);
}
/**
* Creates a {@link StreamOperatorSpec}.
*
* @param transformFn the transformation function
- * @param graph the {@link StreamGraphImpl} object
- * @param output the output {@link MessageStreamImpl} object
+ * @param nextStream the output {@link MessageStreamImpl} to send messages to
+ * @param opId the unique ID of the operator
* @param <M> type of input message
* @param <OM> type of output message
* @return the {@link StreamOperatorSpec}
*/
public static <M, OM> StreamOperatorSpec<M, OM> createStreamOperatorSpec(
- FlatMapFunction<M, OM> transformFn, StreamGraphImpl graph, MessageStreamImpl<OM> output) {
- return new StreamOperatorSpec<>(transformFn, output, OperatorSpec.OpCode.FLAT_MAP, graph.getNextOpId());
+ FlatMapFunction<M, OM> transformFn, MessageStreamImpl<OM> nextStream, int opId) {
+ return new StreamOperatorSpec<>(transformFn, nextStream, OperatorSpec.OpCode.FLAT_MAP, opId);
}
/**
- * Creates a {@link SinkOperatorSpec}.
+ * Creates a {@link SinkOperatorSpec} for the sink operator.
*
- * @param sinkFn the sink function
+ * @param sinkFn the sink function provided by the user
+ * @param opId the unique ID of the operator
* @param <M> type of input message
- * @param graph the {@link StreamGraphImpl} object
- * @return the {@link SinkOperatorSpec}
+ * @return the {@link SinkOperatorSpec} for the sink operator
*/
- public static <M> SinkOperatorSpec<M> createSinkOperatorSpec(SinkFunction<M> sinkFn, StreamGraphImpl graph) {
- return new SinkOperatorSpec<>(sinkFn, OperatorSpec.OpCode.SINK, graph.getNextOpId());
+ public static <M> SinkOperatorSpec<M> createSinkOperatorSpec(SinkFunction<M> sinkFn, int opId) {
+ return new SinkOperatorSpec<>(sinkFn, OperatorSpec.OpCode.SINK, opId);
}
/**
- * Creates a {@link SinkOperatorSpec}.
+ * Creates a {@link SinkOperatorSpec} for the sendTo operator.
*
- * @param sinkFn the sink function
- * @param graph the {@link StreamGraphImpl} object
- * @param stream the {@link OutputStream} where the message is sent to
- * @param <M> type of input message
- * @return the {@link SinkOperatorSpec}
+ * @param outputStream the {@link OutputStreamInternal} to send messages to
+ * @param opId the unique ID of the operator
+ * @param <K> the type of key in the outgoing message
+ * @param <V> the type of message in the outgoing message
+ * @param <M> the type of message in the {@link OutputStreamInternal}
+ * @return the {@link SinkOperatorSpec} for the sendTo operator
*/
- public static <M> SinkOperatorSpec<M> createSendToOperatorSpec(SinkFunction<M> sinkFn, StreamGraphImpl graph, OutputStream<M> stream) {
- return new SinkOperatorSpec<>(sinkFn, OperatorSpec.OpCode.SEND_TO, graph.getNextOpId(), stream);
+ public static <K, V, M> SinkOperatorSpec<M> createSendToOperatorSpec(
+ OutputStreamInternal<K, V, M> outputStream, int opId) {
+ return new SinkOperatorSpec<>(outputStream, OperatorSpec.OpCode.SEND_TO, opId);
}
/**
- * Creates a {@link SinkOperatorSpec}.
+ * Creates a {@link SinkOperatorSpec} for the partitionBy operator.
*
- * @param sinkFn the sink function
- * @param stream the {@link OutputStream} where the message is sent to
- * @param opId operator ID
- * @param <M> type of input message
- * @return the {@link SinkOperatorSpec}
+ * @param outputStream the {@link OutputStreamInternal} to send messages to
+ * @param opId the unique ID of the operator
+ * @param <K> the type of key in the outgoing message
+ * @param <V> the type of message in the outgoing message
+ * @param <M> the type of message in the {@link OutputStreamInternal}
+ * @return the {@link SinkOperatorSpec} for the partitionBy operator
*/
- public static <M> SinkOperatorSpec<M> createPartitionOperatorSpec(SinkFunction<M> sinkFn, OutputStream<M> stream, int opId) {
- return new SinkOperatorSpec<>(sinkFn, OperatorSpec.OpCode.PARTITION_BY, opId, stream);
+ public static <K, V, M> SinkOperatorSpec<M> createPartitionByOperatorSpec(
+ OutputStreamInternal<K, V, M> outputStream, int opId) {
+ return new SinkOperatorSpec<>(outputStream, OperatorSpec.OpCode.PARTITION_BY, opId);
}
/**
* Creates a {@link WindowOperatorSpec}.
*
- * @param window the description of the window.
- * @param graph the {@link StreamGraphImpl} object
- * @param wndOutput the window output {@link MessageStreamImpl} object
- * @param <M> the type of input message
- * @param <WK> the type of key in the {@link WindowPane}
- * @param <WV> the type of value in the window
+ * @param window the description of the window.
+ * @param nextStream the output {@link MessageStreamImpl} to send messages to
+ * @param opId the unique ID of the operator
+ * @param <M> the type of input message
+ * @param <WK> the type of key in the {@link WindowPane}
+ * @param <WV> the type of value in the window
* @return the {@link WindowOperatorSpec}
*/
public static <M, WK, WV> WindowOperatorSpec<M, WK, WV> createWindowOperatorSpec(
- WindowInternal<M, WK, WV> window, StreamGraphImpl graph, MessageStreamImpl<WindowPane<WK, WV>> wndOutput) {
- return new WindowOperatorSpec<>(window, wndOutput, graph.getNextOpId());
+ WindowInternal<M, WK, WV> window, MessageStreamImpl<WindowPane<WK, WV>> nextStream, int opId) {
+ return new WindowOperatorSpec<>(window, nextStream, opId);
}
/**
@@ -179,8 +184,8 @@ public class OperatorSpecs {
* @param thisPartialJoinFn the partial join function for this message stream
* @param otherPartialJoinFn the partial join function for the other message stream
* @param ttlMs the ttl in ms for retaining messages in each stream
- * @param graph the {@link StreamGraphImpl} object
- * @param joinOutput the output {@link MessageStreamImpl}
+ * @param nextStream the output {@link MessageStreamImpl} to send messages to
+ * @param opId the unique ID of the operator
* @param <K> the type of join key
* @param <M> the type of input message
* @param <JM> the type of message in the other join stream
@@ -189,25 +194,25 @@ public class OperatorSpecs {
*/
public static <K, M, JM, RM> PartialJoinOperatorSpec<K, M, JM, RM> createPartialJoinOperatorSpec(
PartialJoinFunction<K, M, JM, RM> thisPartialJoinFn, PartialJoinFunction<K, JM, M, RM> otherPartialJoinFn,
- long ttlMs, StreamGraphImpl graph, MessageStreamImpl<RM> joinOutput) {
- return new PartialJoinOperatorSpec<K, M, JM, RM>(thisPartialJoinFn, otherPartialJoinFn, ttlMs, joinOutput, graph.getNextOpId());
+ long ttlMs, MessageStreamImpl<RM> nextStream, int opId) {
+ return new PartialJoinOperatorSpec<K, M, JM, RM>(thisPartialJoinFn, otherPartialJoinFn, ttlMs, nextStream, opId);
}
/**
* Creates a {@link StreamOperatorSpec} with a merger function.
*
- * @param graph the {@link StreamGraphImpl} object
- * @param mergeOutput the output {@link MessageStreamImpl} from the merger
+ * @param nextStream the output {@link MessageStreamImpl} to send messages to
+ * @param opId the unique ID of the operator
* @param <M> the type of input message
* @return the {@link StreamOperatorSpec} for the merge
*/
- public static <M> StreamOperatorSpec<M, M> createMergeOperatorSpec(StreamGraphImpl graph, MessageStreamImpl<M> mergeOutput) {
+ public static <M> StreamOperatorSpec<M, M> createMergeOperatorSpec(MessageStreamImpl<M> nextStream, int opId) {
return new StreamOperatorSpec<M, M>(message ->
new ArrayList<M>() {
{
this.add(message);
}
},
- mergeOutput, OperatorSpec.OpCode.MERGE, graph.getNextOpId());
+ nextStream, OperatorSpec.OpCode.MERGE, opId);
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java
index 669895f..b1dc529 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java
@@ -35,11 +35,10 @@ import org.apache.samza.task.TaskContext;
*/
public class PartialJoinOperatorSpec<K, M, JM, RM> implements OperatorSpec<RM> {
-
private final PartialJoinFunction<K, M, JM, RM> thisPartialJoinFn;
private final PartialJoinFunction<K, JM, M, RM> otherPartialJoinFn;
private final long ttlMs;
- private final MessageStreamImpl<RM> joinOutput;
+ private final MessageStreamImpl<RM> nextStream;
private final int opId;
/**
@@ -50,22 +49,22 @@ public class PartialJoinOperatorSpec<K, M, JM, RM> implements OperatorSpec<RM> {
* @param otherPartialJoinFn partial join function that provides state for input messages of type {@code JM}
* in the other stream
* @param ttlMs the ttl in ms for retaining messages in each stream
- * @param joinOutput the output {@link MessageStreamImpl} of the join results
+ * @param nextStream the output {@link MessageStreamImpl} containing the messages produced from this operator
* @param opId the unique ID for this operator
*/
PartialJoinOperatorSpec(PartialJoinFunction<K, M, JM, RM> thisPartialJoinFn,
PartialJoinFunction<K, JM, M, RM> otherPartialJoinFn, long ttlMs,
- MessageStreamImpl<RM> joinOutput, int opId) {
+ MessageStreamImpl<RM> nextStream, int opId) {
this.thisPartialJoinFn = thisPartialJoinFn;
this.otherPartialJoinFn = otherPartialJoinFn;
this.ttlMs = ttlMs;
- this.joinOutput = joinOutput;
+ this.nextStream = nextStream;
this.opId = opId;
}
@Override
public MessageStreamImpl<RM> getNextStream() {
- return this.joinOutput;
+ return this.nextStream;
}
public PartialJoinFunction<K, M, JM, RM> getThisPartialJoinFn() {
@@ -80,10 +79,12 @@ public class PartialJoinOperatorSpec<K, M, JM, RM> implements OperatorSpec<RM> {
return ttlMs;
}
+ @Override
public OperatorSpec.OpCode getOpCode() {
return OpCode.JOIN;
}
+ @Override
public int getOpId() {
return this.opId;
}
http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java
index ba30d67..7de85f3 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java
@@ -20,69 +20,54 @@ package org.apache.samza.operators.spec;
import org.apache.samza.config.Config;
import org.apache.samza.operators.MessageStreamImpl;
-import org.apache.samza.operators.OutputStream;
import org.apache.samza.operators.functions.SinkFunction;
+import org.apache.samza.operators.stream.OutputStreamInternal;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.TaskContext;
+import org.apache.samza.task.TaskCoordinator;
/**
- * The spec for a sink operator that accepts user-defined logic to output a {@link MessageStreamImpl} to an external
- * system. This is a terminal operator and does allows further operator chaining.
+ * The spec for an operator that outputs a {@link MessageStreamImpl} to an external system.
+ * This is a terminal operator and does not allow further operator chaining.
*
* @param <M> the type of input message
*/
public class SinkOperatorSpec<M> implements OperatorSpec {
- /**
- * {@link OpCode} for this {@link SinkOperatorSpec}
- */
+ private final SinkFunction<M> sinkFn;
+ private OutputStreamInternal<?, ?, M> outputStream; // may be null
private final OperatorSpec.OpCode opCode;
-
- /**
- * The unique ID for this operator.
- */
private final int opId;
/**
- * The user-defined sink function
- */
- private final SinkFunction<M> sinkFn;
-
- /**
- * Potential output stream defined by the {@link SinkFunction}
- */
- private final OutputStream<M> outStream;
-
- /**
- * Default constructor for a {@link SinkOperatorSpec} w/o an output stream. (e.g. output is sent to remote database)
+ * Constructs a {@link SinkOperatorSpec} with a user defined {@link SinkFunction}.
*
* @param sinkFn a user defined {@link SinkFunction} that will be called with the output message,
* the output {@link org.apache.samza.task.MessageCollector} and the
* {@link org.apache.samza.task.TaskCoordinator}.
- * @param opCode the specific {@link OpCode} for this {@link SinkOperatorSpec}. It could be {@link OpCode#SINK}, {@link OpCode#SEND_TO},
- * or {@link OpCode#PARTITION_BY}
- * @param opId the unique id of this {@link SinkOperatorSpec} in the {@link org.apache.samza.operators.StreamGraph}
+ * @param opCode the specific {@link OpCode} for this {@link SinkOperatorSpec}.
+ * It could be {@link OpCode#SINK}, {@link OpCode#SEND_TO}, or {@link OpCode#PARTITION_BY}.
+ * @param opId the unique ID of this {@link OperatorSpec} in the graph
*/
SinkOperatorSpec(SinkFunction<M> sinkFn, OperatorSpec.OpCode opCode, int opId) {
- this(sinkFn, opCode, opId, null);
+ this.sinkFn = sinkFn;
+ this.opCode = opCode;
+ this.opId = opId;
}
/**
- * Default constructor for a {@link SinkOperatorSpec} that sends the output to an {@link OutputStream}
- *
- * @param sinkFn a user defined {@link SinkFunction} that will be called with the output message,
- * the output {@link org.apache.samza.task.MessageCollector} and the
- * {@link org.apache.samza.task.TaskCoordinator}.
- * @param opCode the specific {@link OpCode} for this {@link SinkOperatorSpec}. It could be {@link OpCode#SINK}, {@link OpCode#SEND_TO},
- * or {@link OpCode#PARTITION_BY}
- * @param opId the unique id of this {@link SinkOperatorSpec} in the {@link org.apache.samza.operators.StreamGraph}
- * @param opId the {@link OutputStream} for this {@link SinkOperatorSpec}
+ * Constructs a {@link SinkOperatorSpec} to send messages to the provided {@code outputStream}
+ * @param outputStream the {@link OutputStreamInternal} to send messages to
+ * @param opCode the specific {@link OpCode} for this {@link SinkOperatorSpec}.
+ * It could be {@link OpCode#SINK}, {@link OpCode#SEND_TO}, or {@link OpCode#PARTITION_BY}
+ * @param opId the unique ID of this {@link SinkOperatorSpec} in the graph
*/
- SinkOperatorSpec(SinkFunction<M> sinkFn, OperatorSpec.OpCode opCode, int opId, OutputStream<M> outStream) {
- this.sinkFn = sinkFn;
- this.opCode = opCode;
- this.opId = opId;
- this.outStream = outStream;
+ SinkOperatorSpec(OutputStreamInternal<?, ?, M> outputStream, OperatorSpec.OpCode opCode, int opId) {
+ this(createSinkFn(outputStream), opCode, opId);
+ this.outputStream = outputStream;
}
/**
@@ -94,23 +79,47 @@ public class SinkOperatorSpec<M> implements OperatorSpec {
return null;
}
+ /**
+ * The {@link OutputStreamInternal} that this operator is sending its output to.
+ * @return the {@link OutputStreamInternal} for this operator if any, else null.
+ */
+ public OutputStreamInternal<?, ?, M> getOutputStream() {
+ return this.outputStream;
+ }
+
public SinkFunction<M> getSinkFn() {
return this.sinkFn;
}
+ @Override
public OperatorSpec.OpCode getOpCode() {
return this.opCode;
}
+ @Override
public int getOpId() {
return this.opId;
}
- public OutputStream<M> getOutStream() {
- return this.outStream;
+ @Override
+ public void init(Config config, TaskContext context) {
+ this.sinkFn.init(config, context);
}
- @Override public void init(Config config, TaskContext context) {
- this.sinkFn.init(config, context);
+ /**
+ * Creates a {@link SinkFunction} to send messages to the provided {@code outputStream}.
+ * @param outputStream the {@link OutputStreamInternal} to send messages to
+ * @param <M> the type of input message
+ * @return a {@link SinkFunction} that sends messages to the provided {@code outputStream}
+ */
+ private static <M> SinkFunction<M> createSinkFn(OutputStreamInternal<?, ?, M> outputStream) {
+ return (M message, MessageCollector mc, TaskCoordinator tc) -> {
+ // TODO: SAMZA-1148 - need to find a way to directly pass in the serde class names
+ SystemStream systemStream = new SystemStream(outputStream.getStreamSpec().getSystemName(),
+ outputStream.getStreamSpec().getPhysicalName());
+ Object key = outputStream.getKeyExtractor().apply(message);
+ Object msg = outputStream.getMsgExtractor().apply(message);
+ mc.send(new OutgoingMessageEnvelope(systemStream, key, msg));
+ };
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java
index d7813f7..3c427c7 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java
@@ -32,54 +32,42 @@ import org.apache.samza.task.TaskContext;
*/
public class StreamOperatorSpec<M, OM> implements OperatorSpec<OM> {
- /**
- * {@link OpCode} for this {@link StreamOperatorSpec}
- */
+ private final FlatMapFunction<M, OM> transformFn;
+ private final MessageStreamImpl<OM> nextStream;
private final OperatorSpec.OpCode opCode;
-
- /**
- * The unique ID for this operator.
- */
private final int opId;
/**
- * The output {@link MessageStreamImpl} from this {@link StreamOperatorSpec}
- */
- private final MessageStreamImpl<OM> outputStream;
-
- /**
- * Transformation function applied in this {@link StreamOperatorSpec}
- */
- private final FlatMapFunction<M, OM> transformFn;
-
- /**
* Constructor for a {@link StreamOperatorSpec} that accepts an output {@link MessageStreamImpl}.
*
* @param transformFn the transformation function
- * @param outputStream the output {@link MessageStreamImpl}
+ * @param nextStream the output {@link MessageStreamImpl} containing the messages produced from this operator
* @param opCode the {@link OpCode} for this {@link StreamOperatorSpec}
* @param opId the unique id for this {@link StreamOperatorSpec} in a {@link org.apache.samza.operators.StreamGraph}
*/
- StreamOperatorSpec(FlatMapFunction<M, OM> transformFn, MessageStreamImpl outputStream, OperatorSpec.OpCode opCode, int opId) {
- this.outputStream = outputStream;
+ StreamOperatorSpec(FlatMapFunction<M, OM> transformFn, MessageStreamImpl nextStream,
+ OperatorSpec.OpCode opCode, int opId) {
this.transformFn = transformFn;
+ this.nextStream = nextStream;
this.opCode = opCode;
this.opId = opId;
}
@Override
public MessageStreamImpl<OM> getNextStream() {
- return this.outputStream;
+ return this.nextStream;
}
public FlatMapFunction<M, OM> getTransformFn() {
return this.transformFn;
}
+ @Override
public OperatorSpec.OpCode getOpCode() {
return this.opCode;
}
+ @Override
public int getOpId() {
return this.opId;
}
http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
index 6d948d7..9515e38 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
@@ -36,21 +36,18 @@ import org.apache.samza.task.TaskContext;
public class WindowOperatorSpec<M, WK, WV> implements OperatorSpec<WindowPane<WK, WV>> {
private final WindowInternal<M, WK, WV> window;
-
- private final MessageStreamImpl<WindowPane<WK, WV>> outputStream;
-
+ private final MessageStreamImpl<WindowPane<WK, WV>> nextStream;
private final int opId;
-
/**
* Constructor for {@link WindowOperatorSpec}.
*
* @param window the window function
- * @param outputStream the output {@link MessageStreamImpl} from this {@link WindowOperatorSpec}
+ * @param nextStream the output {@link MessageStreamImpl} containing the messages produced from this operator
* @param opId auto-generated unique ID of this operator
*/
- WindowOperatorSpec(WindowInternal<M, WK, WV> window, MessageStreamImpl<WindowPane<WK, WV>> outputStream, int opId) {
- this.outputStream = outputStream;
+ WindowOperatorSpec(WindowInternal<M, WK, WV> window, MessageStreamImpl<WindowPane<WK, WV>> nextStream, int opId) {
+ this.nextStream = nextStream;
this.window = window;
this.opId = opId;
}
@@ -64,17 +61,19 @@ public class WindowOperatorSpec<M, WK, WV> implements OperatorSpec<WindowPane<WK
@Override
public MessageStreamImpl<WindowPane<WK, WV>> getNextStream() {
- return this.outputStream;
+ return this.nextStream;
}
public WindowInternal<M, WK, WV> getWindow() {
return window;
}
+ @Override
public OpCode getOpCode() {
return OpCode.WINDOW;
}
+ @Override
public int getOpId() {
return this.opId;
}
http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/main/java/org/apache/samza/operators/stream/InputStreamInternal.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/stream/InputStreamInternal.java b/samza-core/src/main/java/org/apache/samza/operators/stream/InputStreamInternal.java
new file mode 100644
index 0000000..e67b326
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/stream/InputStreamInternal.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.stream;
+
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.system.StreamSpec;
+
+import java.util.function.BiFunction;
+
+/**
+ * Internal representation of an input stream.
+ *
+ * @param <M> the type of messages in the input stream
+ */
+@InterfaceStability.Unstable
+public interface InputStreamInternal<K, V, M> extends MessageStream<M> {
+
+ StreamSpec getStreamSpec();
+
+ BiFunction<K, V, M> getMsgBuilder();
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/main/java/org/apache/samza/operators/stream/InputStreamInternalImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/stream/InputStreamInternalImpl.java b/samza-core/src/main/java/org/apache/samza/operators/stream/InputStreamInternalImpl.java
new file mode 100644
index 0000000..c4337d0
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/stream/InputStreamInternalImpl.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.stream;
+
+import org.apache.samza.operators.MessageStreamImpl;
+import org.apache.samza.operators.StreamGraphImpl;
+import org.apache.samza.system.StreamSpec;
+
+import java.util.function.BiFunction;
+
+public class InputStreamInternalImpl<K, V, M> extends MessageStreamImpl<M> implements InputStreamInternal<K, V, M> {
+
+ private final StreamSpec streamSpec;
+ private final BiFunction<K, V, M> msgBuilder;
+
+ public InputStreamInternalImpl(StreamGraphImpl graph, StreamSpec streamSpec, BiFunction<K, V, M> msgBuilder) {
+ super(graph);
+ this.streamSpec = streamSpec;
+ this.msgBuilder = msgBuilder;
+ }
+
+ public StreamSpec getStreamSpec() {
+ return this.streamSpec;
+ }
+
+ public BiFunction<K, V, M> getMsgBuilder() {
+ return this.msgBuilder;
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/main/java/org/apache/samza/operators/stream/IntermediateStreamInternalImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/stream/IntermediateStreamInternalImpl.java b/samza-core/src/main/java/org/apache/samza/operators/stream/IntermediateStreamInternalImpl.java
new file mode 100644
index 0000000..a1bee6a
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/stream/IntermediateStreamInternalImpl.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.stream;
+
+import org.apache.samza.operators.MessageStreamImpl;
+import org.apache.samza.operators.StreamGraphImpl;
+import org.apache.samza.system.StreamSpec;
+
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+public class IntermediateStreamInternalImpl<K, V, M> extends MessageStreamImpl<M>
+ implements InputStreamInternal<K, V, M>, OutputStreamInternal<K, V, M> {
+
+ private final StreamSpec streamSpec;
+ private final Function<M, K> keyExtractor;
+ private final Function<M, V> msgExtractor;
+ private final BiFunction<K, V, M> msgBuilder;
+
+ public IntermediateStreamInternalImpl(StreamGraphImpl graph, StreamSpec streamSpec,
+ Function<M, K> keyExtractor, Function<M, V> msgExtractor, BiFunction<K, V, M> msgBuilder) {
+ super(graph);
+ this.streamSpec = streamSpec;
+ this.keyExtractor = keyExtractor;
+ this.msgExtractor = msgExtractor;
+ this.msgBuilder = msgBuilder;
+ }
+
+ public StreamSpec getStreamSpec() {
+ return this.streamSpec;
+ }
+
+ public Function<M, K> getKeyExtractor() {
+ return this.keyExtractor;
+ }
+
+ public Function<M, V> getMsgExtractor() {
+ return this.msgExtractor;
+ }
+
+ @Override
+ public BiFunction<K, V, M> getMsgBuilder() {
+ return this.msgBuilder;
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/main/java/org/apache/samza/operators/stream/OutputStreamInternal.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/stream/OutputStreamInternal.java b/samza-core/src/main/java/org/apache/samza/operators/stream/OutputStreamInternal.java
new file mode 100644
index 0000000..48ce641
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/stream/OutputStreamInternal.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.operators.stream;
+
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.system.StreamSpec;
+
+import java.util.function.Function;
+
+
+/**
+ * Internal representation of an output stream.
+ *
+ * @param <K> the type of key returned by the key extractor
+ * @param <V> the type of value returned by the message extractor
+ * @param <M> the type of messages in the output stream
+ */
+@InterfaceStability.Unstable
+public interface OutputStreamInternal<K, V, M> extends OutputStream<K, V, M> {
+
+ /**
+ * @return the {@link StreamSpec} associated with this output stream
+ */
+ StreamSpec getStreamSpec();
+
+ /**
+ * @return the function that maps a message of type {@code M} to its key of type {@code K}
+ */
+ Function<M, K> getKeyExtractor();
+
+ /**
+ * @return the function that maps a message of type {@code M} to its value of type {@code V}
+ */
+ Function<M, V> getMsgExtractor();
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/main/java/org/apache/samza/operators/stream/OutputStreamInternalImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/stream/OutputStreamInternalImpl.java b/samza-core/src/main/java/org/apache/samza/operators/stream/OutputStreamInternalImpl.java
new file mode 100644
index 0000000..a2d0cca
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/stream/OutputStreamInternalImpl.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.stream;
+
+import org.apache.samza.operators.MessageStreamImpl;
+import org.apache.samza.operators.StreamGraphImpl;
+import org.apache.samza.system.StreamSpec;
+
+import java.util.function.Function;
+
+/**
+ * Default implementation of {@link OutputStreamInternal}.
+ * <p>
+ * Holds the {@link StreamSpec} of the output stream together with the functions that map a message
+ * of type {@code M} to its key ({@code K}) and value ({@code V}).
+ *
+ * @param <K> the type of key returned by the key extractor
+ * @param <V> the type of value returned by the message extractor
+ * @param <M> the type of messages in the output stream
+ */
+public class OutputStreamInternalImpl<K, V, M> extends MessageStreamImpl<M> implements OutputStreamInternal<K, V, M> {
+
+  private final StreamSpec streamSpec;
+  private final Function<M, K> keyExtractor;
+  private final Function<M, V> msgExtractor;
+
+  /**
+   * Creates an output stream. The arguments are stored as given, without validation.
+   *
+   * @param graph the {@link StreamGraphImpl} passed on to {@link MessageStreamImpl}
+   * @param streamSpec the {@link StreamSpec} returned by {@link #getStreamSpec()}
+   * @param keyExtractor the function returned by {@link #getKeyExtractor()}
+   * @param msgExtractor the function returned by {@link #getMsgExtractor()}
+   */
+  public OutputStreamInternalImpl(StreamGraphImpl graph, StreamSpec streamSpec,
+      Function<M, K> keyExtractor, Function<M, V> msgExtractor) {
+    super(graph);
+    this.streamSpec = streamSpec;
+    this.keyExtractor = keyExtractor;
+    this.msgExtractor = msgExtractor;
+  }
+
+  /**
+   * @return the {@link StreamSpec} this stream was created with
+   */
+  @Override
+  public StreamSpec getStreamSpec() {
+    return this.streamSpec;
+  }
+
+  /**
+   * @return the function that maps a message of type {@code M} to its key of type {@code K}
+   */
+  @Override
+  public Function<M, K> getKeyExtractor() {
+    return this.keyExtractor;
+  }
+
+  /**
+   * @return the function that maps a message of type {@code M} to its value of type {@code V}
+   */
+  @Override
+  public Function<M, V> getMsgExtractor() {
+    return this.msgExtractor;
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java
index cd6c492..4cb0d28 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java
@@ -31,7 +31,6 @@ import org.apache.samza.execution.JobNode;
import org.apache.samza.execution.StreamManager;
import org.apache.samza.job.ApplicationStatus;
import org.apache.samza.job.JobRunner;
-import org.apache.samza.operators.StreamGraph;
import org.apache.samza.operators.StreamGraphImpl;
import org.apache.samza.system.StreamSpec;
import org.slf4j.Logger;
@@ -135,7 +134,7 @@ public class RemoteApplicationRunner extends AbstractApplicationRunner {
private JobGraph getExecutionPlan(StreamApplication app) throws Exception {
// build stream graph
- StreamGraph streamGraph = new StreamGraphImpl(this, config);
+ StreamGraphImpl streamGraph = new StreamGraphImpl(this, config);
app.init(streamGraph, config);
// create the physical execution plan
http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java b/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
index d4224c3..73bb53f 100644
--- a/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
+++ b/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
@@ -18,113 +18,114 @@
*/
package org.apache.samza.task;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-
+import org.apache.samza.application.StreamApplication;
import org.apache.samza.config.Config;
import org.apache.samza.operators.ContextManager;
-import org.apache.samza.operators.MessageStreamImpl;
-import org.apache.samza.application.StreamApplication;
import org.apache.samza.operators.StreamGraphImpl;
-import org.apache.samza.operators.data.InputMessageEnvelope;
-import org.apache.samza.operators.impl.OperatorGraph;
+import org.apache.samza.operators.impl.OperatorImplGraph;
+import org.apache.samza.operators.stream.InputStreamInternal;
import org.apache.samza.runtime.ApplicationRunner;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.SystemStream;
-import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.util.Clock;
import org.apache.samza.util.SystemClock;
+import java.util.HashMap;
+import java.util.Map;
+
/**
- * Execution of the logic sub-DAG
- *
- *
- * An {@link StreamTask} implementation that receives {@link InputMessageEnvelope}s and propagates them
- * through the user's stream transformations defined in {@link StreamGraphImpl} using the
- * {@link org.apache.samza.operators.MessageStream} APIs.
- * <p>
- * This class brings all the operator API implementation components together and feeds the
- * {@link InputMessageEnvelope}s into the transformation chains.
- * <p>
- * It accepts an instance of the user implemented factory {@link StreamApplication} as input parameter of the constructor.
- * When its own {@link #init(Config, TaskContext)} method is called during startup, it instantiate a user-defined {@link StreamGraphImpl}
- * from the {@link StreamApplication}, calls {@link StreamGraphImpl#getContextManager()} to initialize the task-wide context
- * for the graph, and creates a {@link MessageStreamImpl} corresponding to each of its input
- * {@link org.apache.samza.system.SystemStreamPartition}s. Each input {@link MessageStreamImpl}
- * will be corresponding to either an input stream or intermediate stream in {@link StreamGraphImpl}.
- * <p>
- * Then, this task calls {@link org.apache.samza.operators.impl.OperatorGraph#init(Map, Config, TaskContext)} for each of the input
- * {@link MessageStreamImpl}. This instantiates the {@link org.apache.samza.operators.impl.OperatorImpl} DAG
- * corresponding to the aforementioned {@link org.apache.samza.operators.spec.OperatorSpec} DAG and returns the
- * root node of the DAG, which this class saves.
- * <p>
- * Now that it has the root for the DAG corresponding to each {@link org.apache.samza.system.SystemStreamPartition}, it
- * can pass the message envelopes received in {@link StreamTask#process(IncomingMessageEnvelope, MessageCollector, TaskCoordinator)}
- * along to the appropriate root nodes. From then on, each {@link org.apache.samza.operators.impl.OperatorImpl} propagates
- * its transformed output to the next set of {@link org.apache.samza.operators.impl.OperatorImpl}s.
+ * A {@link StreamTask} implementation that brings all the operator API implementation components together and
+ * feeds the input messages into the user-defined transformation chains in {@link StreamApplication}.
*/
public final class StreamOperatorTask implements StreamTask, InitableTask, WindowableTask, ClosableTask {
- /**
- * A mapping from each {@link SystemStream} to the root node of its operator chain DAG.
- */
- private final OperatorGraph operatorGraph;
-
- private final StreamApplication graphBuilder;
-
+ private final StreamApplication streamApplication;
private final ApplicationRunner runner;
-
private final Clock clock;
+ private OperatorImplGraph operatorImplGraph;
private ContextManager contextManager;
+ private Map<SystemStream, InputStreamInternal> inputSystemStreamToInputStream;
- private Set<SystemStreamPartition> systemStreamPartitions;
-
- public StreamOperatorTask(StreamApplication graphBuilder, ApplicationRunner runner) {
- this(graphBuilder, SystemClock.instance(), runner);
+ /**
+ * Constructs an adaptor task to run the user-implemented {@link StreamApplication}.
+ * @param streamApplication the user-implemented {@link StreamApplication} that creates the logical DAG
+ * @param runner the {@link ApplicationRunner} to get the mapping between logical and physical streams
+ * @param clock the {@link Clock} to use for time-keeping
+ */
+ public StreamOperatorTask(StreamApplication streamApplication, ApplicationRunner runner, Clock clock) {
+ this.streamApplication = streamApplication;
+ this.runner = runner;
+ this.clock = clock;
}
- // purely for testing.
- public StreamOperatorTask(StreamApplication graphBuilder, Clock clock, ApplicationRunner runner) {
- this.graphBuilder = graphBuilder;
- this.operatorGraph = new OperatorGraph(clock);
- this.clock = clock;
- this.runner = runner;
+ public StreamOperatorTask(StreamApplication application, ApplicationRunner runner) {
+ this(application, runner, SystemClock.instance());
}
+ /**
+ * Initializes this task during startup.
+ * <p>
+ * Implementation: Initializes the user-implemented {@link StreamApplication}. The {@link StreamApplication} sets
+ * the input and output streams and the task-wide context manager using the {@link StreamGraphImpl} APIs,
+ * and the logical transforms using the {@link org.apache.samza.operators.MessageStream} APIs.
+ * <p>
+ * It then uses the {@link StreamGraphImpl} to create the {@link OperatorImplGraph} corresponding to the logical
+ * DAG. It also saves the mapping between input {@link SystemStream}s and their corresponding
+ * {@link InputStreamInternal}s for delivering incoming messages to the appropriate sub-DAG.
+ *
+ * @param config allows accessing of fields in the configuration files that this StreamTask is specified in
+ * @param context allows initializing and accessing contextual data of this StreamTask
+ * @throws Exception in case of initialization errors
+ */
@Override
public final void init(Config config, TaskContext context) throws Exception {
- // create the MessageStreamsImpl object and initialize app-specific logic DAG within the task
- StreamGraphImpl streamGraph = new StreamGraphImpl(this.runner, config);
- this.graphBuilder.init(streamGraph, config);
- // get the context manager of the {@link StreamGraph} and initialize the task-specific context
- this.contextManager = streamGraph.getContextManager();
- this.systemStreamPartitions = context.getSystemStreamPartitions();
+ StreamGraphImpl streamGraph = new StreamGraphImpl(runner, config);
+ // initialize the user-implemented stream application.
+ this.streamApplication.init(streamGraph, config);
- Map<SystemStream, MessageStreamImpl> inputBySystemStream = new HashMap<>();
- systemStreamPartitions.forEach(ssp -> {
- if (!inputBySystemStream.containsKey(ssp.getSystemStream())) {
- // create mapping from the physical input {@link SystemStream} to the logic {@link MessageStream}
- inputBySystemStream.putIfAbsent(ssp.getSystemStream(), streamGraph.getInputStream(ssp.getSystemStream()));
- }
+ // get the user-implemented context manager and initialize the task-specific context.
+ this.contextManager = streamGraph.getContextManager();
+ TaskContext initializedTaskContext = this.contextManager.initTaskContext(config, context);
+
+ // create the operator impl DAG corresponding to the logical operator spec DAG
+ OperatorImplGraph operatorImplGraph = new OperatorImplGraph(clock);
+ operatorImplGraph.init(streamGraph, config, initializedTaskContext);
+ this.operatorImplGraph = operatorImplGraph;
+
+ // TODO: SAMZA-1118 - Remove mapping after SystemConsumer starts returning logical streamId with incoming messages
+ inputSystemStreamToInputStream = new HashMap<>();
+ streamGraph.getInputStreams().forEach((streamSpec, inputStream)-> {
+ SystemStream systemStream = new SystemStream(streamSpec.getSystemName(), streamSpec.getPhysicalName());
+ inputSystemStreamToInputStream.put(systemStream, inputStream);
});
- operatorGraph.init(inputBySystemStream, config, this.contextManager.initTaskContext(config, context));
}
+ /**
+ * Passes the incoming message envelopes along to the {@link org.apache.samza.operators.impl.RootOperatorImpl} node
+ * for the input {@link SystemStream}.
+ * <p>
+ * From then on, each {@link org.apache.samza.operators.impl.OperatorImpl} propagates its transformed output to
+ * its chained {@link org.apache.samza.operators.impl.OperatorImpl}s itself.
+ *
+ * @param ime incoming message envelope to process
+ * @param collector the collector to send messages with
+ * @param coordinator the coordinator to request commits or shutdown
+ */
@Override
public final void process(IncomingMessageEnvelope ime, MessageCollector collector, TaskCoordinator coordinator) {
- this.operatorGraph.get(ime.getSystemStreamPartition().getSystemStream())
- .onNext(new InputMessageEnvelope(ime), collector, coordinator);
+ SystemStream systemStream = ime.getSystemStreamPartition().getSystemStream();
+ InputStreamInternal inputStream = inputSystemStreamToInputStream.get(systemStream);
+ // TODO: SAMZA-1148 - Cast to appropriate input (key, msg) types based on the serde before applying the msgBuilder.
+ operatorImplGraph.getRootOperator(systemStream)
+ .onNext(inputStream.getMsgBuilder().apply(ime.getKey(), ime.getMessage()), collector, coordinator);
}
@Override
public final void window(MessageCollector collector, TaskCoordinator coordinator) {
- systemStreamPartitions.forEach(ssp -> {
- this.operatorGraph.get(ssp.getSystemStream())
- .onTick(collector, coordinator);
- });
+ operatorImplGraph.getAllRootOperators()
+ .forEach(rootOperator -> rootOperator.onTick(collector, coordinator));
}
@Override
http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/test/java/org/apache/samza/example/BroadcastExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/BroadcastExample.java b/samza-core/src/test/java/org/apache/samza/example/BroadcastExample.java
new file mode 100644
index 0000000..a09247a
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/example/BroadcastExample.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.example;
+
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.runtime.ApplicationRunner;
+import org.apache.samza.util.CommandLine;
+
+
+/**
+ * Example {@link StreamApplication} that reads one input stream and forwards each message to one of
+ * three output streams, chosen by the message key.
+ */
+public class BroadcastExample implements StreamApplication {
+
+  /**
+   * Wires the input stream to the three output streams; each branch filters on a single key value.
+   *
+   * @param graph the {@link StreamGraph} used to obtain the input and output streams
+   * @param config the job {@link Config}; not read by this example
+   */
+  @Override
+  public void init(StreamGraph graph, Config config) {
+    MessageStream<PageViewEvent> pageViews =
+        graph.getInputStream("inputStream", (key, msg) -> (PageViewEvent) msg);
+
+    OutputStream<String, PageViewEvent, PageViewEvent> firstOutput =
+        graph.getOutputStream("outputStream1", event -> event.key, event -> event);
+    OutputStream<String, PageViewEvent, PageViewEvent> secondOutput =
+        graph.getOutputStream("outputStream2", event -> event.key, event -> event);
+    OutputStream<String, PageViewEvent, PageViewEvent> thirdOutput =
+        graph.getOutputStream("outputStream3", event -> event.key, event -> event);
+
+    // one filtered branch per key, each sent to its own output stream
+    MessageStream<PageViewEvent> key1Views = pageViews.filter(event -> event.key.equals("key1"));
+    key1Views.sendTo(firstOutput);
+
+    MessageStream<PageViewEvent> key2Views = pageViews.filter(event -> event.key.equals("key2"));
+    key2Views.sendTo(secondOutput);
+
+    MessageStream<PageViewEvent> key3Views = pageViews.filter(event -> event.key.equals("key3"));
+    key3Views.sendTo(thirdOutput);
+  }
+
+  // local execution mode
+  public static void main(String[] args) throws Exception {
+    CommandLine commandLine = new CommandLine();
+    Config jobConfig = commandLine.loadConfig(commandLine.parser().parse(args));
+    ApplicationRunner.getLocalRunner(jobConfig).run(new BroadcastExample());
+  }
+
+  /**
+   * Message type used by this example: a key plus a timestamp.
+   */
+  class PageViewEvent {
+    String key;
+    long timestamp;
+
+    public PageViewEvent(String key, long timestamp) {
+      this.key = key;
+      this.timestamp = timestamp;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/test/java/org/apache/samza/example/KeyValueStoreExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/KeyValueStoreExample.java b/samza-core/src/test/java/org/apache/samza/example/KeyValueStoreExample.java
index 5898f1f..6b913c4 100644
--- a/samza-core/src/test/java/org/apache/samza/example/KeyValueStoreExample.java
+++ b/samza-core/src/test/java/org/apache/samza/example/KeyValueStoreExample.java
@@ -18,55 +18,42 @@
*/
package org.apache.samza.example;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import org.apache.samza.operators.*;
import org.apache.samza.application.StreamApplication;
import org.apache.samza.config.Config;
-import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.StreamGraph;
import org.apache.samza.operators.functions.FlatMapFunction;
-import org.apache.samza.serializers.JsonSerde;
-import org.apache.samza.serializers.StringSerde;
-import org.apache.samza.storage.kv.KeyValueStore;
import org.apache.samza.runtime.ApplicationRunner;
-import org.apache.samza.system.StreamSpec;
+import org.apache.samza.storage.kv.KeyValueStore;
import org.apache.samza.task.TaskContext;
import org.apache.samza.util.CommandLine;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
/**
* Example code using {@link KeyValueStore} to implement event-time window
*/
public class KeyValueStoreExample implements StreamApplication {
- /**
- * used by remote application runner to launch the job in remote program. The remote program should follow the similar
- * invoking context as in local runner:
- *
- * public static void main(String args[]) throws Exception {
- * CommandLine cmdLine = new CommandLine();
- * Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
- * ApplicationRunner runner = ApplicationRunner.fromConfig(config);
- * runner.run(new UserMainExample(), config)
- * }
- *
- */
@Override public void init(StreamGraph graph, Config config) {
-
- MessageStream<PageViewEvent> pageViewEvents = graph.createInStream(input1, new StringSerde("UTF-8"), new JsonSerde<>());
- OutputStream<StatsOutput> pageViewPerMemberCounters = graph.createOutStream(output, new StringSerde("UTF-8"), new JsonSerde<StatsOutput>());
+ MessageStream<PageViewEvent> pageViewEvents = graph.getInputStream(
+ "pageViewEventStream", (k, v) -> (PageViewEvent) v);
+ OutputStream<String, StatsOutput, StatsOutput> pageViewEventPerMemberStream = graph.getOutputStream(
+ "pageViewEventPerMemberStream", statsOutput -> statsOutput.memberId, statsOutput -> statsOutput);
pageViewEvents.
- partitionBy(m -> m.getMessage().memberId).
+ partitionBy(m -> m.memberId).
flatMap(new MyStatsCounter()).
- sendTo(pageViewPerMemberCounters);
-
+ sendTo(pageViewEventPerMemberStream);
}
- // local program model
+ // local execution mode
public static void main(String[] args) throws Exception {
CommandLine cmdLine = new CommandLine();
Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
@@ -88,8 +75,8 @@ public class KeyValueStoreExample implements StreamApplication {
@Override
public Collection<StatsOutput> apply(PageViewEvent message) {
List<StatsOutput> outputStats = new ArrayList<>();
- long wndTimestamp = (long) Math.floor(TimeUnit.MILLISECONDS.toMinutes(message.getMessage().timestamp) / 5) * 5;
- String wndKey = String.format("%s-%d", message.getMessage().memberId, wndTimestamp);
+ long wndTimestamp = (long) Math.floor(TimeUnit.MILLISECONDS.toMinutes(message.timestamp) / 5) * 5;
+ String wndKey = String.format("%s-%d", message.memberId, wndTimestamp);
StatsWindowState curState = this.statsStore.get(wndKey);
curState.newCount++;
long curTimeMs = System.currentTimeMillis();
@@ -97,7 +84,7 @@ public class KeyValueStoreExample implements StreamApplication {
curState.timeAtLastOutput = curTimeMs;
curState.lastCount += curState.newCount;
curState.newCount = 0;
- outputStats.add(new StatsOutput(message.getMessage().memberId, wndTimestamp, curState.lastCount));
+ outputStats.add(new StatsOutput(message.memberId, wndTimestamp, curState.lastCount));
}
// update counter w/o generating output
this.statsStore.put(wndKey, curState);
@@ -110,11 +97,7 @@ public class KeyValueStoreExample implements StreamApplication {
}
}
- StreamSpec input1 = new StreamSpec("pageViewEventStream", "PageViewEvent", "kafka");
-
- StreamSpec output = new StreamSpec("pageViewEventPerMemberStream", "PageViewEventCountByMemberId", "kafka");
-
- class PageViewEvent implements MessageEnvelope<String, PageViewEvent> {
+ class PageViewEvent {
String pageId;
String memberId;
long timestamp;
@@ -124,19 +107,9 @@ public class KeyValueStoreExample implements StreamApplication {
this.memberId = memberId;
this.timestamp = timestamp;
}
-
- @Override
- public String getKey() {
- return this.pageId;
- }
-
- @Override
- public PageViewEvent getMessage() {
- return this;
- }
}
- class StatsOutput implements MessageEnvelope<String, StatsOutput> {
+ class StatsOutput {
private String memberId;
private long timestamp;
private Integer count;
@@ -146,16 +119,5 @@ public class KeyValueStoreExample implements StreamApplication {
this.timestamp = timestamp;
this.count = count;
}
-
- @Override
- public String getKey() {
- return this.memberId;
- }
-
- @Override
- public StatsOutput getMessage() {
- return this;
- }
}
-
}
http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/test/java/org/apache/samza/example/NoContextStreamExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/NoContextStreamExample.java b/samza-core/src/test/java/org/apache/samza/example/NoContextStreamExample.java
deleted file mode 100644
index 516daf5..0000000
--- a/samza-core/src/test/java/org/apache/samza/example/NoContextStreamExample.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.example;
-
-import org.apache.samza.operators.*;
-import org.apache.samza.application.StreamApplication;
-import org.apache.samza.config.Config;
-import org.apache.samza.operators.data.InputMessageEnvelope;
-import org.apache.samza.operators.data.JsonIncomingSystemMessageEnvelope;
-import org.apache.samza.operators.data.Offset;
-import org.apache.samza.operators.functions.JoinFunction;
-import org.apache.samza.serializers.JsonSerde;
-import org.apache.samza.serializers.StringSerde;
-import org.apache.samza.runtime.ApplicationRunner;
-import org.apache.samza.system.StreamSpec;
-import org.apache.samza.system.SystemStreamPartition;
-import org.apache.samza.util.CommandLine;
-
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.List;
-
-
-/**
- * Example {@link StreamApplication} code to test the API methods
- */
-public class NoContextStreamExample implements StreamApplication {
-
- StreamSpec input1 = new StreamSpec("inputStreamA", "PageViewEvent", "kafka");
-
- StreamSpec input2 = new StreamSpec("inputStreamB", "RumLixEvent", "kafka");
-
- StreamSpec output = new StreamSpec("joinedPageViewStream", "PageViewJoinRumLix", "kafka");
-
- class MessageType {
- String joinKey;
- List<String> joinFields = new ArrayList<>();
- }
-
- class JsonMessageEnvelope extends JsonIncomingSystemMessageEnvelope<MessageType> {
- JsonMessageEnvelope(String key, MessageType data, Offset offset, SystemStreamPartition partition) {
- super(key, data, offset, partition);
- }
- }
-
- private JsonMessageEnvelope getInputMessage(InputMessageEnvelope ism) {
- return new JsonMessageEnvelope(
- ((MessageType) ism.getMessage()).joinKey,
- (MessageType) ism.getMessage(),
- ism.getOffset(),
- ism.getSystemStreamPartition());
- }
-
- class MyJoinFunction implements JoinFunction<String, JsonMessageEnvelope, JsonMessageEnvelope, JsonIncomingSystemMessageEnvelope<MessageType>> {
-
- @Override
- public JsonIncomingSystemMessageEnvelope<MessageType> apply(JsonMessageEnvelope m1,
- JsonMessageEnvelope m2) {
- MessageType newJoinMsg = new MessageType();
- newJoinMsg.joinKey = m1.getKey();
- newJoinMsg.joinFields.addAll(m1.getMessage().joinFields);
- newJoinMsg.joinFields.addAll(m2.getMessage().joinFields);
- return new JsonMessageEnvelope(m1.getMessage().joinKey, newJoinMsg, null, null);
- }
-
- @Override
- public String getFirstKey(JsonMessageEnvelope message) {
- return message.getKey();
- }
-
- @Override
- public String getSecondKey(JsonMessageEnvelope message) {
- return message.getKey();
- }
- }
-
-
- /**
- * used by remote application runner to launch the job in remote program. The remote program should follow the similar
- * invoking context as in local:
- *
- * public static void main(String args[]) throws Exception {
- * CommandLine cmdLine = new CommandLine();
- * Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
- * ApplicationRunner runner = ApplicationRunner.fromConfig(config);
- * runner.run(new NoContextStreamExample(), config);
- * }
- *
- */
- @Override public void init(StreamGraph graph, Config config) {
- MessageStream<InputMessageEnvelope> inputSource1 = graph.<Object, Object, InputMessageEnvelope>createInStream(
- input1, null, null);
- MessageStream<InputMessageEnvelope> inputSource2 = graph.<Object, Object, InputMessageEnvelope>createInStream(
- input2, null, null);
- OutputStream<JsonIncomingSystemMessageEnvelope<MessageType>> outStream = graph.createOutStream(output,
- new StringSerde("UTF-8"), new JsonSerde<>());
-
- inputSource1.map(this::getInputMessage).
- join(inputSource2.map(this::getInputMessage), new MyJoinFunction(), Duration.ofMinutes(1)).
- sendTo(outStream);
-
- }
-
- // standalone local program model
- public static void main(String[] args) throws Exception {
- CommandLine cmdLine = new CommandLine();
- Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
- ApplicationRunner localRunner = ApplicationRunner.getLocalRunner(config);
- localRunner.run(new NoContextStreamExample());
- }
-
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java b/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java
index a338f6b..80d0e16 100644
--- a/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java
+++ b/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java
@@ -18,17 +18,13 @@
*/
package org.apache.samza.example;
+import org.apache.samza.application.StreamApplication;
import org.apache.samza.config.Config;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.OutputStream;
-import org.apache.samza.application.StreamApplication;
import org.apache.samza.operators.StreamGraph;
-import org.apache.samza.operators.data.MessageEnvelope;
import org.apache.samza.operators.functions.JoinFunction;
-import org.apache.samza.serializers.JsonSerde;
-import org.apache.samza.serializers.StringSerde;
import org.apache.samza.runtime.ApplicationRunner;
-import org.apache.samza.system.StreamSpec;
import org.apache.samza.util.CommandLine;
import java.time.Duration;
@@ -38,29 +34,19 @@ import java.time.Duration;
*/
public class OrderShipmentJoinExample implements StreamApplication {
- /**
- * used by remote application runner to launch the job in remote program. The remote program should follow the similar
- * invoking context as in local runner:
- *
- * public static void main(String args[]) throws Exception {
- * CommandLine cmdLine = new CommandLine();
- * Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
- * ApplicationRunner runner = ApplicationRunner.fromConfig(config);
- * runner.run(new UserMainExample(), config);
- * }
- *
- */
- @Override public void init(StreamGraph graph, Config config) {
-
- MessageStream<OrderRecord> orders = graph.createInStream(input1, new StringSerde("UTF-8"), new JsonSerde<>());
- MessageStream<ShipmentRecord> shipments = graph.createInStream(input2, new StringSerde("UTF-8"), new JsonSerde<>());
- OutputStream<FulFilledOrderRecord> fulfilledOrders = graph.createOutStream(output, new StringSerde("UTF-8"), new JsonSerde<>());
-
- orders.join(shipments, new MyJoinFunction(), Duration.ofMinutes(1)).sendTo(fulfilledOrders);
+ @Override
+ public void init(StreamGraph graph, Config config) {
+ MessageStream<OrderRecord> orders = graph.getInputStream("orderStream", (k, m) -> (OrderRecord) m);
+ MessageStream<ShipmentRecord> shipments = graph.getInputStream("shipmentStream", (k, m) -> (ShipmentRecord) m);
+ OutputStream<String, FulFilledOrderRecord, FulFilledOrderRecord> joinedOrderShipmentStream =
+ graph.getOutputStream("joinedOrderShipmentStream", m -> m.orderId, m -> m);
+ orders
+ .join(shipments, new MyJoinFunction(), Duration.ofMinutes(1))
+ .sendTo(joinedOrderShipmentStream);
}
- // standalone local program model
+ // local execution mode
public static void main(String[] args) throws Exception {
CommandLine cmdLine = new CommandLine();
Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
@@ -68,13 +54,24 @@ public class OrderShipmentJoinExample implements StreamApplication {
localRunner.run(new OrderShipmentJoinExample());
}
- StreamSpec input1 = new StreamSpec("orderStream", "OrderEvent", "kafka");
+ class MyJoinFunction implements JoinFunction<String, OrderRecord, ShipmentRecord, FulFilledOrderRecord> {
+ @Override
+ public FulFilledOrderRecord apply(OrderRecord message, ShipmentRecord otherMessage) {
+ return new FulFilledOrderRecord(message.orderId, message.orderTimeMs, otherMessage.shipTimeMs);
+ }
- StreamSpec input2 = new StreamSpec("shipmentStream", "ShipmentEvent", "kafka");
+ @Override
+ public String getFirstKey(OrderRecord message) {
+ return message.orderId;
+ }
- StreamSpec output = new StreamSpec("joinedOrderShipmentStream", "OrderShipmentJoinEvent", "kafka");
+ @Override
+ public String getSecondKey(ShipmentRecord message) {
+ return message.orderId;
+ }
+ }
- class OrderRecord implements MessageEnvelope<String, OrderRecord> {
+ class OrderRecord {
String orderId;
long orderTimeMs;
@@ -82,19 +79,9 @@ public class OrderShipmentJoinExample implements StreamApplication {
this.orderId = orderId;
this.orderTimeMs = timeMs;
}
-
- @Override
- public String getKey() {
- return this.orderId;
- }
-
- @Override
- public OrderRecord getMessage() {
- return this;
- }
}
- class ShipmentRecord implements MessageEnvelope<String, ShipmentRecord> {
+ class ShipmentRecord {
String orderId;
long shipTimeMs;
@@ -102,19 +89,9 @@ public class OrderShipmentJoinExample implements StreamApplication {
this.orderId = orderId;
this.shipTimeMs = timeMs;
}
-
- @Override
- public String getKey() {
- return this.orderId;
- }
-
- @Override
- public ShipmentRecord getMessage() {
- return this;
- }
}
- class FulFilledOrderRecord implements MessageEnvelope<String, FulFilledOrderRecord> {
+ class FulFilledOrderRecord {
String orderId;
long orderTimeMs;
long shipTimeMs;
@@ -124,38 +101,5 @@ public class OrderShipmentJoinExample implements StreamApplication {
this.orderTimeMs = orderTimeMs;
this.shipTimeMs = shipTimeMs;
}
-
-
- @Override
- public String getKey() {
- return this.orderId;
- }
-
- @Override
- public FulFilledOrderRecord getMessage() {
- return this;
- }
- }
-
- FulFilledOrderRecord myJoinResult(OrderRecord m1, ShipmentRecord m2) {
- return new FulFilledOrderRecord(m1.getKey(), m1.orderTimeMs, m2.shipTimeMs);
- }
-
- class MyJoinFunction implements JoinFunction<String, OrderRecord, ShipmentRecord, FulFilledOrderRecord> {
-
- @Override
- public FulFilledOrderRecord apply(OrderRecord message, ShipmentRecord otherMessage) {
- return OrderShipmentJoinExample.this.myJoinResult(message, otherMessage);
- }
-
- @Override
- public String getFirstKey(OrderRecord message) {
- return message.getKey();
- }
-
- @Override
- public String getSecondKey(ShipmentRecord message) {
- return message.getKey();
- }
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java b/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java
index 6edf048..547cac6 100644
--- a/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java
+++ b/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java
@@ -18,18 +18,17 @@
*/
package org.apache.samza.example;
-import org.apache.samza.operators.*;
import org.apache.samza.application.StreamApplication;
import org.apache.samza.config.Config;
-import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.functions.FoldLeftFunction;
import org.apache.samza.operators.triggers.Triggers;
import org.apache.samza.operators.windows.AccumulationMode;
import org.apache.samza.operators.windows.WindowPane;
import org.apache.samza.operators.windows.Windows;
-import org.apache.samza.serializers.JsonSerde;
-import org.apache.samza.serializers.StringSerde;
import org.apache.samza.runtime.ApplicationRunner;
-import org.apache.samza.system.StreamSpec;
import org.apache.samza.util.CommandLine;
import java.time.Duration;
@@ -42,20 +41,22 @@ import java.util.function.Supplier;
public class PageViewCounterExample implements StreamApplication {
@Override public void init(StreamGraph graph, Config config) {
+ MessageStream<PageViewEvent> pageViewEvents =
+ graph.getInputStream("pageViewEventStream", (k, m) -> (PageViewEvent) m);
+ OutputStream<String, PageViewCount, PageViewCount> pageViewEventPerMemberStream = graph
+ .getOutputStream("pageViewEventPerMemberStream", m -> m.memberId, m -> m);
- MessageStream<PageViewEvent> pageViewEvents = graph.createInStream(input1, new StringSerde("UTF-8"), new JsonSerde<>());
- OutputStream<MyStreamOutput> pageViewPerMemberCounters = graph.createOutStream(output, new StringSerde("UTF-8"), new JsonSerde<>());
Supplier<Integer> initialValue = () -> 0;
-
- pageViewEvents.
- window(Windows.<PageViewEvent, String, Integer>keyedTumblingWindow(m -> m.getMessage().memberId, Duration.ofSeconds(10), initialValue, (m, c) -> c + 1).
- setEarlyTrigger(Triggers.repeat(Triggers.count(5))).
- setAccumulationMode(AccumulationMode.DISCARDING)).
- map(MyStreamOutput::new).
- sendTo(pageViewPerMemberCounters);
-
+ FoldLeftFunction<PageViewEvent, Integer> foldLeftFn = (m, c) -> c + 1;
+ pageViewEvents
+ .window(Windows.keyedTumblingWindow(m -> m.memberId, Duration.ofSeconds(10), initialValue, foldLeftFn)
+ .setEarlyTrigger(Triggers.repeat(Triggers.count(5)))
+ .setAccumulationMode(AccumulationMode.DISCARDING))
+ .map(PageViewCount::new)
+ .sendTo(pageViewEventPerMemberStream);
}
+ // local execution mode
public static void main(String[] args) {
CommandLine cmdLine = new CommandLine();
Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
@@ -63,11 +64,7 @@ public class PageViewCounterExample implements StreamApplication {
localRunner.run(new PageViewCounterExample());
}
- StreamSpec input1 = new StreamSpec("pageViewEventStream", "PageViewEvent", "kafka");
-
- StreamSpec output = new StreamSpec("pageViewEventPerMemberStream", "PageViewEventCountByMemberId", "kafka");
-
- class PageViewEvent implements MessageEnvelope<String, PageViewEvent> {
+ class PageViewEvent {
String pageId;
String memberId;
long timestamp;
@@ -77,38 +74,17 @@ public class PageViewCounterExample implements StreamApplication {
this.memberId = memberId;
this.timestamp = timestamp;
}
-
- @Override
- public String getKey() {
- return this.pageId;
- }
-
- @Override
- public PageViewEvent getMessage() {
- return this;
- }
}
- class MyStreamOutput implements MessageEnvelope<String, MyStreamOutput> {
+ class PageViewCount {
String memberId;
long timestamp;
int count;
- MyStreamOutput(WindowPane<String, Integer> m) {
+ PageViewCount(WindowPane<String, Integer> m) {
this.memberId = m.getKey().getKey();
this.timestamp = Long.valueOf(m.getKey().getPaneId());
this.count = m.getMessage();
}
-
- @Override
- public String getKey() {
- return this.memberId;
- }
-
- @Override
- public MyStreamOutput getMessage() {
- return this;
- }
}
-
}
http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java b/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java
index e222fe4..37375cd 100644
--- a/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java
+++ b/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java
@@ -18,16 +18,14 @@
*/
package org.apache.samza.example;
-import org.apache.samza.operators.*;
import org.apache.samza.application.StreamApplication;
import org.apache.samza.config.Config;
-import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.StreamGraph;
import org.apache.samza.operators.windows.WindowPane;
import org.apache.samza.operators.windows.Windows;
-import org.apache.samza.serializers.JsonSerde;
-import org.apache.samza.serializers.StringSerde;
import org.apache.samza.runtime.ApplicationRunner;
-import org.apache.samza.system.StreamSpec;
import org.apache.samza.util.CommandLine;
import java.time.Duration;
@@ -39,47 +37,21 @@ import java.util.function.Supplier;
*/
public class RepartitionExample implements StreamApplication {
- /**
- * used by remote application runner to launch the job in remote program. The remote program should follow the similar
- * invoking context as in local runner:
- *
- * public static void main(String args[]) throws Exception {
- * CommandLine cmdLine = new CommandLine();
- * Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
- * ApplicationRunner runner = ApplicationRunner.fromConfig(config);
- * runner.run(new UserMainExample(), config);
- * }
- *
- */
-
- /**
- * used by remote execution environment to launch the job in remote program. The remote program should follow the similar
- * invoking context as in standalone:
- *
- * public static void main(String args[]) throws Exception {
- * CommandLine cmdLine = new CommandLine();
- * Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
- * ExecutionEnvironment remoteEnv = ExecutionEnvironment.getRemoteEnvironment(config);
- * remoteEnv.run(new UserMainExample(), config);
- * }
- *
- */
@Override public void init(StreamGraph graph, Config config) {
-
- MessageStream<PageViewEvent> pageViewEvents = graph.createInStream(input1, new StringSerde("UTF-8"), new JsonSerde<>());
- OutputStream<MyStreamOutput> pageViewPerMemberCounters = graph.createOutStream(output, new StringSerde("UTF-8"), new JsonSerde<>());
Supplier<Integer> initialValue = () -> 0;
-
- pageViewEvents.
- partitionBy(m -> m.getMessage().memberId).
- window(Windows.<PageViewEvent, String, Integer>keyedTumblingWindow(
- msg -> msg.getMessage().memberId, Duration.ofMinutes(5), initialValue, (m, c) -> c + 1)).
- map(MyStreamOutput::new).
- sendTo(pageViewPerMemberCounters);
-
+ MessageStream<PageViewEvent> pageViewEvents =
+ graph.getInputStream("pageViewEventStream", (k, m) -> (PageViewEvent) m);
+ OutputStream<String, MyStreamOutput, MyStreamOutput> pageViewEventPerMemberStream = graph
+ .getOutputStream("pageViewEventPerMemberStream", m -> m.memberId, m -> m);
+
+ pageViewEvents
+ .partitionBy(m -> m.memberId)
+ .window(Windows.keyedTumblingWindow(m -> m.memberId, Duration.ofMinutes(5), initialValue, (m, c) -> c + 1))
+ .map(MyStreamOutput::new)
+ .sendTo(pageViewEventPerMemberStream);
}
- // standalone local program model
+ // local execution mode
public static void main(String[] args) throws Exception {
CommandLine cmdLine = new CommandLine();
Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
@@ -87,11 +59,7 @@ public class RepartitionExample implements StreamApplication {
localRunner.run(new RepartitionExample());
}
- StreamSpec input1 = new StreamSpec("pageViewEventStream", "PageViewEvent", "kafka");
-
- StreamSpec output = new StreamSpec("pageViewEventPerMemberStream", "PageViewEventCountByMemberId", "kafka");
-
- class PageViewEvent implements MessageEnvelope<String, PageViewEvent> {
+ class PageViewEvent {
String pageId;
String memberId;
long timestamp;
@@ -101,19 +69,9 @@ public class RepartitionExample implements StreamApplication {
this.memberId = memberId;
this.timestamp = timestamp;
}
-
- @Override
- public String getKey() {
- return this.pageId;
- }
-
- @Override
- public PageViewEvent getMessage() {
- return this;
- }
}
- class MyStreamOutput implements MessageEnvelope<String, MyStreamOutput> {
+ class MyStreamOutput {
String memberId;
long timestamp;
int count;
@@ -123,16 +81,5 @@ public class RepartitionExample implements StreamApplication {
this.timestamp = Long.valueOf(m.getKey().getPaneId());
this.count = m.getMessage();
}
-
- @Override
- public String getKey() {
- return this.memberId;
- }
-
- @Override
- public MyStreamOutput getMessage() {
- return this;
- }
}
-
}
http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/test/java/org/apache/samza/example/TestBasicStreamGraphs.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/TestBasicStreamGraphs.java b/samza-core/src/test/java/org/apache/samza/example/TestBasicStreamGraphs.java
deleted file mode 100644
index 6975955..0000000
--- a/samza-core/src/test/java/org/apache/samza/example/TestBasicStreamGraphs.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.example;
-
-import java.lang.reflect.Field;
-import org.apache.samza.Partition;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.MapConfig;
-import org.apache.samza.operators.impl.OperatorGraph;
-import org.apache.samza.runtime.ApplicationRunner;
-import org.apache.samza.system.SystemStreamPartition;
-import org.apache.samza.task.StreamOperatorTask;
-import org.apache.samza.task.TaskContext;
-import org.junit.Test;
-
-import java.util.HashSet;
-import java.util.Set;
-
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
-
-
-/**
- * Unit test for {@link StreamOperatorTask}
- */
-public class TestBasicStreamGraphs {
-
- private final Set<SystemStreamPartition> inputPartitions = new HashSet<SystemStreamPartition>() { {
- for (int i = 0; i < 4; i++) {
- this.add(new SystemStreamPartition("my-system", String.format("my-topic%d", i), new Partition(i)));
- }
- } };
-
- private final ApplicationRunner runner = mock(ApplicationRunner.class);
-
- @Test
- public void testUserTask() throws Exception {
- Config config = new MapConfig();
- TaskContext mockContext = mock(TaskContext.class);
- when(mockContext.getSystemStreamPartitions()).thenReturn(this.inputPartitions);
- TestWindowExample userTask = new TestWindowExample(this.inputPartitions);
- StreamOperatorTask adaptorTask = new StreamOperatorTask(userTask, runner);
- Field pipelineMapFld = StreamOperatorTask.class.getDeclaredField("operatorGraph");
- pipelineMapFld.setAccessible(true);
- OperatorGraph opGraph = (OperatorGraph) pipelineMapFld.get(adaptorTask);
-
- adaptorTask.init(config, mockContext);
- this.inputPartitions.forEach(partition -> {
- assertNotNull(opGraph.get(partition.getSystemStream()));
- });
- }
-
- @Test
- public void testSplitTask() throws Exception {
- Config config = new MapConfig();
- TaskContext mockContext = mock(TaskContext.class);
- when(mockContext.getSystemStreamPartitions()).thenReturn(this.inputPartitions);
- TestBroadcastExample splitTask = new TestBroadcastExample(this.inputPartitions);
- StreamOperatorTask adaptorTask = new StreamOperatorTask(splitTask, runner);
- Field pipelineMapFld = StreamOperatorTask.class.getDeclaredField("operatorGraph");
- pipelineMapFld.setAccessible(true);
- OperatorGraph opGraph = (OperatorGraph) pipelineMapFld.get(adaptorTask);
-
- adaptorTask.init(config, mockContext);
- this.inputPartitions.forEach(partition -> {
- assertNotNull(opGraph.get(partition.getSystemStream()));
- });
- }
-
- @Test
- public void testJoinTask() throws Exception {
- Config config = new MapConfig();
- TaskContext mockContext = mock(TaskContext.class);
- when(mockContext.getSystemStreamPartitions()).thenReturn(this.inputPartitions);
- TestJoinExample joinTask = new TestJoinExample(this.inputPartitions);
- StreamOperatorTask adaptorTask = new StreamOperatorTask(joinTask, runner);
- Field pipelineMapFld = StreamOperatorTask.class.getDeclaredField("operatorGraph");
- pipelineMapFld.setAccessible(true);
- OperatorGraph opGraph = (OperatorGraph) pipelineMapFld.get(adaptorTask);
-
- adaptorTask.init(config, mockContext);
- this.inputPartitions.forEach(partition -> {
- assertNotNull(opGraph.get(partition.getSystemStream()));
- });
- }
-
-}