You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2017/02/16 23:04:25 UTC
[2/5] samza git commit: SAMZA-1073: moving all operator classes into samza-core
http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-operator/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java b/samza-operator/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java
deleted file mode 100644
index e057c2b..0000000
--- a/samza-operator/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.spec;
-
-import org.apache.samza.config.Config;
-import org.apache.samza.operators.MessageStreamImpl;
-import org.apache.samza.operators.functions.PartialJoinFunction;
-import org.apache.samza.task.TaskContext;
-
-
-/**
- * Spec for the partial join operator that takes messages from one input stream, joins with buffered
- * messages from another stream, and produces join results to an output {@link MessageStreamImpl}.
- *
- * @param <M> the type of input message
- * @param <K> the type of join key
- * @param <JM> the type of message in the other join stream
- * @param <RM> the type of message in the join output stream
- */
-public class PartialJoinOperatorSpec<M, K, JM, RM> implements OperatorSpec<RM> {
-
- private final MessageStreamImpl<RM> joinOutput;
-
- /**
- * The transformation function of {@link PartialJoinOperatorSpec} that takes an input message of
- * type {@code M}, joins with a stream of buffered messages of type {@code JM} from another stream,
- * and generates a joined result message of type {@code RM}.
- */
- private final PartialJoinFunction<K, M, JM, RM> transformFn;
-
-
- /**
- * The unique ID for this operator.
- */
- private final int opId;
-
- /**
- * Default constructor for a {@link PartialJoinOperatorSpec}.
- *
- * @param partialJoinFn partial join function that take type {@code M} of input message and join
- * w/ type {@code JM} of buffered message from another stream
- * @param joinOutput the output {@link MessageStreamImpl} of the join results
- */
- PartialJoinOperatorSpec(PartialJoinFunction<K, M, JM, RM> partialJoinFn, MessageStreamImpl<RM> joinOutput, int opId) {
- this.joinOutput = joinOutput;
- this.transformFn = partialJoinFn;
- this.opId = opId;
- }
-
- @Override
- public MessageStreamImpl<RM> getNextStream() {
- return this.joinOutput;
- }
-
- public PartialJoinFunction<K, M, JM, RM> getTransformFn() {
- return this.transformFn;
- }
-
- public OperatorSpec.OpCode getOpCode() {
- return OpCode.JOIN;
- }
-
- public int getOpId() {
- return this.opId;
- }
-
- @Override public void init(Config config, TaskContext context) {
- this.transformFn.init(config, context);
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-operator/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java b/samza-operator/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java
deleted file mode 100644
index ba30d67..0000000
--- a/samza-operator/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.spec;
-
-import org.apache.samza.config.Config;
-import org.apache.samza.operators.MessageStreamImpl;
-import org.apache.samza.operators.OutputStream;
-import org.apache.samza.operators.functions.SinkFunction;
-import org.apache.samza.task.TaskContext;
-
-
-/**
- * The spec for a sink operator that accepts user-defined logic to output a {@link MessageStreamImpl} to an external
- * system. This is a terminal operator and does allows further operator chaining.
- *
- * @param <M> the type of input message
- */
-public class SinkOperatorSpec<M> implements OperatorSpec {
-
- /**
- * {@link OpCode} for this {@link SinkOperatorSpec}
- */
- private final OperatorSpec.OpCode opCode;
-
- /**
- * The unique ID for this operator.
- */
- private final int opId;
-
- /**
- * The user-defined sink function
- */
- private final SinkFunction<M> sinkFn;
-
- /**
- * Potential output stream defined by the {@link SinkFunction}
- */
- private final OutputStream<M> outStream;
-
- /**
- * Default constructor for a {@link SinkOperatorSpec} w/o an output stream. (e.g. output is sent to remote database)
- *
- * @param sinkFn a user defined {@link SinkFunction} that will be called with the output message,
- * the output {@link org.apache.samza.task.MessageCollector} and the
- * {@link org.apache.samza.task.TaskCoordinator}.
- * @param opCode the specific {@link OpCode} for this {@link SinkOperatorSpec}. It could be {@link OpCode#SINK}, {@link OpCode#SEND_TO},
- * or {@link OpCode#PARTITION_BY}
- * @param opId the unique id of this {@link SinkOperatorSpec} in the {@link org.apache.samza.operators.StreamGraph}
- */
- SinkOperatorSpec(SinkFunction<M> sinkFn, OperatorSpec.OpCode opCode, int opId) {
- this(sinkFn, opCode, opId, null);
- }
-
- /**
- * Default constructor for a {@link SinkOperatorSpec} that sends the output to an {@link OutputStream}
- *
- * @param sinkFn a user defined {@link SinkFunction} that will be called with the output message,
- * the output {@link org.apache.samza.task.MessageCollector} and the
- * {@link org.apache.samza.task.TaskCoordinator}.
- * @param opCode the specific {@link OpCode} for this {@link SinkOperatorSpec}. It could be {@link OpCode#SINK}, {@link OpCode#SEND_TO},
- * or {@link OpCode#PARTITION_BY}
- * @param opId the unique id of this {@link SinkOperatorSpec} in the {@link org.apache.samza.operators.StreamGraph}
- * @param opId the {@link OutputStream} for this {@link SinkOperatorSpec}
- */
- SinkOperatorSpec(SinkFunction<M> sinkFn, OperatorSpec.OpCode opCode, int opId, OutputStream<M> outStream) {
- this.sinkFn = sinkFn;
- this.opCode = opCode;
- this.opId = opId;
- this.outStream = outStream;
- }
-
- /**
- * This is a terminal operator and doesn't allow further operator chaining.
- * @return null
- */
- @Override
- public MessageStreamImpl<M> getNextStream() {
- return null;
- }
-
- public SinkFunction<M> getSinkFn() {
- return this.sinkFn;
- }
-
- public OperatorSpec.OpCode getOpCode() {
- return this.opCode;
- }
-
- public int getOpId() {
- return this.opId;
- }
-
- public OutputStream<M> getOutStream() {
- return this.outStream;
- }
-
- @Override public void init(Config config, TaskContext context) {
- this.sinkFn.init(config, context);
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-operator/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java b/samza-operator/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java
deleted file mode 100644
index d7813f7..0000000
--- a/samza-operator/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.spec;
-
-import org.apache.samza.config.Config;
-import org.apache.samza.operators.MessageStreamImpl;
-import org.apache.samza.operators.functions.FlatMapFunction;
-import org.apache.samza.task.TaskContext;
-
-
-/**
- * The spec for a linear stream operator that outputs 0 or more messages for each input message.
- *
- * @param <M> the type of input message
- * @param <OM> the type of output message
- */
-public class StreamOperatorSpec<M, OM> implements OperatorSpec<OM> {
-
- /**
- * {@link OpCode} for this {@link StreamOperatorSpec}
- */
- private final OperatorSpec.OpCode opCode;
-
- /**
- * The unique ID for this operator.
- */
- private final int opId;
-
- /**
- * The output {@link MessageStreamImpl} from this {@link StreamOperatorSpec}
- */
- private final MessageStreamImpl<OM> outputStream;
-
- /**
- * Transformation function applied in this {@link StreamOperatorSpec}
- */
- private final FlatMapFunction<M, OM> transformFn;
-
- /**
- * Constructor for a {@link StreamOperatorSpec} that accepts an output {@link MessageStreamImpl}.
- *
- * @param transformFn the transformation function
- * @param outputStream the output {@link MessageStreamImpl}
- * @param opCode the {@link OpCode} for this {@link StreamOperatorSpec}
- * @param opId the unique id for this {@link StreamOperatorSpec} in a {@link org.apache.samza.operators.StreamGraph}
- */
- StreamOperatorSpec(FlatMapFunction<M, OM> transformFn, MessageStreamImpl outputStream, OperatorSpec.OpCode opCode, int opId) {
- this.outputStream = outputStream;
- this.transformFn = transformFn;
- this.opCode = opCode;
- this.opId = opId;
- }
-
- @Override
- public MessageStreamImpl<OM> getNextStream() {
- return this.outputStream;
- }
-
- public FlatMapFunction<M, OM> getTransformFn() {
- return this.transformFn;
- }
-
- public OperatorSpec.OpCode getOpCode() {
- return this.opCode;
- }
-
- public int getOpId() {
- return this.opId;
- }
-
- @Override
- public void init(Config config, TaskContext context) {
- this.transformFn.init(config, context);
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-operator/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java b/samza-operator/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
deleted file mode 100644
index 46417ed..0000000
--- a/samza-operator/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.operators.spec;
-
-import org.apache.samza.operators.MessageStreamImpl;
-import org.apache.samza.operators.windows.WindowPane;
-import org.apache.samza.operators.windows.internal.WindowInternal;
-
-
-/**
- * Default window operator spec object
- *
- * @param <M> the type of input message to the window
- * @param <WK> the type of key of the window
- * @param <WV> the type of aggregated value in the window output {@link WindowPane}
- */
-public class WindowOperatorSpec<M, WK, WV> implements OperatorSpec<WindowPane<WK, WV>> {
-
- private final WindowInternal<M, WK, WV> window;
-
- private final MessageStreamImpl<WindowPane<WK, WV>> outputStream;
-
- private final int opId;
-
-
- /**
- * Constructor for {@link WindowOperatorSpec}.
- *
- * @param window the window function
- * @param outputStream the output {@link MessageStreamImpl} from this {@link WindowOperatorSpec}
- * @param opId auto-generated unique ID of this operator
- */
- WindowOperatorSpec(WindowInternal<M, WK, WV> window, MessageStreamImpl<WindowPane<WK, WV>> outputStream, int opId) {
- this.outputStream = outputStream;
- this.window = window;
- this.opId = opId;
- }
-
- @Override
- public MessageStreamImpl<WindowPane<WK, WV>> getNextStream() {
- return this.outputStream;
- }
-
- public WindowInternal getWindow() {
- return window;
- }
-
- public OpCode getOpCode() {
- return OpCode.WINDOW;
- }
-
- public int getOpId() {
- return this.opId;
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-operator/src/main/java/org/apache/samza/operators/spec/WindowState.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/spec/WindowState.java b/samza-operator/src/main/java/org/apache/samza/operators/spec/WindowState.java
deleted file mode 100644
index 53bca2e..0000000
--- a/samza-operator/src/main/java/org/apache/samza/operators/spec/WindowState.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.spec;
-
-import org.apache.samza.annotation.InterfaceStability;
-
-
-/**
- * This interface defines the methods a window state class has to implement. The programmers are allowed to implement
- * customized window state to be stored in window state stores by implementing this interface class.
- *
- * @param <WV> the type for window output value
- */
-@InterfaceStability.Unstable
-public interface WindowState<WV> {
- /**
- * Method to get the system time when the first message in the window is received
- *
- * @return nano-second of system time for the first message received in the window
- */
- long getFirstMessageTimeNs();
-
- /**
- * Method to get the system time when the last message in the window is received
- *
- * @return nano-second of system time for the last message received in the window
- */
- long getLastMessageTimeNs();
-
- /**
- * Method to get the earliest event time in the window
- *
- * @return the earliest event time in nano-second in the window
- */
- long getEarliestEventTimeNs();
-
- /**
- * Method to get the latest event time in the window
- *
- * @return the latest event time in nano-second in the window
- */
- long getLatestEventTimeNs();
-
- /**
- * Method to get the total number of messages received in the window
- *
- * @return number of messages in the window
- */
- long getNumberMessages();
-
- /**
- * Method to get the corresponding window's output value
- *
- * @return the corresponding window's output value
- */
- WV getOutputValue();
-
- /**
- * Method to set the corresponding window's output value
- *
- * @param value the corresponding window's output value
- */
- void setOutputValue(WV value);
-
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-operator/src/main/java/org/apache/samza/system/RemoteExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/system/RemoteExecutionEnvironment.java b/samza-operator/src/main/java/org/apache/samza/system/RemoteExecutionEnvironment.java
deleted file mode 100644
index fafa2cb..0000000
--- a/samza-operator/src/main/java/org/apache/samza/system/RemoteExecutionEnvironment.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.system;
-
-import org.apache.samza.operators.StreamGraphBuilder;
-import org.apache.samza.config.Config;
-
-/**
- * This class implements the {@link ExecutionEnvironment} that runs the applications in YARN environment
- */
-public class RemoteExecutionEnvironment implements ExecutionEnvironment {
-
- @Override public void run(StreamGraphBuilder app, Config config) {
- // TODO: add description of ProcessContext that is going to create a sub-DAG of the {@code graph}
- // TODO: actually instantiate the tasks and run the job, i.e.
- // 1. create all input/output/intermediate topics
- // 2. create the single job configuration
- // 3. execute JobRunner to submit the single job for the whole graph
- }
-
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-operator/src/main/java/org/apache/samza/system/StandaloneExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/system/StandaloneExecutionEnvironment.java b/samza-operator/src/main/java/org/apache/samza/system/StandaloneExecutionEnvironment.java
deleted file mode 100644
index f0f6ef2..0000000
--- a/samza-operator/src/main/java/org/apache/samza/system/StandaloneExecutionEnvironment.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.system;
-
-import org.apache.samza.operators.StreamGraph;
-import org.apache.samza.operators.StreamGraphBuilder;
-import org.apache.samza.config.Config;
-import org.apache.samza.operators.StreamGraphImpl;
-
-
-/**
- * This class implements the {@link ExecutionEnvironment} that runs the applications in standalone environment
- */
-public class StandaloneExecutionEnvironment implements ExecutionEnvironment {
-
- // TODO: may want to move this to a common base class for all {@link ExecutionEnvironment}
- StreamGraph createGraph(StreamGraphBuilder app, Config config) {
- StreamGraphImpl graph = new StreamGraphImpl();
- app.init(graph, config);
- return graph;
- }
-
- @Override public void run(StreamGraphBuilder app, Config config) {
- // 1. get logic graph for optimization
- // StreamGraph logicGraph = this.createGraph(app, config);
- // 2. potential optimization....
- // 3. create new instance of StreamGraphBuilder that would generate the optimized graph
- // 4. create all input/output/intermediate topics
- // 5. create the configuration for StreamProcessor
- // 6. start the StreamProcessor w/ optimized instance of StreamGraphBuilder
- }
-
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-operator/src/main/java/org/apache/samza/task/StreamOperatorTask.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/task/StreamOperatorTask.java b/samza-operator/src/main/java/org/apache/samza/task/StreamOperatorTask.java
deleted file mode 100644
index b007e3c..0000000
--- a/samza-operator/src/main/java/org/apache/samza/task/StreamOperatorTask.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.task;
-
-import org.apache.samza.config.Config;
-import org.apache.samza.operators.ContextManager;
-import org.apache.samza.operators.MessageStreamImpl;
-import org.apache.samza.operators.StreamGraphBuilder;
-import org.apache.samza.operators.StreamGraphImpl;
-import org.apache.samza.operators.data.InputMessageEnvelope;
-import org.apache.samza.operators.impl.OperatorGraph;
-import org.apache.samza.system.IncomingMessageEnvelope;
-import org.apache.samza.system.SystemStream;
-
-import java.util.HashMap;
-import java.util.Map;
-
-
-/**
- * Execution of the logic sub-DAG
- *
- *
- * An {@link StreamTask} implementation that receives {@link InputMessageEnvelope}s and propagates them
- * through the user's stream transformations defined in {@link StreamGraphImpl} using the
- * {@link org.apache.samza.operators.MessageStream} APIs.
- * <p>
- * This class brings all the operator API implementation components together and feeds the
- * {@link InputMessageEnvelope}s into the transformation chains.
- * <p>
- * It accepts an instance of the user implemented factory {@link StreamGraphBuilder} as input parameter of the constructor.
- * When its own {@link #init(Config, TaskContext)} method is called during startup, it instantiate a user-defined {@link StreamGraphImpl}
- * from the {@link StreamGraphBuilder}, calls {@link StreamGraphImpl#getContextManager()} to initialize the task-wide context
- * for the graph, and creates a {@link MessageStreamImpl} corresponding to each of its input
- * {@link org.apache.samza.system.SystemStreamPartition}s. Each input {@link MessageStreamImpl}
- * will be corresponding to either an input stream or intermediate stream in {@link StreamGraphImpl}.
- * <p>
- * Then, this task calls {@link org.apache.samza.operators.impl.OperatorGraph#init(Map, Config, TaskContext)} for each of the input
- * {@link MessageStreamImpl}. This instantiates the {@link org.apache.samza.operators.impl.OperatorImpl} DAG
- * corresponding to the aforementioned {@link org.apache.samza.operators.spec.OperatorSpec} DAG and returns the
- * root node of the DAG, which this class saves.
- * <p>
- * Now that it has the root for the DAG corresponding to each {@link org.apache.samza.system.SystemStreamPartition}, it
- * can pass the message envelopes received in {@link StreamTask#process(IncomingMessageEnvelope, MessageCollector, TaskCoordinator)}
- * along to the appropriate root nodes. From then on, each {@link org.apache.samza.operators.impl.OperatorImpl} propagates
- * its transformed output to the next set of {@link org.apache.samza.operators.impl.OperatorImpl}s.
- */
-public final class StreamOperatorTask implements StreamTask, InitableTask, WindowableTask, ClosableTask {
-
- /**
- * A mapping from each {@link SystemStream} to the root node of its operator chain DAG.
- */
- private final OperatorGraph operatorGraph = new OperatorGraph();
-
- private final StreamGraphBuilder graphBuilder;
-
- private ContextManager contextManager;
-
- public StreamOperatorTask(StreamGraphBuilder graphBuilder) {
- this.graphBuilder = graphBuilder;
- }
-
- @Override
- public final void init(Config config, TaskContext context) throws Exception {
- // create the MessageStreamsImpl object and initialize app-specific logic DAG within the task
- StreamGraphImpl streams = new StreamGraphImpl();
- this.graphBuilder.init(streams, config);
- // get the context manager of the {@link StreamGraph} and initialize the task-specific context
- this.contextManager = streams.getContextManager();
-
- Map<SystemStream, MessageStreamImpl> inputBySystemStream = new HashMap<>();
- context.getSystemStreamPartitions().forEach(ssp -> {
- if (!inputBySystemStream.containsKey(ssp.getSystemStream())) {
- // create mapping from the physical input {@link SystemStream} to the logic {@link MessageStream}
- inputBySystemStream.putIfAbsent(ssp.getSystemStream(), streams.getInputStream(ssp.getSystemStream()));
- }
- });
- operatorGraph.init(inputBySystemStream, config, this.contextManager.initTaskContext(config, context));
- }
-
- @Override
- public final void process(IncomingMessageEnvelope ime, MessageCollector collector, TaskCoordinator coordinator) {
- this.operatorGraph.get(ime.getSystemStreamPartition().getSystemStream())
- .onNext(new InputMessageEnvelope(ime), collector, coordinator);
- }
-
- @Override
- public final void window(MessageCollector collector, TaskCoordinator coordinator) throws Exception {
- // TODO: invoke timer based triggers
- }
-
- @Override
- public void close() throws Exception {
- this.contextManager.finalizeTaskContext();
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-operator/src/test/java/org/apache/samza/example/KeyValueStoreExample.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/example/KeyValueStoreExample.java b/samza-operator/src/test/java/org/apache/samza/example/KeyValueStoreExample.java
deleted file mode 100644
index 85ebc6c..0000000
--- a/samza-operator/src/test/java/org/apache/samza/example/KeyValueStoreExample.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.example;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.samza.operators.*;
-import org.apache.samza.operators.StreamGraphBuilder;
-import org.apache.samza.config.Config;
-import org.apache.samza.operators.data.MessageEnvelope;
-import org.apache.samza.operators.functions.FlatMapFunction;
-import org.apache.samza.serializers.JsonSerde;
-import org.apache.samza.serializers.StringSerde;
-import org.apache.samza.storage.kv.KeyValueStore;
-import org.apache.samza.system.ExecutionEnvironment;
-import org.apache.samza.system.SystemStream;
-import org.apache.samza.task.TaskContext;
-import org.apache.samza.util.CommandLine;
-
-import java.util.Properties;
-
-
-/**
- * Example code using {@link KeyValueStore} to implement event-time window
- */
-public class KeyValueStoreExample implements StreamGraphBuilder {
-
- /**
- * used by remote execution environment to launch the job in remote program. The remote program should follow the similar
- * invoking context as in standalone:
- *
- * public static void main(String args[]) throws Exception {
- * CommandLine cmdLine = new CommandLine();
- * Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
- * ExecutionEnvironment remoteEnv = ExecutionEnvironment.getRemoteEnvironment(config);
- * UserMainExample runnableApp = new UserMainExample();
- * runnableApp.run(remoteEnv, config);
- * }
- *
- */
- @Override public void init(StreamGraph graph, Config config) {
-
- MessageStream<PageViewEvent> pageViewEvents = graph.createInStream(input1, new StringSerde("UTF-8"), new JsonSerde<>());
- OutputStream<StatsOutput> pageViewPerMemberCounters = graph.createOutStream(output, new StringSerde("UTF-8"), new JsonSerde<StatsOutput>());
-
- pageViewEvents.
- partitionBy(m -> m.getMessage().memberId).
- flatMap(new MyStatsCounter()).
- sendTo(pageViewPerMemberCounters);
-
- }
-
- // standalone local program model
- public static void main(String[] args) throws Exception {
- CommandLine cmdLine = new CommandLine();
- Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
- ExecutionEnvironment standaloneEnv = ExecutionEnvironment.getLocalEnvironment(config);
- standaloneEnv.run(new KeyValueStoreExample(), config);
- }
-
- class MyStatsCounter implements FlatMapFunction<PageViewEvent, StatsOutput> {
- private final int timeoutMs = 10 * 60 * 1000;
-
- KeyValueStore<String, StatsWindowState> statsStore;
-
- class StatsWindowState {
- int lastCount = 0;
- long timeAtLastOutput = 0;
- int newCount = 0;
- }
-
- @Override
- public Collection<StatsOutput> apply(PageViewEvent message) {
- List<StatsOutput> outputStats = new ArrayList<>();
- long wndTimestamp = (long) Math.floor(TimeUnit.MILLISECONDS.toMinutes(message.getMessage().timestamp) / 5) * 5;
- String wndKey = String.format("%s-%d", message.getMessage().memberId, wndTimestamp);
- StatsWindowState curState = this.statsStore.get(wndKey);
- curState.newCount++;
- long curTimeMs = System.currentTimeMillis();
- if (curState.newCount > 0 && curState.timeAtLastOutput + timeoutMs < curTimeMs) {
- curState.timeAtLastOutput = curTimeMs;
- curState.lastCount += curState.newCount;
- curState.newCount = 0;
- outputStats.add(new StatsOutput(message.getMessage().memberId, wndTimestamp, curState.lastCount));
- }
- // update counter w/o generating output
- this.statsStore.put(wndKey, curState);
- return outputStats;
- }
-
- @Override
- public void init(Config config, TaskContext context) {
- this.statsStore = (KeyValueStore<String, StatsWindowState>) context.getStore("my-stats-wnd-store");
- }
- }
-
- StreamSpec input1 = new StreamSpec() {
- @Override public SystemStream getSystemStream() {
- return new SystemStream("kafka", "PageViewEvent");
- }
-
- @Override public Properties getProperties() {
- return null;
- }
- };
-
- StreamSpec output = new StreamSpec() {
- @Override public SystemStream getSystemStream() {
- return new SystemStream("kafka", "PageViewPerMember5min");
- }
-
- @Override public Properties getProperties() {
- return null;
- }
- };
-
- class PageViewEvent implements MessageEnvelope<String, PageViewEvent> {
- String pageId;
- String memberId;
- long timestamp;
-
- PageViewEvent(String pageId, String memberId, long timestamp) {
- this.pageId = pageId;
- this.memberId = memberId;
- this.timestamp = timestamp;
- }
-
- @Override
- public String getKey() {
- return this.pageId;
- }
-
- @Override
- public PageViewEvent getMessage() {
- return this;
- }
- }
-
- class StatsOutput implements MessageEnvelope<String, StatsOutput> {
- private String memberId;
- private long timestamp;
- private Integer count;
-
- StatsOutput(String key, long timestamp, Integer count) {
- this.memberId = key;
- this.timestamp = timestamp;
- this.count = count;
- }
-
- @Override
- public String getKey() {
- return this.memberId;
- }
-
- @Override
- public StatsOutput getMessage() {
- return this;
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-operator/src/test/java/org/apache/samza/example/NoContextStreamExample.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/example/NoContextStreamExample.java b/samza-operator/src/test/java/org/apache/samza/example/NoContextStreamExample.java
deleted file mode 100644
index c6d2e6e..0000000
--- a/samza-operator/src/test/java/org/apache/samza/example/NoContextStreamExample.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.example;
-
-import org.apache.samza.operators.*;
-import org.apache.samza.operators.StreamGraphBuilder;
-import org.apache.samza.config.Config;
-import org.apache.samza.operators.data.InputMessageEnvelope;
-import org.apache.samza.operators.data.JsonIncomingSystemMessageEnvelope;
-import org.apache.samza.operators.data.Offset;
-import org.apache.samza.operators.functions.JoinFunction;
-import org.apache.samza.serializers.JsonSerde;
-import org.apache.samza.serializers.StringSerde;
-import org.apache.samza.system.ExecutionEnvironment;
-import org.apache.samza.system.SystemStream;
-import org.apache.samza.system.SystemStreamPartition;
-import org.apache.samza.util.CommandLine;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-
-
-/**
- * Example {@link StreamGraphBuilder} code to test the API methods
- */
-public class NoContextStreamExample implements StreamGraphBuilder {
-
- StreamSpec input1 = new StreamSpec() {
- @Override public SystemStream getSystemStream() {
- return new SystemStream("kafka", "input1");
- }
-
- @Override public Properties getProperties() {
- return null;
- }
- };
-
- StreamSpec input2 = new StreamSpec() {
- @Override public SystemStream getSystemStream() {
- return new SystemStream("kafka", "input2");
- }
-
- @Override public Properties getProperties() {
- return null;
- }
- };
-
- StreamSpec output = new StreamSpec() {
- @Override public SystemStream getSystemStream() {
- return new SystemStream("kafka", "output");
- }
-
- @Override public Properties getProperties() {
- return null;
- }
- };
-
- class MessageType {
- String joinKey;
- List<String> joinFields = new ArrayList<>();
- }
-
- class JsonMessageEnvelope extends JsonIncomingSystemMessageEnvelope<MessageType> {
- JsonMessageEnvelope(String key, MessageType data, Offset offset, SystemStreamPartition partition) {
- super(key, data, offset, partition);
- }
- }
-
- private JsonMessageEnvelope getInputMessage(InputMessageEnvelope ism) {
- return new JsonMessageEnvelope(
- ((MessageType) ism.getMessage()).joinKey,
- (MessageType) ism.getMessage(),
- ism.getOffset(),
- ism.getSystemStreamPartition());
- }
-
- class MyJoinFunction implements JoinFunction<String, JsonMessageEnvelope, JsonMessageEnvelope, JsonIncomingSystemMessageEnvelope<MessageType>> {
-
- @Override
- public JsonIncomingSystemMessageEnvelope<MessageType> apply(JsonMessageEnvelope m1,
- JsonMessageEnvelope m2) {
- MessageType newJoinMsg = new MessageType();
- newJoinMsg.joinKey = m1.getKey();
- newJoinMsg.joinFields.addAll(m1.getMessage().joinFields);
- newJoinMsg.joinFields.addAll(m2.getMessage().joinFields);
- return new JsonMessageEnvelope(m1.getMessage().joinKey, newJoinMsg, null, null);
- }
-
- @Override
- public String getFirstKey(JsonMessageEnvelope message) {
- return message.getKey();
- }
-
- @Override
- public String getSecondKey(JsonMessageEnvelope message) {
- return message.getKey();
- }
- }
-
- /**
- * used by remote execution environment to launch the job in remote program. The remote program should follow the similar
- * invoking context as in standalone:
- *
- * public static void main(String args[]) throws Exception {
- * CommandLine cmdLine = new CommandLine();
- * Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
- * ExecutionEnvironment remoteEnv = ExecutionEnvironment.fromConfig(config);
- * remoteEnv.run(new NoContextStreamExample(), config);
- * }
- *
- */
- @Override public void init(StreamGraph graph, Config config) {
- MessageStream<InputMessageEnvelope> inputSource1 = graph.<Object, Object, InputMessageEnvelope>createInStream(
- input1, null, null);
- MessageStream<InputMessageEnvelope> inputSource2 = graph.<Object, Object, InputMessageEnvelope>createInStream(
- input2, null, null);
- OutputStream<JsonIncomingSystemMessageEnvelope<MessageType>> outStream = graph.createOutStream(output,
- new StringSerde("UTF-8"), new JsonSerde<>());
-
- inputSource1.map(this::getInputMessage).
- join(inputSource2.map(this::getInputMessage), new MyJoinFunction()).
- sendTo(outStream);
-
- }
-
- // standalone local program model
- public static void main(String[] args) throws Exception {
- CommandLine cmdLine = new CommandLine();
- Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
- ExecutionEnvironment standaloneEnv = ExecutionEnvironment.getLocalEnvironment(config);
- standaloneEnv.run(new NoContextStreamExample(), config);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-operator/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java b/samza-operator/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java
deleted file mode 100644
index 0477066..0000000
--- a/samza-operator/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.example;
-
-import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.OutputStream;
-import org.apache.samza.operators.StreamGraphBuilder;
-import org.apache.samza.config.Config;
-import org.apache.samza.operators.StreamGraph;
-import org.apache.samza.operators.StreamSpec;
-import org.apache.samza.operators.data.MessageEnvelope;
-import org.apache.samza.operators.functions.JoinFunction;
-import org.apache.samza.serializers.JsonSerde;
-import org.apache.samza.serializers.StringSerde;
-import org.apache.samza.system.ExecutionEnvironment;
-import org.apache.samza.system.SystemStream;
-import org.apache.samza.util.CommandLine;
-
-import java.util.Properties;
-
-
-/**
- * Simple 2-way stream-to-stream join example
- */
-public class OrderShipmentJoinExample implements StreamGraphBuilder {
-
- /**
- * used by remote execution environment to launch the job in remote program. The remote program should follow the similar
- * invoking context as in standalone:
- *
- * public static void main(String args[]) throws Exception {
- * CommandLine cmdLine = new CommandLine();
- * Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
- * ExecutionEnvironment remoteEnv = ExecutionEnvironment.getRemoteEnvironment(config);
- * UserMainExample runnableApp = new UserMainExample();
- * runnableApp.run(remoteEnv, config);
- * }
- *
- */
- @Override public void init(StreamGraph graph, Config config) {
-
- MessageStream<OrderRecord> orders = graph.createInStream(input1, new StringSerde("UTF-8"), new JsonSerde<>());
- MessageStream<ShipmentRecord> shipments = graph.createInStream(input2, new StringSerde("UTF-8"), new JsonSerde<>());
- OutputStream<FulFilledOrderRecord> fulfilledOrders = graph.createOutStream(output, new StringSerde("UTF-8"), new JsonSerde<>());
-
- orders.join(shipments, new MyJoinFunction()).sendTo(fulfilledOrders);
-
- }
-
- // standalone local program model
- public static void main(String[] args) throws Exception {
- CommandLine cmdLine = new CommandLine();
- Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
- ExecutionEnvironment standaloneEnv = ExecutionEnvironment.getLocalEnvironment(config);
- standaloneEnv.run(new OrderShipmentJoinExample(), config);
- }
-
- StreamSpec input1 = new StreamSpec() {
- @Override public SystemStream getSystemStream() {
- return new SystemStream("kafka", "Orders");
- }
-
- @Override public Properties getProperties() {
- return null;
- }
- };
-
- StreamSpec input2 = new StreamSpec() {
- @Override public SystemStream getSystemStream() {
- return new SystemStream("kafka", "Shipment");
- }
-
- @Override public Properties getProperties() {
- return null;
- }
- };
-
- StreamSpec output = new StreamSpec() {
- @Override public SystemStream getSystemStream() {
- return new SystemStream("kafka", "FulfilledOrders");
- }
-
- @Override public Properties getProperties() {
- return null;
- }
- };
-
- class OrderRecord implements MessageEnvelope<String, OrderRecord> {
- String orderId;
- long orderTimeMs;
-
- OrderRecord(String orderId, long timeMs) {
- this.orderId = orderId;
- this.orderTimeMs = timeMs;
- }
-
- @Override
- public String getKey() {
- return this.orderId;
- }
-
- @Override
- public OrderRecord getMessage() {
- return this;
- }
- }
-
- class ShipmentRecord implements MessageEnvelope<String, ShipmentRecord> {
- String orderId;
- long shipTimeMs;
-
- ShipmentRecord(String orderId, long timeMs) {
- this.orderId = orderId;
- this.shipTimeMs = timeMs;
- }
-
- @Override
- public String getKey() {
- return this.orderId;
- }
-
- @Override
- public ShipmentRecord getMessage() {
- return this;
- }
- }
-
- class FulFilledOrderRecord implements MessageEnvelope<String, FulFilledOrderRecord> {
- String orderId;
- long orderTimeMs;
- long shipTimeMs;
-
- FulFilledOrderRecord(String orderId, long orderTimeMs, long shipTimeMs) {
- this.orderId = orderId;
- this.orderTimeMs = orderTimeMs;
- this.shipTimeMs = shipTimeMs;
- }
-
-
- @Override
- public String getKey() {
- return this.orderId;
- }
-
- @Override
- public FulFilledOrderRecord getMessage() {
- return this;
- }
- }
-
- FulFilledOrderRecord myJoinResult(OrderRecord m1, ShipmentRecord m2) {
- return new FulFilledOrderRecord(m1.getKey(), m1.orderTimeMs, m2.shipTimeMs);
- }
-
- class MyJoinFunction implements JoinFunction<String, OrderRecord, ShipmentRecord, FulFilledOrderRecord> {
-
- @Override
- public FulFilledOrderRecord apply(OrderRecord message, ShipmentRecord otherMessage) {
- return OrderShipmentJoinExample.this.myJoinResult(message, otherMessage);
- }
-
- @Override
- public String getFirstKey(OrderRecord message) {
- return message.getKey();
- }
-
- @Override
- public String getSecondKey(ShipmentRecord message) {
- return message.getKey();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-operator/src/test/java/org/apache/samza/example/PageViewCounterExample.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/example/PageViewCounterExample.java b/samza-operator/src/test/java/org/apache/samza/example/PageViewCounterExample.java
deleted file mode 100644
index f7d8bda..0000000
--- a/samza-operator/src/test/java/org/apache/samza/example/PageViewCounterExample.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.example;
-
-import org.apache.samza.operators.*;
-import org.apache.samza.operators.StreamGraphBuilder;
-import org.apache.samza.config.Config;
-import org.apache.samza.operators.data.MessageEnvelope;
-import org.apache.samza.operators.triggers.Triggers;
-import org.apache.samza.operators.windows.AccumulationMode;
-import org.apache.samza.operators.windows.WindowPane;
-import org.apache.samza.operators.windows.Windows;
-import org.apache.samza.serializers.JsonSerde;
-import org.apache.samza.serializers.StringSerde;
-import org.apache.samza.system.ExecutionEnvironment;
-import org.apache.samza.system.SystemStream;
-import org.apache.samza.util.CommandLine;
-
-import java.time.Duration;
-import java.util.Properties;
-
-
-/**
- * Example code to implement window-based counter
- */
-public class PageViewCounterExample implements StreamGraphBuilder {
-
- @Override public void init(StreamGraph graph, Config config) {
-
- MessageStream<PageViewEvent> pageViewEvents = graph.createInStream(input1, new StringSerde("UTF-8"), new JsonSerde<>());
- OutputStream<MyStreamOutput> pageViewPerMemberCounters = graph.createOutStream(output, new StringSerde("UTF-8"), new JsonSerde<>());
-
- pageViewEvents.
- window(Windows.<PageViewEvent, String, Integer>keyedTumblingWindow(m -> m.getMessage().memberId, Duration.ofSeconds(10), (m, c) -> c + 1).
- setEarlyTrigger(Triggers.repeat(Triggers.count(5))).
- setAccumulationMode(AccumulationMode.DISCARDING)).
- map(MyStreamOutput::new).
- sendTo(pageViewPerMemberCounters);
-
- }
-
- public static void main(String[] args) {
- CommandLine cmdLine = new CommandLine();
- Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
- ExecutionEnvironment standaloneEnv = ExecutionEnvironment.getLocalEnvironment(config);
- standaloneEnv.run(new PageViewCounterExample(), config);
- }
-
- StreamSpec input1 = new StreamSpec() {
- @Override public SystemStream getSystemStream() {
- return new SystemStream("kafka", "PageViewEvent");
- }
-
- @Override public Properties getProperties() {
- return null;
- }
- };
-
- StreamSpec output = new StreamSpec() {
- @Override public SystemStream getSystemStream() {
- return new SystemStream("kafka", "PageViewPerMember5min");
- }
-
- @Override public Properties getProperties() {
- return null;
- }
- };
-
- class PageViewEvent implements MessageEnvelope<String, PageViewEvent> {
- String pageId;
- String memberId;
- long timestamp;
-
- PageViewEvent(String pageId, String memberId, long timestamp) {
- this.pageId = pageId;
- this.memberId = memberId;
- this.timestamp = timestamp;
- }
-
- @Override
- public String getKey() {
- return this.pageId;
- }
-
- @Override
- public PageViewEvent getMessage() {
- return this;
- }
- }
-
- class MyStreamOutput implements MessageEnvelope<String, MyStreamOutput> {
- String memberId;
- long timestamp;
- int count;
-
- MyStreamOutput(WindowPane<String, Integer> m) {
- this.memberId = m.getKey().getKey();
- this.timestamp = Long.valueOf(m.getKey().getPaneId());
- this.count = m.getMessage();
- }
-
- @Override
- public String getKey() {
- return this.memberId;
- }
-
- @Override
- public MyStreamOutput getMessage() {
- return this;
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-operator/src/test/java/org/apache/samza/example/RepartitionExample.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/example/RepartitionExample.java b/samza-operator/src/test/java/org/apache/samza/example/RepartitionExample.java
deleted file mode 100644
index 6994ac4..0000000
--- a/samza-operator/src/test/java/org/apache/samza/example/RepartitionExample.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.example;
-
-import org.apache.samza.operators.*;
-import org.apache.samza.operators.StreamGraphBuilder;
-import org.apache.samza.config.Config;
-import org.apache.samza.operators.data.MessageEnvelope;
-import org.apache.samza.operators.windows.WindowPane;
-import org.apache.samza.operators.windows.Windows;
-import org.apache.samza.serializers.JsonSerde;
-import org.apache.samza.serializers.StringSerde;
-import org.apache.samza.system.ExecutionEnvironment;
-import org.apache.samza.system.SystemStream;
-import org.apache.samza.util.CommandLine;
-
-import java.time.Duration;
-import java.util.*;
-
-
-/**
- * Example {@link StreamGraphBuilder} code to test the API methods with re-partition operator
- */
-public class RepartitionExample implements StreamGraphBuilder {
-
- /**
- * used by remote execution environment to launch the job in remote program. The remote program should follow the similar
- * invoking context as in standalone:
- *
- * public static void main(String args[]) throws Exception {
- * CommandLine cmdLine = new CommandLine();
- * Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
- * ExecutionEnvironment remoteEnv = ExecutionEnvironment.getRemoteEnvironment(config);
- * remoteEnv.run(new UserMainExample(), config);
- * }
- *
- */
- @Override public void init(StreamGraph graph, Config config) {
-
- MessageStream<PageViewEvent> pageViewEvents = graph.createInStream(input1, new StringSerde("UTF-8"), new JsonSerde<>());
- OutputStream<MyStreamOutput> pageViewPerMemberCounters = graph.createOutStream(output, new StringSerde("UTF-8"), new JsonSerde<>());
-
- pageViewEvents.
- partitionBy(m -> m.getMessage().memberId).
- window(Windows.<PageViewEvent, String, Integer>keyedTumblingWindow(
- msg -> msg.getMessage().memberId, Duration.ofMinutes(5), (m, c) -> c + 1)).
- map(MyStreamOutput::new).
- sendTo(pageViewPerMemberCounters);
-
- }
-
- // standalone local program model
- public static void main(String[] args) throws Exception {
- CommandLine cmdLine = new CommandLine();
- Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
- ExecutionEnvironment standaloneEnv = ExecutionEnvironment.getLocalEnvironment(config);
- standaloneEnv.run(new RepartitionExample(), config);
- }
-
- StreamSpec input1 = new StreamSpec() {
- @Override public SystemStream getSystemStream() {
- return new SystemStream("kafka", "PageViewEvent");
- }
-
- @Override public Properties getProperties() {
- return null;
- }
- };
-
- StreamSpec output = new StreamSpec() {
- @Override public SystemStream getSystemStream() {
- return new SystemStream("kafka", "PageViewPerMember5min");
- }
-
- @Override public Properties getProperties() {
- return null;
- }
- };
-
- class PageViewEvent implements MessageEnvelope<String, PageViewEvent> {
- String pageId;
- String memberId;
- long timestamp;
-
- PageViewEvent(String pageId, String memberId, long timestamp) {
- this.pageId = pageId;
- this.memberId = memberId;
- this.timestamp = timestamp;
- }
-
- @Override
- public String getKey() {
- return this.pageId;
- }
-
- @Override
- public PageViewEvent getMessage() {
- return this;
- }
- }
-
- class MyStreamOutput implements MessageEnvelope<String, MyStreamOutput> {
- String memberId;
- long timestamp;
- int count;
-
- MyStreamOutput(WindowPane<String, Integer> m) {
- this.memberId = m.getKey().getKey();
- this.timestamp = Long.valueOf(m.getKey().getPaneId());
- this.count = m.getMessage();
- }
-
- @Override
- public String getKey() {
- return this.memberId;
- }
-
- @Override
- public MyStreamOutput getMessage() {
- return this;
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-operator/src/test/java/org/apache/samza/example/TestBasicStreamGraphs.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/example/TestBasicStreamGraphs.java b/samza-operator/src/test/java/org/apache/samza/example/TestBasicStreamGraphs.java
deleted file mode 100644
index 8ecd44f..0000000
--- a/samza-operator/src/test/java/org/apache/samza/example/TestBasicStreamGraphs.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.example;
-
-import java.lang.reflect.Field;
-import org.apache.samza.Partition;
-import org.apache.samza.config.Config;
-import org.apache.samza.operators.impl.OperatorGraph;
-import org.apache.samza.system.SystemStreamPartition;
-import org.apache.samza.task.StreamOperatorTask;
-import org.apache.samza.task.TaskContext;
-import org.junit.Test;
-
-import java.util.HashSet;
-import java.util.Set;
-
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
-
-
-/**
- * Unit test for {@link StreamOperatorTask}
- */
-public class TestBasicStreamGraphs {
-
- private final Set<SystemStreamPartition> inputPartitions = new HashSet<SystemStreamPartition>() { {
- for (int i = 0; i < 4; i++) {
- this.add(new SystemStreamPartition("my-system", String.format("my-topic%d", i), new Partition(i)));
- }
- } };
-
- @Test
- public void testUserTask() throws Exception {
- Config mockConfig = mock(Config.class);
- TaskContext mockContext = mock(TaskContext.class);
- when(mockContext.getSystemStreamPartitions()).thenReturn(this.inputPartitions);
- TestWindowExample userTask = new TestWindowExample(this.inputPartitions);
- StreamOperatorTask adaptorTask = new StreamOperatorTask(userTask);
- Field pipelineMapFld = StreamOperatorTask.class.getDeclaredField("operatorGraph");
- pipelineMapFld.setAccessible(true);
- OperatorGraph opGraph = (OperatorGraph) pipelineMapFld.get(adaptorTask);
-
- adaptorTask.init(mockConfig, mockContext);
- this.inputPartitions.forEach(partition -> {
- assertNotNull(opGraph.get(partition.getSystemStream()));
- });
- }
-
- @Test
- public void testSplitTask() throws Exception {
- Config mockConfig = mock(Config.class);
- TaskContext mockContext = mock(TaskContext.class);
- when(mockContext.getSystemStreamPartitions()).thenReturn(this.inputPartitions);
- TestBroadcastExample splitTask = new TestBroadcastExample(this.inputPartitions);
- StreamOperatorTask adaptorTask = new StreamOperatorTask(splitTask);
- Field pipelineMapFld = StreamOperatorTask.class.getDeclaredField("operatorGraph");
- pipelineMapFld.setAccessible(true);
- OperatorGraph opGraph = (OperatorGraph) pipelineMapFld.get(adaptorTask);
-
- adaptorTask.init(mockConfig, mockContext);
- this.inputPartitions.forEach(partition -> {
- assertNotNull(opGraph.get(partition.getSystemStream()));
- });
- }
-
- @Test
- public void testJoinTask() throws Exception {
- Config mockConfig = mock(Config.class);
- TaskContext mockContext = mock(TaskContext.class);
- when(mockContext.getSystemStreamPartitions()).thenReturn(this.inputPartitions);
- TestJoinExample joinTask = new TestJoinExample(this.inputPartitions);
- StreamOperatorTask adaptorTask = new StreamOperatorTask(joinTask);
- Field pipelineMapFld = StreamOperatorTask.class.getDeclaredField("operatorGraph");
- pipelineMapFld.setAccessible(true);
- OperatorGraph opGraph = (OperatorGraph) pipelineMapFld.get(adaptorTask);
-
- adaptorTask.init(mockConfig, mockContext);
- this.inputPartitions.forEach(partition -> {
- assertNotNull(opGraph.get(partition.getSystemStream()));
- });
- }
-
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-operator/src/test/java/org/apache/samza/example/TestBroadcastExample.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/example/TestBroadcastExample.java b/samza-operator/src/test/java/org/apache/samza/example/TestBroadcastExample.java
deleted file mode 100644
index d22324b..0000000
--- a/samza-operator/src/test/java/org/apache/samza/example/TestBroadcastExample.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.example;
-
-import org.apache.samza.config.Config;
-import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.StreamGraph;
-import org.apache.samza.operators.StreamSpec;
-import org.apache.samza.operators.data.InputMessageEnvelope;
-import org.apache.samza.operators.data.JsonIncomingSystemMessageEnvelope;
-import org.apache.samza.operators.data.Offset;
-import org.apache.samza.operators.triggers.Triggers;
-import org.apache.samza.operators.windows.Windows;
-import org.apache.samza.system.SystemStream;
-import org.apache.samza.system.SystemStreamPartition;
-
-import java.time.Duration;
-import java.util.function.BiFunction;
-import java.util.Properties;
-import java.util.Set;
-
-
-/**
- * Example implementation of a broadcast (split) task: every input stream is fanned out into three
- * branches filtered on the partition key ("key1", "key2", "key3"), each feeding its own counting
- * tumbling window.
- */
-public class TestBroadcastExample extends TestExampleBase {
-
-  TestBroadcastExample(Set<SystemStreamPartition> inputs) {
-    super(inputs);
-  }
-
-  class MessageType {
-    String field1;
-    String field2;
-    String field3;
-    String field4;
-    String parKey;
-    private long timestamp;
-
-    public long getTimestamp() {
-      return this.timestamp;
-    }
-  }
-
-  class JsonMessageEnvelope extends JsonIncomingSystemMessageEnvelope<MessageType> {
-    JsonMessageEnvelope(String key, MessageType data, Offset offset, SystemStreamPartition partition) {
-      super(key, data, offset, partition);
-    }
-  }
-
-  /**
-   * Creates one input stream per system stream and splits it into three filtered branches, each
-   * windowed with the same counting aggregator and late trigger.
-   */
-  @Override
-  public void init(StreamGraph graph, Config config) {
-    // Ignores the message and increments the running value, i.e. counts messages per window.
-    BiFunction<JsonMessageEnvelope, Integer, Integer> sumAggregator = (m, c) -> c + 1;
-    inputs.keySet().forEach(entry -> {
-      MessageStream<JsonMessageEnvelope> inputStream = graph.<Object, Object, InputMessageEnvelope>createInStream(new StreamSpec() {
-        @Override public SystemStream getSystemStream() {
-          return entry;
-        }
-
-        @Override public Properties getProperties() {
-          return null;
-        }
-      }, null, null).map(this::getInputMessage);
-
-      countInWindow(inputStream.filter(this::myFilter1), sumAggregator);
-      countInWindow(inputStream.filter(this::myFilter2), sumAggregator);
-      countInWindow(inputStream.filter(this::myFilter3), sumAggregator);
-    });
-  }
-
-  /**
-   * Attaches a 100 ms tumbling window using {@code aggregator} to {@code branch}. The late trigger
-   * combines {@code count(30000)} and {@code timeSinceFirstMessage(10 ms)} via {@code Triggers.any}.
-   * Shared by all three branches so their window configuration cannot drift apart.
-   */
-  private void countInWindow(MessageStream<JsonMessageEnvelope> branch,
-      BiFunction<JsonMessageEnvelope, Integer, Integer> aggregator) {
-    branch.window(Windows.tumblingWindow(Duration.ofMillis(100), aggregator)
-        .setLateTrigger(Triggers.any(Triggers.count(30000), Triggers.timeSinceFirstMessage(Duration.ofMillis(10)))));
-  }
-
-  /**
-   * Casts the raw payload of {@code m1} to {@link JsonMessageEnvelope}.
-   *
-   * <p>NOTE(review): this assumes the incoming payload already is a JsonMessageEnvelope, whereas
-   * TestJoinExample wraps the payload (a MessageType) in a new envelope — confirm which the input
-   * serde actually produces before relying on this for message processing.
-   */
-  JsonMessageEnvelope getInputMessage(InputMessageEnvelope m1) {
-    return (JsonMessageEnvelope) m1.getMessage();
-  }
-
-  /** Keeps messages whose partition key is "key1"; a missing (null) key is filtered out. */
-  boolean myFilter1(JsonMessageEnvelope m1) {
-    // Constant-first equals so a message without parKey is dropped instead of throwing NPE.
-    return "key1".equals(m1.getMessage().parKey);
-  }
-
-  /** Keeps messages whose partition key is "key2"; a missing (null) key is filtered out. */
-  boolean myFilter2(JsonMessageEnvelope m1) {
-    return "key2".equals(m1.getMessage().parKey);
-  }
-
-  /** Keeps messages whose partition key is "key3"; a missing (null) key is filtered out. */
-  boolean myFilter3(JsonMessageEnvelope m1) {
-    return "key3".equals(m1.getMessage().parKey);
-  }
-
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-operator/src/test/java/org/apache/samza/example/TestExampleBase.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/example/TestExampleBase.java b/samza-operator/src/test/java/org/apache/samza/example/TestExampleBase.java
deleted file mode 100644
index c4df9d4..0000000
--- a/samza-operator/src/test/java/org/apache/samza/example/TestExampleBase.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.example;
-
-import org.apache.samza.operators.StreamGraphBuilder;
-import org.apache.samza.system.SystemStream;
-import org.apache.samza.system.SystemStreamPartition;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-/**
- * Base class for test examples. Groups the given input partitions by the system stream they belong
- * to; the subclasses create one input stream per key of {@link #inputs}.
- */
-public abstract class TestExampleBase implements StreamGraphBuilder {
-
-  /** Input partitions keyed by their system stream. */
-  protected final Map<SystemStream, Set<SystemStreamPartition>> inputs;
-
-  TestExampleBase(Set<SystemStreamPartition> inputs) {
-    this.inputs = new HashMap<>();
-    for (SystemStreamPartition input : inputs) {
-      // One map lookup per partition instead of putIfAbsent followed by get.
-      this.inputs.computeIfAbsent(input.getSystemStream(), stream -> new HashSet<>()).add(input);
-    }
-  }
-
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-operator/src/test/java/org/apache/samza/example/TestJoinExample.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/example/TestJoinExample.java b/samza-operator/src/test/java/org/apache/samza/example/TestJoinExample.java
deleted file mode 100644
index fe6e7e7..0000000
--- a/samza-operator/src/test/java/org/apache/samza/example/TestJoinExample.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.example;
-
-import org.apache.samza.config.Config;
-import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.StreamGraph;
-import org.apache.samza.operators.StreamSpec;
-import org.apache.samza.operators.data.InputMessageEnvelope;
-import org.apache.samza.operators.data.JsonIncomingSystemMessageEnvelope;
-import org.apache.samza.operators.data.Offset;
-import org.apache.samza.operators.functions.JoinFunction;
-import org.apache.samza.serializers.JsonSerde;
-import org.apache.samza.serializers.StringSerde;
-import org.apache.samza.system.SystemStream;
-import org.apache.samza.system.SystemStreamPartition;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-import java.util.Set;
-
-
-/**
- * Example implementation of a unique-key stream-stream join task: the input streams are joined one
- * after another on their join key and the joined stream is sent to an output stream.
- */
-public class TestJoinExample extends TestExampleBase {
-
-  TestJoinExample(Set<SystemStreamPartition> inputs) {
-    super(inputs);
-  }
-
-  class MessageType {
-    String joinKey;
-    List<String> joinFields = new ArrayList<>();
-  }
-
-  class JsonMessageEnvelope extends JsonIncomingSystemMessageEnvelope<MessageType> {
-    JsonMessageEnvelope(String key, MessageType data, Offset offset, SystemStreamPartition partition) {
-      super(key, data, offset, partition);
-    }
-  }
-
-  /** Joined stream built by the most recent {@link #init} call; {@code null} before init or with no inputs. */
-  MessageStream<JsonMessageEnvelope> joinOutput = null;
-
-  /**
-   * Creates one input stream per system stream, folds them into a single stream with successive
-   * joins using {@link MyJoinFunction}, and sends the result to an output stream.
-   *
-   * <p>The join chain is built in a local variable so that a repeated {@code init} starts from
-   * scratch instead of joining onto the stream left over from a previous call. With no inputs there
-   * is nothing to join, so no output stream is created (previously this dereferenced null).
-   */
-  @Override
-  public void init(StreamGraph graph, Config config) {
-    MessageStream<JsonMessageEnvelope> joined = null;
-    for (SystemStream input : inputs.keySet()) {
-      MessageStream<JsonMessageEnvelope> newSource = graph.<Object, Object, InputMessageEnvelope>createInStream(
-          new StreamSpec() {
-            @Override public SystemStream getSystemStream() {
-              return input;
-            }
-
-            @Override public Properties getProperties() {
-              return null;
-            }
-          }, null, null).map(this::getInputMessage);
-      if (joined == null) {
-        joined = newSource;
-      } else {
-        joined = joined.join(newSource, new MyJoinFunction());
-      }
-    }
-    joinOutput = joined;
-    if (joined == null) {
-      return;
-    }
-
-    // NOTE(review): the output spec reports a null SystemStream — fine for graph-building tests only.
-    joined.sendTo(graph.createOutStream(new StreamSpec() {
-      @Override public SystemStream getSystemStream() {
-        return null;
-      }
-
-      @Override public Properties getProperties() {
-        return null;
-      }
-    }, new StringSerde("UTF-8"), new JsonSerde<>()));
-  }
-
-  /** Wraps the incoming payload in a {@link JsonMessageEnvelope} keyed by the payload's join key. */
-  private JsonMessageEnvelope getInputMessage(InputMessageEnvelope ism) {
-    return new JsonMessageEnvelope(
-        ((MessageType) ism.getMessage()).joinKey,
-        (MessageType) ism.getMessage(),
-        ism.getOffset(),
-        ism.getSystemStreamPartition());
-  }
-
-  /** Joins two envelopes with equal keys by concatenating their join fields. */
-  class MyJoinFunction implements JoinFunction<String, JsonMessageEnvelope, JsonMessageEnvelope, JsonMessageEnvelope> {
-    /**
-     * Builds the joined envelope: the key comes from {@code m1}, and the fields are {@code m1}'s
-     * followed by {@code m2}'s. The result carries no offset or partition.
-     */
-    JsonMessageEnvelope myJoinResult(JsonMessageEnvelope m1, JsonMessageEnvelope m2) {
-      MessageType newJoinMsg = new MessageType();
-      newJoinMsg.joinKey = m1.getKey();
-      newJoinMsg.joinFields.addAll(m1.getMessage().joinFields);
-      newJoinMsg.joinFields.addAll(m2.getMessage().joinFields);
-      // Key the envelope with the same value stored in the payload so the two cannot disagree.
-      return new JsonMessageEnvelope(newJoinMsg.joinKey, newJoinMsg, null, null);
-    }
-
-    @Override
-    public JsonMessageEnvelope apply(JsonMessageEnvelope message, JsonMessageEnvelope otherMessage) {
-      return this.myJoinResult(message, otherMessage);
-    }
-
-    /** Join key of a message from the first stream: its envelope key. */
-    @Override
-    public String getFirstKey(JsonMessageEnvelope message) {
-      return message.getKey();
-    }
-
-    /** Join key of a message from the second stream: its envelope key. */
-    @Override
-    public String getSecondKey(JsonMessageEnvelope message) {
-      return message.getKey();
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-operator/src/test/java/org/apache/samza/example/TestWindowExample.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/example/TestWindowExample.java b/samza-operator/src/test/java/org/apache/samza/example/TestWindowExample.java
deleted file mode 100644
index e08ca20..0000000
--- a/samza-operator/src/test/java/org/apache/samza/example/TestWindowExample.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.example;
-
-import org.apache.samza.config.Config;
-import org.apache.samza.operators.StreamGraph;
-import org.apache.samza.operators.StreamSpec;
-import org.apache.samza.operators.data.InputMessageEnvelope;
-import org.apache.samza.operators.data.JsonIncomingSystemMessageEnvelope;
-import org.apache.samza.operators.data.MessageEnvelope;
-import org.apache.samza.operators.data.Offset;
-import org.apache.samza.operators.windows.Windows;
-import org.apache.samza.system.SystemStream;
-import org.apache.samza.system.SystemStreamPartition;
-
-import java.time.Duration;
-import java.util.function.BiFunction;
-import java.util.Properties;
-import java.util.Set;
-
-
-/**
- * Example implementation of a simple user-defined task with a window operator. Each input stream is
- * re-wrapped into {@link JsonMessageEnvelope}s and fed into a 200 ms tumbling window.
- */
-public class TestWindowExample extends TestExampleBase {
-  class MessageType {
-    String field1;
-    String field2;
-  }
-
-  TestWindowExample(Set<SystemStreamPartition> inputs) {
-    super(inputs);
-  }
-
-  class JsonMessageEnvelope extends JsonIncomingSystemMessageEnvelope<MessageType> {
-
-    JsonMessageEnvelope(String key, MessageType data, Offset offset, SystemStreamPartition partition) {
-      super(key, data, offset, partition);
-    }
-  }
-
-  @Override
-  public void init(StreamGraph graph, Config config) {
-    // Ignores the message and increments the running value, i.e. a per-window counter.
-    BiFunction<JsonMessageEnvelope, Integer, Integer> countAggregator = (message, count) -> count + 1;
-    for (SystemStream source : inputs.keySet()) {
-      StreamSpec sourceSpec = new StreamSpec() {
-        @Override public SystemStream getSystemStream() {
-          return source;
-        }
-
-        @Override public Properties getProperties() {
-          return null;
-        }
-      };
-      graph.<Object, Object, InputMessageEnvelope>createInStream(sourceSpec, null, null)
-          .map(envelope -> new JsonMessageEnvelope(this.myMessageKeyFunction(envelope),
-              (MessageType) envelope.getMessage(), envelope.getOffset(), envelope.getSystemStreamPartition()))
-          .window(Windows.tumblingWindow(Duration.ofMillis(200), countAggregator));
-    }
-  }
-
-  /** Returns the string form of the incoming message key, used as the re-wrapped envelope's key. */
-  String myMessageKeyFunction(MessageEnvelope<Object, Object> m) {
-    return m.getKey().toString();
-  }
-
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-operator/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java b/samza-operator/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
deleted file mode 100644
index 160a47a..0000000
--- a/samza-operator/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
+++ /dev/null
@@ -1,204 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators;
-
-import org.apache.samza.operators.functions.FilterFunction;
-import org.apache.samza.operators.functions.FlatMapFunction;
-import org.apache.samza.operators.functions.JoinFunction;
-import org.apache.samza.operators.functions.MapFunction;
-import org.apache.samza.operators.functions.SinkFunction;
-import org.apache.samza.operators.spec.OperatorSpec;
-import org.apache.samza.operators.spec.PartialJoinOperatorSpec;
-import org.apache.samza.operators.spec.SinkOperatorSpec;
-import org.apache.samza.operators.spec.StreamOperatorSpec;
-import org.apache.samza.system.OutgoingMessageEnvelope;
-import org.apache.samza.system.SystemStream;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskCoordinator;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Set;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-
-/**
- * Unit tests for {@link MessageStreamImpl}. Each fluent operator must register exactly one
- * {@link OperatorSpec} on its source stream, wired to the stream the operator returns and carrying
- * the user-supplied function.
- *
- * <p>Assertions use JUnit's {@code assertEquals(expected, actual)} argument order so failure
- * messages label the expected and actual values correctly.
- */
-public class TestMessageStreamImpl {
-
-  private final StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
-
-  @Test
-  public void testMap() {
-    MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph);
-    MapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> xMap = (TestMessageEnvelope m) ->
-        new TestOutputMessageEnvelope(m.getKey(), m.getMessage().getValue().length() + 1);
-    MessageStream<TestOutputMessageEnvelope> outputStream = inputStream.map(xMap);
-    Collection<OperatorSpec> subs = inputStream.getRegisteredOperatorSpecs();
-    assertEquals(1, subs.size());
-    OperatorSpec<TestOutputMessageEnvelope> mapOp = subs.iterator().next();
-    assertTrue(mapOp instanceof StreamOperatorSpec);
-    assertEquals(outputStream, mapOp.getNextStream());
-    // assert that the transformation function is what we defined above
-    TestMessageEnvelope xTestMsg = mock(TestMessageEnvelope.class);
-    TestMessageEnvelope.MessageType mockInnerTestMessage = mock(TestMessageEnvelope.MessageType.class);
-    when(xTestMsg.getKey()).thenReturn("test-msg-key");
-    when(xTestMsg.getMessage()).thenReturn(mockInnerTestMessage);
-    when(mockInnerTestMessage.getValue()).thenReturn("123456789");
-
-    Collection<TestOutputMessageEnvelope> cOutputMsg =
-        ((StreamOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope>) mapOp).getTransformFn().apply(xTestMsg);
-    assertEquals(1, cOutputMsg.size());
-    TestOutputMessageEnvelope outputMessage = cOutputMsg.iterator().next();
-    assertEquals(xTestMsg.getKey(), outputMessage.getKey());
-    assertEquals(Integer.valueOf(xTestMsg.getMessage().getValue().length() + 1), outputMessage.getMessage());
-  }
-
-  @Test
-  public void testFlatMap() {
-    MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph);
-    // Plain construction instead of double-brace initialization (which creates an anonymous subclass).
-    Set<TestOutputMessageEnvelope> flatOuts = new HashSet<>();
-    for (int i = 0; i < 3; i++) {
-      flatOuts.add(mock(TestOutputMessageEnvelope.class));
-    }
-    FlatMapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> xFlatMap = (TestMessageEnvelope message) -> flatOuts;
-    MessageStream<TestOutputMessageEnvelope> outputStream = inputStream.flatMap(xFlatMap);
-    Collection<OperatorSpec> subs = inputStream.getRegisteredOperatorSpecs();
-    assertEquals(1, subs.size());
-    OperatorSpec<TestOutputMessageEnvelope> flatMapOp = subs.iterator().next();
-    assertTrue(flatMapOp instanceof StreamOperatorSpec);
-    assertEquals(outputStream, flatMapOp.getNextStream());
-    // assert that the transformation function is what we defined above
-    assertEquals(xFlatMap, ((StreamOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope>) flatMapOp).getTransformFn());
-  }
-
-  @Test
-  public void testFilter() {
-    MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph);
-    FilterFunction<TestMessageEnvelope> xFilter = (TestMessageEnvelope m) -> m.getMessage().getEventTime() > 123456L;
-    MessageStream<TestMessageEnvelope> outputStream = inputStream.filter(xFilter);
-    Collection<OperatorSpec> subs = inputStream.getRegisteredOperatorSpecs();
-    assertEquals(1, subs.size());
-    OperatorSpec<TestMessageEnvelope> filterOp = subs.iterator().next();
-    assertTrue(filterOp instanceof StreamOperatorSpec);
-    assertEquals(outputStream, filterOp.getNextStream());
-    // assert that the transformation function is what we defined above
-    FlatMapFunction<TestMessageEnvelope, TestMessageEnvelope> txfmFn = ((StreamOperatorSpec<TestMessageEnvelope, TestMessageEnvelope>) filterOp).getTransformFn();
-    TestMessageEnvelope mockMsg = mock(TestMessageEnvelope.class);
-    TestMessageEnvelope.MessageType mockInnerTestMessage = mock(TestMessageEnvelope.MessageType.class);
-    when(mockMsg.getMessage()).thenReturn(mockInnerTestMessage);
-    // Event time below the threshold: the message is dropped.
-    when(mockInnerTestMessage.getEventTime()).thenReturn(11111L);
-    Collection<TestMessageEnvelope> output = txfmFn.apply(mockMsg);
-    assertTrue(output.isEmpty());
-    // Event time above the threshold: the message passes through unchanged.
-    when(mockInnerTestMessage.getEventTime()).thenReturn(999999L);
-    output = txfmFn.apply(mockMsg);
-    assertEquals(1, output.size());
-    assertEquals(mockMsg, output.iterator().next());
-  }
-
-  @Test
-  public void testSink() {
-    MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph);
-    SinkFunction<TestMessageEnvelope> xSink = (TestMessageEnvelope m, MessageCollector mc, TaskCoordinator tc) -> {
-      mc.send(new OutgoingMessageEnvelope(new SystemStream("test-sys", "test-stream"), m.getMessage()));
-      tc.commit(TaskCoordinator.RequestScope.CURRENT_TASK);
-    };
-    inputStream.sink(xSink);
-    Collection<OperatorSpec> subs = inputStream.getRegisteredOperatorSpecs();
-    assertEquals(1, subs.size());
-    OperatorSpec<TestMessageEnvelope> sinkOp = subs.iterator().next();
-    assertTrue(sinkOp instanceof SinkOperatorSpec);
-    assertEquals(xSink, ((SinkOperatorSpec) sinkOp).getSinkFn());
-    // A sink is terminal: it has no downstream stream.
-    assertNull(((SinkOperatorSpec) sinkOp).getNextStream());
-  }
-
-  @Test
-  public void testJoin() {
-    MessageStreamImpl<TestMessageEnvelope> source1 = new MessageStreamImpl<>(mockGraph);
-    MessageStreamImpl<TestMessageEnvelope> source2 = new MessageStreamImpl<>(mockGraph);
-    JoinFunction<String, TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope> joiner =
-        new JoinFunction<String, TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope>() {
-          @Override
-          public TestOutputMessageEnvelope apply(TestMessageEnvelope m1, TestMessageEnvelope m2) {
-            return new TestOutputMessageEnvelope(m1.getKey(), m1.getMessage().getValue().length() + m2.getMessage().getValue().length());
-          }
-
-          @Override
-          public String getFirstKey(TestMessageEnvelope message) {
-            return message.getKey();
-          }
-
-          @Override
-          public String getSecondKey(TestMessageEnvelope message) {
-            return message.getKey();
-          }
-        };
-
-    MessageStream<TestOutputMessageEnvelope> joinOutput = source1.join(source2, joiner);
-    // Both sources get a partial-join spec that feeds the same output stream.
-    Collection<OperatorSpec> subs = source1.getRegisteredOperatorSpecs();
-    assertEquals(1, subs.size());
-    OperatorSpec<TestMessageEnvelope> joinOp1 = subs.iterator().next();
-    assertTrue(joinOp1 instanceof PartialJoinOperatorSpec);
-    assertEquals(joinOutput, ((PartialJoinOperatorSpec) joinOp1).getNextStream());
-    subs = source2.getRegisteredOperatorSpecs();
-    assertEquals(1, subs.size());
-    OperatorSpec<TestMessageEnvelope> joinOp2 = subs.iterator().next();
-    assertTrue(joinOp2 instanceof PartialJoinOperatorSpec);
-    assertEquals(joinOutput, ((PartialJoinOperatorSpec) joinOp2).getNextStream());
-    TestMessageEnvelope joinMsg1 = new TestMessageEnvelope("test-join-1", "join-msg-001", 11111L);
-    TestMessageEnvelope joinMsg2 = new TestMessageEnvelope("test-join-2", "join-msg-002", 22222L);
-    // Presumably 24 = combined length of the two 12-character message values.
-    TestOutputMessageEnvelope xOut = (TestOutputMessageEnvelope) ((PartialJoinOperatorSpec) joinOp1).getTransformFn().apply(joinMsg1, joinMsg2);
-    assertEquals("test-join-1", xOut.getKey());
-    assertEquals(Integer.valueOf(24), xOut.getMessage());
-    // The second side's transform is called with the arguments reversed, yet the result must still be
-    // keyed by joinMsg1 (i.e. the spec for source2 presumably swaps them back — see MessageStreamImpl.join).
-    xOut = (TestOutputMessageEnvelope) ((PartialJoinOperatorSpec) joinOp2).getTransformFn().apply(joinMsg2, joinMsg1);
-    assertEquals("test-join-1", xOut.getKey());
-    assertEquals(Integer.valueOf(24), xOut.getMessage());
-  }
-
-  @Test
-  public void testMerge() {
-    MessageStream<TestMessageEnvelope> merge1 = new MessageStreamImpl<>(mockGraph);
-    // Plain construction instead of double-brace initialization (which creates an anonymous subclass).
-    Collection<MessageStream<TestMessageEnvelope>> others = new ArrayList<>();
-    others.add(new MessageStreamImpl<>(mockGraph));
-    others.add(new MessageStreamImpl<>(mockGraph));
-    MessageStream<TestMessageEnvelope> mergeOutput = merge1.merge(others);
-    validateMergeOperator(merge1, mergeOutput);
-
-    others.forEach(merge -> validateMergeOperator(merge, mergeOutput));
-  }
-
-  /**
-   * Asserts that {@code mergeSource} has exactly one registered stream operator that forwards to
-   * {@code mergeOutput} and whose transform passes a message through unchanged.
-   */
-  private void validateMergeOperator(MessageStream<TestMessageEnvelope> mergeSource, MessageStream<TestMessageEnvelope> mergeOutput) {
-    Collection<OperatorSpec> subs = ((MessageStreamImpl<TestMessageEnvelope>) mergeSource).getRegisteredOperatorSpecs();
-    assertEquals(1, subs.size());
-    OperatorSpec<TestMessageEnvelope> mergeOp = subs.iterator().next();
-    assertTrue(mergeOp instanceof StreamOperatorSpec);
-    assertEquals(mergeOutput, ((StreamOperatorSpec) mergeOp).getNextStream());
-    TestMessageEnvelope mockMsg = mock(TestMessageEnvelope.class);
-    Collection<TestMessageEnvelope> outputs = ((StreamOperatorSpec<TestMessageEnvelope, TestMessageEnvelope>) mergeOp).getTransformFn().apply(mockMsg);
-    assertEquals(1, outputs.size());
-    assertEquals(mockMsg, outputs.iterator().next());
-  }
-}