You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2017/02/16 23:04:25 UTC

[2/5] samza git commit: SAMZA-1073: moving all operator classes into samza-core

http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-operator/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java b/samza-operator/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java
deleted file mode 100644
index e057c2b..0000000
--- a/samza-operator/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.spec;
-
-import org.apache.samza.config.Config;
-import org.apache.samza.operators.MessageStreamImpl;
-import org.apache.samza.operators.functions.PartialJoinFunction;
-import org.apache.samza.task.TaskContext;
-
-
-/**
- * Spec for the partial join operator that takes messages from one input stream, joins with buffered
- * messages from another stream, and produces join results to an output {@link MessageStreamImpl}.
- *
- * @param <M>  the type of input message
- * @param <K>  the type of join key
- * @param <JM>  the type of message in the other join stream
- * @param <RM>  the type of message in the join output stream
- */
-public class PartialJoinOperatorSpec<M, K, JM, RM> implements OperatorSpec<RM> {
-
-  private final MessageStreamImpl<RM> joinOutput;
-
-  /**
-   * The transformation function of {@link PartialJoinOperatorSpec} that takes an input message of
-   * type {@code M}, joins with a stream of buffered messages of type {@code JM} from another stream,
-   * and generates a joined result message of type {@code RM}.
-   */
-  private final PartialJoinFunction<K, M, JM, RM> transformFn;
-
-
-  /**
-   * The unique ID for this operator.
-   */
-  private final int opId;
-
-  /**
-   * Default constructor for a {@link PartialJoinOperatorSpec}.
-   *
-   * @param partialJoinFn  partial join function that takes type {@code M} of input message and joins
-   *                       w/ type {@code JM} of buffered message from another stream
-   * @param joinOutput  the output {@link MessageStreamImpl} of the join results
-   */
-  PartialJoinOperatorSpec(PartialJoinFunction<K, M, JM, RM> partialJoinFn, MessageStreamImpl<RM> joinOutput, int opId) {
-    this.joinOutput = joinOutput;
-    this.transformFn = partialJoinFn;
-    this.opId = opId;
-  }
-
-  @Override
-  public MessageStreamImpl<RM> getNextStream() {
-    return this.joinOutput;
-  }
-
-  public PartialJoinFunction<K, M, JM, RM> getTransformFn() {
-    return this.transformFn;
-  }
-
-  public OperatorSpec.OpCode getOpCode() {
-    return OpCode.JOIN;
-  }
-
-  public int getOpId() {
-    return this.opId;
-  }
-
-  @Override public void init(Config config, TaskContext context) {
-    this.transformFn.init(config, context);
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-operator/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java b/samza-operator/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java
deleted file mode 100644
index ba30d67..0000000
--- a/samza-operator/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.spec;
-
-import org.apache.samza.config.Config;
-import org.apache.samza.operators.MessageStreamImpl;
-import org.apache.samza.operators.OutputStream;
-import org.apache.samza.operators.functions.SinkFunction;
-import org.apache.samza.task.TaskContext;
-
-
-/**
- * The spec for a sink operator that accepts user-defined logic to output a {@link MessageStreamImpl} to an external
- * system. This is a terminal operator and does not allow further operator chaining.
- *
- * @param <M>  the type of input message
- */
-public class SinkOperatorSpec<M> implements OperatorSpec {
-
-  /**
-   * {@link OpCode} for this {@link SinkOperatorSpec}
-   */
-  private final OperatorSpec.OpCode opCode;
-
-  /**
-   * The unique ID for this operator.
-   */
-  private final int opId;
-
-  /**
-   * The user-defined sink function
-   */
-  private final SinkFunction<M> sinkFn;
-
-  /**
-   * Potential output stream defined by the {@link SinkFunction}
-   */
-  private final OutputStream<M> outStream;
-
-  /**
-   * Default constructor for a {@link SinkOperatorSpec} w/o an output stream. (e.g. output is sent to remote database)
-   *
-   * @param sinkFn  a user defined {@link SinkFunction} that will be called with the output message,
-   *                the output {@link org.apache.samza.task.MessageCollector} and the
-   *                {@link org.apache.samza.task.TaskCoordinator}.
-   * @param opCode  the specific {@link OpCode} for this {@link SinkOperatorSpec}. It could be {@link OpCode#SINK}, {@link OpCode#SEND_TO},
-   *                or {@link OpCode#PARTITION_BY}
-   * @param opId  the unique id of this {@link SinkOperatorSpec} in the {@link org.apache.samza.operators.StreamGraph}
-   */
-  SinkOperatorSpec(SinkFunction<M> sinkFn, OperatorSpec.OpCode opCode, int opId) {
-    this(sinkFn, opCode, opId, null);
-  }
-
-  /**
-   * Default constructor for a {@link SinkOperatorSpec} that sends the output to an {@link OutputStream}
-   *
-   * @param sinkFn  a user defined {@link SinkFunction} that will be called with the output message,
-   *                the output {@link org.apache.samza.task.MessageCollector} and the
-   *                {@link org.apache.samza.task.TaskCoordinator}.
-   * @param opCode  the specific {@link OpCode} for this {@link SinkOperatorSpec}. It could be {@link OpCode#SINK}, {@link OpCode#SEND_TO},
-   *                or {@link OpCode#PARTITION_BY}
-   * @param opId  the unique id of this {@link SinkOperatorSpec} in the {@link org.apache.samza.operators.StreamGraph}
-   * @param outStream  the {@link OutputStream} for this {@link SinkOperatorSpec}
-   */
-  SinkOperatorSpec(SinkFunction<M> sinkFn, OperatorSpec.OpCode opCode, int opId, OutputStream<M> outStream) {
-    this.sinkFn = sinkFn;
-    this.opCode = opCode;
-    this.opId = opId;
-    this.outStream = outStream;
-  }
-
-  /**
-   * This is a terminal operator and doesn't allow further operator chaining.
-   * @return  null
-   */
-  @Override
-  public MessageStreamImpl<M> getNextStream() {
-    return null;
-  }
-
-  public SinkFunction<M> getSinkFn() {
-    return this.sinkFn;
-  }
-
-  public OperatorSpec.OpCode getOpCode() {
-    return this.opCode;
-  }
-
-  public int getOpId() {
-    return this.opId;
-  }
-
-  public OutputStream<M> getOutStream() {
-    return this.outStream;
-  }
-
-  @Override public void init(Config config, TaskContext context) {
-    this.sinkFn.init(config, context);
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-operator/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java b/samza-operator/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java
deleted file mode 100644
index d7813f7..0000000
--- a/samza-operator/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.spec;
-
-import org.apache.samza.config.Config;
-import org.apache.samza.operators.MessageStreamImpl;
-import org.apache.samza.operators.functions.FlatMapFunction;
-import org.apache.samza.task.TaskContext;
-
-
-/**
- * The spec for a linear stream operator that outputs 0 or more messages for each input message.
- *
- * @param <M>  the type of input message
- * @param <OM>  the type of output message
- */
-public class StreamOperatorSpec<M, OM> implements OperatorSpec<OM> {
-
-  /**
-   * {@link OpCode} for this {@link StreamOperatorSpec}
-   */
-  private final OperatorSpec.OpCode opCode;
-
-  /**
-   * The unique ID for this operator.
-   */
-  private final int opId;
-
-  /**
-   * The output {@link MessageStreamImpl} from this {@link StreamOperatorSpec}
-   */
-  private final MessageStreamImpl<OM> outputStream;
-
-  /**
-   * Transformation function applied in this {@link StreamOperatorSpec}
-   */
-  private final FlatMapFunction<M, OM> transformFn;
-
-  /**
-   * Constructor for a {@link StreamOperatorSpec} that accepts an output {@link MessageStreamImpl}.
-   *
-   * @param transformFn  the transformation function
-   * @param outputStream  the output {@link MessageStreamImpl}
-   * @param opCode  the {@link OpCode} for this {@link StreamOperatorSpec}
-   * @param opId  the unique id for this {@link StreamOperatorSpec} in a {@link org.apache.samza.operators.StreamGraph}
-   */
-  StreamOperatorSpec(FlatMapFunction<M, OM> transformFn, MessageStreamImpl outputStream, OperatorSpec.OpCode opCode, int opId) {
-    this.outputStream = outputStream;
-    this.transformFn = transformFn;
-    this.opCode = opCode;
-    this.opId = opId;
-  }
-
-  @Override
-  public MessageStreamImpl<OM> getNextStream() {
-    return this.outputStream;
-  }
-
-  public FlatMapFunction<M, OM> getTransformFn() {
-    return this.transformFn;
-  }
-
-  public OperatorSpec.OpCode getOpCode() {
-    return this.opCode;
-  }
-
-  public int getOpId() {
-    return this.opId;
-  }
-
-  @Override
-  public void init(Config config, TaskContext context) {
-    this.transformFn.init(config, context);
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-operator/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java b/samza-operator/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
deleted file mode 100644
index 46417ed..0000000
--- a/samza-operator/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.operators.spec;
-
-import org.apache.samza.operators.MessageStreamImpl;
-import org.apache.samza.operators.windows.WindowPane;
-import org.apache.samza.operators.windows.internal.WindowInternal;
-
-
-/**
- * Default window operator spec object
- *
- * @param <M>  the type of input message to the window
- * @param <WK>  the type of key of the window
- * @param <WV>  the type of aggregated value in the window output {@link WindowPane}
- */
-public class WindowOperatorSpec<M, WK, WV> implements OperatorSpec<WindowPane<WK, WV>> {
-
-  private final WindowInternal<M, WK, WV> window;
-
-  private final MessageStreamImpl<WindowPane<WK, WV>> outputStream;
-
-  private final int opId;
-
-
-  /**
-   * Constructor for {@link WindowOperatorSpec}.
-   *
-   * @param window  the window function
-   * @param outputStream  the output {@link MessageStreamImpl} from this {@link WindowOperatorSpec}
-   * @param opId  auto-generated unique ID of this operator
-   */
-  WindowOperatorSpec(WindowInternal<M, WK, WV> window, MessageStreamImpl<WindowPane<WK, WV>> outputStream, int opId) {
-    this.outputStream = outputStream;
-    this.window = window;
-    this.opId = opId;
-  }
-
-  @Override
-  public MessageStreamImpl<WindowPane<WK, WV>> getNextStream() {
-    return this.outputStream;
-  }
-
-  public WindowInternal getWindow() {
-    return window;
-  }
-
-  public OpCode getOpCode() {
-    return OpCode.WINDOW;
-  }
-
-  public int getOpId() {
-    return this.opId;
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-operator/src/main/java/org/apache/samza/operators/spec/WindowState.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/spec/WindowState.java b/samza-operator/src/main/java/org/apache/samza/operators/spec/WindowState.java
deleted file mode 100644
index 53bca2e..0000000
--- a/samza-operator/src/main/java/org/apache/samza/operators/spec/WindowState.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.spec;
-
-import org.apache.samza.annotation.InterfaceStability;
-
-
-/**
- * This interface defines the methods a window state class has to implement. The programmers are allowed to implement
- * customized window state to be stored in window state stores by implementing this interface class.
- *
- * @param <WV>  the type for window output value
- */
-@InterfaceStability.Unstable
-public interface WindowState<WV> {
-  /**
-   * Method to get the system time when the first message in the window is received
-   *
-   * @return  nano-second of system time for the first message received in the window
-   */
-  long getFirstMessageTimeNs();
-
-  /**
-   * Method to get the system time when the last message in the window is received
-   *
-   * @return  nano-second of system time for the last message received in the window
-   */
-  long getLastMessageTimeNs();
-
-  /**
-   * Method to get the earliest event time in the window
-   *
-   * @return  the earliest event time in nano-second in the window
-   */
-  long getEarliestEventTimeNs();
-
-  /**
-   * Method to get the latest event time in the window
-   *
-   * @return  the latest event time in nano-second in the window
-   */
-  long getLatestEventTimeNs();
-
-  /**
-   * Method to get the total number of messages received in the window
-   *
-   * @return  number of messages in the window
-   */
-  long getNumberMessages();
-
-  /**
-   * Method to get the corresponding window's output value
-   *
-   * @return  the corresponding window's output value
-   */
-  WV getOutputValue();
-
-  /**
-   * Method to set the corresponding window's output value
-   *
-   * @param value  the corresponding window's output value
-   */
-  void setOutputValue(WV value);
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-operator/src/main/java/org/apache/samza/system/RemoteExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/system/RemoteExecutionEnvironment.java b/samza-operator/src/main/java/org/apache/samza/system/RemoteExecutionEnvironment.java
deleted file mode 100644
index fafa2cb..0000000
--- a/samza-operator/src/main/java/org/apache/samza/system/RemoteExecutionEnvironment.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.system;
-
-import org.apache.samza.operators.StreamGraphBuilder;
-import org.apache.samza.config.Config;
-
-/**
- * This class implements the {@link ExecutionEnvironment} that runs the applications in YARN environment
- */
-public class RemoteExecutionEnvironment implements ExecutionEnvironment {
-
-  @Override public void run(StreamGraphBuilder app, Config config) {
-    // TODO: add description of ProcessContext that is going to create a sub-DAG of the {@code graph}
-    // TODO: actually instantiate the tasks and run the job, i.e.
-    // 1. create all input/output/intermediate topics
-    // 2. create the single job configuration
-    // 3. execute JobRunner to submit the single job for the whole graph
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-operator/src/main/java/org/apache/samza/system/StandaloneExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/system/StandaloneExecutionEnvironment.java b/samza-operator/src/main/java/org/apache/samza/system/StandaloneExecutionEnvironment.java
deleted file mode 100644
index f0f6ef2..0000000
--- a/samza-operator/src/main/java/org/apache/samza/system/StandaloneExecutionEnvironment.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.system;
-
-import org.apache.samza.operators.StreamGraph;
-import org.apache.samza.operators.StreamGraphBuilder;
-import org.apache.samza.config.Config;
-import org.apache.samza.operators.StreamGraphImpl;
-
-
-/**
- * This class implements the {@link ExecutionEnvironment} that runs the applications in standalone environment
- */
-public class StandaloneExecutionEnvironment implements ExecutionEnvironment {
-
-  // TODO: may want to move this to a common base class for all {@link ExecutionEnvironment}
-  StreamGraph createGraph(StreamGraphBuilder app, Config config) {
-    StreamGraphImpl graph = new StreamGraphImpl();
-    app.init(graph, config);
-    return graph;
-  }
-
-  @Override public void run(StreamGraphBuilder app, Config config) {
-    // 1. get logic graph for optimization
-    // StreamGraph logicGraph = this.createGraph(app, config);
-    // 2. potential optimization....
-    // 3. create new instance of StreamGraphBuilder that would generate the optimized graph
-    // 4. create all input/output/intermediate topics
-    // 5. create the configuration for StreamProcessor
-    // 6. start the StreamProcessor w/ optimized instance of StreamGraphBuilder
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-operator/src/main/java/org/apache/samza/task/StreamOperatorTask.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/task/StreamOperatorTask.java b/samza-operator/src/main/java/org/apache/samza/task/StreamOperatorTask.java
deleted file mode 100644
index b007e3c..0000000
--- a/samza-operator/src/main/java/org/apache/samza/task/StreamOperatorTask.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.task;
-
-import org.apache.samza.config.Config;
-import org.apache.samza.operators.ContextManager;
-import org.apache.samza.operators.MessageStreamImpl;
-import org.apache.samza.operators.StreamGraphBuilder;
-import org.apache.samza.operators.StreamGraphImpl;
-import org.apache.samza.operators.data.InputMessageEnvelope;
-import org.apache.samza.operators.impl.OperatorGraph;
-import org.apache.samza.system.IncomingMessageEnvelope;
-import org.apache.samza.system.SystemStream;
-
-import java.util.HashMap;
-import java.util.Map;
-
-
-/**
- * Execution of the logic sub-DAG
- *
- *
- * A {@link StreamTask} implementation that receives {@link InputMessageEnvelope}s and propagates them
- * through the user's stream transformations defined in {@link StreamGraphImpl} using the
- * {@link org.apache.samza.operators.MessageStream} APIs.
- * <p>
- * This class brings all the operator API implementation components together and feeds the
- * {@link InputMessageEnvelope}s into the transformation chains.
- * <p>
- * It accepts an instance of the user implemented factory {@link StreamGraphBuilder} as input parameter of the constructor.
- * When its own {@link #init(Config, TaskContext)} method is called during startup, it instantiates a user-defined {@link StreamGraphImpl}
- * from the {@link StreamGraphBuilder}, calls {@link StreamGraphImpl#getContextManager()} to initialize the task-wide context
- * for the graph, and creates a {@link MessageStreamImpl} corresponding to each of its input
- * {@link org.apache.samza.system.SystemStreamPartition}s. Each input {@link MessageStreamImpl}
- * corresponds to either an input stream or an intermediate stream in {@link StreamGraphImpl}.
- * <p>
- * Then, this task calls {@link org.apache.samza.operators.impl.OperatorGraph#init(Map, Config, TaskContext)} for each of the input
- * {@link MessageStreamImpl}. This instantiates the {@link org.apache.samza.operators.impl.OperatorImpl} DAG
- * corresponding to the aforementioned {@link org.apache.samza.operators.spec.OperatorSpec} DAG and returns the
- * root node of the DAG, which this class saves.
- * <p>
- * Now that it has the root for the DAG corresponding to each {@link org.apache.samza.system.SystemStreamPartition}, it
- * can pass the message envelopes received in {@link StreamTask#process(IncomingMessageEnvelope, MessageCollector, TaskCoordinator)}
- * along to the appropriate root nodes. From then on, each {@link org.apache.samza.operators.impl.OperatorImpl} propagates
- * its transformed output to the next set of {@link org.apache.samza.operators.impl.OperatorImpl}s.
- */
-public final class StreamOperatorTask implements StreamTask, InitableTask, WindowableTask, ClosableTask {
-
-  /**
-   * A mapping from each {@link SystemStream} to the root node of its operator chain DAG.
-   */
-  private final OperatorGraph operatorGraph = new OperatorGraph();
-
-  private final StreamGraphBuilder graphBuilder;
-
-  private ContextManager contextManager;
-
-  public StreamOperatorTask(StreamGraphBuilder graphBuilder) {
-    this.graphBuilder = graphBuilder;
-  }
-
-  @Override
-  public final void init(Config config, TaskContext context) throws Exception {
-    // create the StreamGraphImpl object and initialize app-specific logic DAG within the task
-    StreamGraphImpl streams = new StreamGraphImpl();
-    this.graphBuilder.init(streams, config);
-    // get the context manager of the {@link StreamGraph} and initialize the task-specific context
-    this.contextManager = streams.getContextManager();
-
-    Map<SystemStream, MessageStreamImpl> inputBySystemStream = new HashMap<>();
-    context.getSystemStreamPartitions().forEach(ssp -> {
-        if (!inputBySystemStream.containsKey(ssp.getSystemStream())) {
-          // create mapping from the physical input {@link SystemStream} to the logic {@link MessageStream}
-          inputBySystemStream.putIfAbsent(ssp.getSystemStream(), streams.getInputStream(ssp.getSystemStream()));
-        }
-      });
-    operatorGraph.init(inputBySystemStream, config, this.contextManager.initTaskContext(config, context));
-  }
-
-  @Override
-  public final void process(IncomingMessageEnvelope ime, MessageCollector collector, TaskCoordinator coordinator) {
-    this.operatorGraph.get(ime.getSystemStreamPartition().getSystemStream())
-        .onNext(new InputMessageEnvelope(ime), collector, coordinator);
-  }
-
-  @Override
-  public final void window(MessageCollector collector, TaskCoordinator coordinator) throws Exception {
-    // TODO: invoke timer based triggers
-  }
-
-  @Override
-  public void close() throws Exception {
-    this.contextManager.finalizeTaskContext();
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-operator/src/test/java/org/apache/samza/example/KeyValueStoreExample.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/example/KeyValueStoreExample.java b/samza-operator/src/test/java/org/apache/samza/example/KeyValueStoreExample.java
deleted file mode 100644
index 85ebc6c..0000000
--- a/samza-operator/src/test/java/org/apache/samza/example/KeyValueStoreExample.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.example;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.samza.operators.*;
-import org.apache.samza.operators.StreamGraphBuilder;
-import org.apache.samza.config.Config;
-import org.apache.samza.operators.data.MessageEnvelope;
-import org.apache.samza.operators.functions.FlatMapFunction;
-import org.apache.samza.serializers.JsonSerde;
-import org.apache.samza.serializers.StringSerde;
-import org.apache.samza.storage.kv.KeyValueStore;
-import org.apache.samza.system.ExecutionEnvironment;
-import org.apache.samza.system.SystemStream;
-import org.apache.samza.task.TaskContext;
-import org.apache.samza.util.CommandLine;
-
-import java.util.Properties;
-
-
-/**
- * Example code using {@link KeyValueStore} to implement event-time window
- */
-public class KeyValueStoreExample implements StreamGraphBuilder {
-
-  /**
-   * used by remote execution environment to launch the job in remote program. The remote program should follow the similar
-   * invoking context as in standalone:
-   *
-   *   public static void main(String args[]) throws Exception {
-   *     CommandLine cmdLine = new CommandLine();
-   *     Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
-   *     ExecutionEnvironment remoteEnv = ExecutionEnvironment.getRemoteEnvironment(config);
-   *     UserMainExample runnableApp = new UserMainExample();
-   *     runnableApp.run(remoteEnv, config);
-   *   }
-   *
-   */
-  @Override public void init(StreamGraph graph, Config config) {
-
-    MessageStream<PageViewEvent> pageViewEvents = graph.createInStream(input1, new StringSerde("UTF-8"), new JsonSerde<>());
-    OutputStream<StatsOutput> pageViewPerMemberCounters = graph.createOutStream(output, new StringSerde("UTF-8"), new JsonSerde<StatsOutput>());
-
-    pageViewEvents.
-        partitionBy(m -> m.getMessage().memberId).
-        flatMap(new MyStatsCounter()).
-        sendTo(pageViewPerMemberCounters);
-
-  }
-
-  // standalone local program model
-  public static void main(String[] args) throws Exception {
-    CommandLine cmdLine = new CommandLine();
-    Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
-    ExecutionEnvironment standaloneEnv = ExecutionEnvironment.getLocalEnvironment(config);
-    standaloneEnv.run(new KeyValueStoreExample(), config);
-  }
-
-  class MyStatsCounter implements FlatMapFunction<PageViewEvent, StatsOutput> {
-    private final int timeoutMs = 10 * 60 * 1000;
-
-    KeyValueStore<String, StatsWindowState> statsStore;
-
-    class StatsWindowState {
-      int lastCount = 0;
-      long timeAtLastOutput = 0;
-      int newCount = 0;
-    }
-
-    @Override
-    public Collection<StatsOutput> apply(PageViewEvent message) {
-      List<StatsOutput> outputStats = new ArrayList<>();
-      long wndTimestamp = (long) Math.floor(TimeUnit.MILLISECONDS.toMinutes(message.getMessage().timestamp) / 5) * 5;
-      String wndKey = String.format("%s-%d", message.getMessage().memberId, wndTimestamp);
-      StatsWindowState curState = this.statsStore.get(wndKey);
-      curState.newCount++;
-      long curTimeMs = System.currentTimeMillis();
-      if (curState.newCount > 0 && curState.timeAtLastOutput + timeoutMs < curTimeMs) {
-        curState.timeAtLastOutput = curTimeMs;
-        curState.lastCount += curState.newCount;
-        curState.newCount = 0;
-        outputStats.add(new StatsOutput(message.getMessage().memberId, wndTimestamp, curState.lastCount));
-      }
-      // update counter w/o generating output
-      this.statsStore.put(wndKey, curState);
-      return outputStats;
-    }
-
-    @Override
-    public void init(Config config, TaskContext context) {
-      this.statsStore = (KeyValueStore<String, StatsWindowState>) context.getStore("my-stats-wnd-store");
-    }
-  }
-
-  StreamSpec input1 = new StreamSpec() {
-    @Override public SystemStream getSystemStream() {
-      return new SystemStream("kafka", "PageViewEvent");
-    }
-
-    @Override public Properties getProperties() {
-      return null;
-    }
-  };
-
-  StreamSpec output = new StreamSpec() {
-    @Override public SystemStream getSystemStream() {
-      return new SystemStream("kafka", "PageViewPerMember5min");
-    }
-
-    @Override public Properties getProperties() {
-      return null;
-    }
-  };
-
-  class PageViewEvent implements MessageEnvelope<String, PageViewEvent> {
-    String pageId;
-    String memberId;
-    long timestamp;
-
-    PageViewEvent(String pageId, String memberId, long timestamp) {
-      this.pageId = pageId;
-      this.memberId = memberId;
-      this.timestamp = timestamp;
-    }
-
-    @Override
-    public String getKey() {
-      return this.pageId;
-    }
-
-    @Override
-    public PageViewEvent getMessage() {
-      return this;
-    }
-  }
-
-  class StatsOutput implements MessageEnvelope<String, StatsOutput> {
-    private String memberId;
-    private long timestamp;
-    private Integer count;
-
-    StatsOutput(String key, long timestamp, Integer count) {
-      this.memberId = key;
-      this.timestamp = timestamp;
-      this.count = count;
-    }
-
-    @Override
-    public String getKey() {
-      return this.memberId;
-    }
-
-    @Override
-    public StatsOutput getMessage() {
-      return this;
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-operator/src/test/java/org/apache/samza/example/NoContextStreamExample.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/example/NoContextStreamExample.java b/samza-operator/src/test/java/org/apache/samza/example/NoContextStreamExample.java
deleted file mode 100644
index c6d2e6e..0000000
--- a/samza-operator/src/test/java/org/apache/samza/example/NoContextStreamExample.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.example;
-
-import org.apache.samza.operators.*;
-import org.apache.samza.operators.StreamGraphBuilder;
-import org.apache.samza.config.Config;
-import org.apache.samza.operators.data.InputMessageEnvelope;
-import org.apache.samza.operators.data.JsonIncomingSystemMessageEnvelope;
-import org.apache.samza.operators.data.Offset;
-import org.apache.samza.operators.functions.JoinFunction;
-import org.apache.samza.serializers.JsonSerde;
-import org.apache.samza.serializers.StringSerde;
-import org.apache.samza.system.ExecutionEnvironment;
-import org.apache.samza.system.SystemStream;
-import org.apache.samza.system.SystemStreamPartition;
-import org.apache.samza.util.CommandLine;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-
-
-/**
- * Example {@link StreamGraphBuilder} code to test the API methods
- */
-public class NoContextStreamExample implements StreamGraphBuilder {
-
-  StreamSpec input1 = new StreamSpec() {
-    @Override public SystemStream getSystemStream() {
-      return new SystemStream("kafka", "input1");
-    }
-
-    @Override public Properties getProperties() {
-      return null;
-    }
-  };
-
-  StreamSpec input2 = new StreamSpec() {
-    @Override public SystemStream getSystemStream() {
-      return new SystemStream("kafka", "input2");
-    }
-
-    @Override public Properties getProperties() {
-      return null;
-    }
-  };
-
-  StreamSpec output = new StreamSpec() {
-    @Override public SystemStream getSystemStream() {
-      return new SystemStream("kafka", "output");
-    }
-
-    @Override public Properties getProperties() {
-      return null;
-    }
-  };
-
-  class MessageType {
-    String joinKey;
-    List<String> joinFields = new ArrayList<>();
-  }
-
-  class JsonMessageEnvelope extends JsonIncomingSystemMessageEnvelope<MessageType> {
-    JsonMessageEnvelope(String key, MessageType data, Offset offset, SystemStreamPartition partition) {
-      super(key, data, offset, partition);
-    }
-  }
-
-  private JsonMessageEnvelope getInputMessage(InputMessageEnvelope ism) {
-    return new JsonMessageEnvelope(
-        ((MessageType) ism.getMessage()).joinKey,
-        (MessageType) ism.getMessage(),
-        ism.getOffset(),
-        ism.getSystemStreamPartition());
-  }
-
-  class MyJoinFunction implements JoinFunction<String, JsonMessageEnvelope, JsonMessageEnvelope, JsonIncomingSystemMessageEnvelope<MessageType>> {
-
-    @Override
-    public JsonIncomingSystemMessageEnvelope<MessageType> apply(JsonMessageEnvelope m1,
-        JsonMessageEnvelope m2) {
-      MessageType newJoinMsg = new MessageType();
-      newJoinMsg.joinKey = m1.getKey();
-      newJoinMsg.joinFields.addAll(m1.getMessage().joinFields);
-      newJoinMsg.joinFields.addAll(m2.getMessage().joinFields);
-      return new JsonMessageEnvelope(m1.getMessage().joinKey, newJoinMsg, null, null);
-    }
-
-    @Override
-    public String getFirstKey(JsonMessageEnvelope message) {
-      return message.getKey();
-    }
-
-    @Override
-    public String getSecondKey(JsonMessageEnvelope message) {
-      return message.getKey();
-    }
-  }
-
-  /**
-   * used by remote execution environment to launch the job in remote program. The remote program should follow the similar
-   * invoking context as in standalone:
-   *
-   *   public static void main(String args[]) throws Exception {
-   *     CommandLine cmdLine = new CommandLine();
-   *     Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
-   *     ExecutionEnvironment remoteEnv = ExecutionEnvironment.fromConfig(config);
-   *     remoteEnv.run(new NoContextStreamExample(), config);
-   *   }
-   *
-   */
-  @Override public void init(StreamGraph graph, Config config) {
-    MessageStream<InputMessageEnvelope> inputSource1 = graph.<Object, Object, InputMessageEnvelope>createInStream(
-        input1, null, null);
-    MessageStream<InputMessageEnvelope> inputSource2 = graph.<Object, Object, InputMessageEnvelope>createInStream(
-        input2, null, null);
-    OutputStream<JsonIncomingSystemMessageEnvelope<MessageType>> outStream = graph.createOutStream(output,
-        new StringSerde("UTF-8"), new JsonSerde<>());
-
-    inputSource1.map(this::getInputMessage).
-        join(inputSource2.map(this::getInputMessage), new MyJoinFunction()).
-        sendTo(outStream);
-
-  }
-
-  // standalone local program model
-  public static void main(String[] args) throws Exception {
-    CommandLine cmdLine = new CommandLine();
-    Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
-    ExecutionEnvironment standaloneEnv = ExecutionEnvironment.getLocalEnvironment(config);
-    standaloneEnv.run(new NoContextStreamExample(), config);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-operator/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java b/samza-operator/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java
deleted file mode 100644
index 0477066..0000000
--- a/samza-operator/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.example;
-
-import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.OutputStream;
-import org.apache.samza.operators.StreamGraphBuilder;
-import org.apache.samza.config.Config;
-import org.apache.samza.operators.StreamGraph;
-import org.apache.samza.operators.StreamSpec;
-import org.apache.samza.operators.data.MessageEnvelope;
-import org.apache.samza.operators.functions.JoinFunction;
-import org.apache.samza.serializers.JsonSerde;
-import org.apache.samza.serializers.StringSerde;
-import org.apache.samza.system.ExecutionEnvironment;
-import org.apache.samza.system.SystemStream;
-import org.apache.samza.util.CommandLine;
-
-import java.util.Properties;
-
-
-/**
- * Simple 2-way stream-to-stream join example
- */
-public class OrderShipmentJoinExample implements StreamGraphBuilder {
-
-  /**
-   * used by remote execution environment to launch the job in remote program. The remote program should follow the similar
-   * invoking context as in standalone:
-   *
-   *   public static void main(String args[]) throws Exception {
-   *     CommandLine cmdLine = new CommandLine();
-   *     Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
-   *     ExecutionEnvironment remoteEnv = ExecutionEnvironment.getRemoteEnvironment(config);
-   *     UserMainExample runnableApp = new UserMainExample();
-   *     runnableApp.run(remoteEnv, config);
-   *   }
-   *
-   */
-  @Override public void init(StreamGraph graph, Config config) {
-
-    MessageStream<OrderRecord> orders = graph.createInStream(input1, new StringSerde("UTF-8"), new JsonSerde<>());
-    MessageStream<ShipmentRecord> shipments = graph.createInStream(input2, new StringSerde("UTF-8"), new JsonSerde<>());
-    OutputStream<FulFilledOrderRecord> fulfilledOrders = graph.createOutStream(output, new StringSerde("UTF-8"), new JsonSerde<>());
-
-    orders.join(shipments, new MyJoinFunction()).sendTo(fulfilledOrders);
-
-  }
-
-  // standalone local program model
-  public static void main(String[] args) throws Exception {
-    CommandLine cmdLine = new CommandLine();
-    Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
-    ExecutionEnvironment standaloneEnv = ExecutionEnvironment.getLocalEnvironment(config);
-    standaloneEnv.run(new OrderShipmentJoinExample(), config);
-  }
-
-  StreamSpec input1 = new StreamSpec() {
-    @Override public SystemStream getSystemStream() {
-      return new SystemStream("kafka", "Orders");
-    }
-
-    @Override public Properties getProperties() {
-      return null;
-    }
-  };
-
-  StreamSpec input2 = new StreamSpec() {
-    @Override public SystemStream getSystemStream() {
-      return new SystemStream("kafka", "Shipment");
-    }
-
-    @Override public Properties getProperties() {
-      return null;
-    }
-  };
-
-  StreamSpec output = new StreamSpec() {
-    @Override public SystemStream getSystemStream() {
-      return new SystemStream("kafka", "FulfilledOrders");
-    }
-
-    @Override public Properties getProperties() {
-      return null;
-    }
-  };
-
-  class OrderRecord implements MessageEnvelope<String, OrderRecord> {
-    String orderId;
-    long orderTimeMs;
-
-    OrderRecord(String orderId, long timeMs) {
-      this.orderId = orderId;
-      this.orderTimeMs = timeMs;
-    }
-
-    @Override
-    public String getKey() {
-      return this.orderId;
-    }
-
-    @Override
-    public OrderRecord getMessage() {
-      return this;
-    }
-  }
-
-  class ShipmentRecord implements MessageEnvelope<String, ShipmentRecord> {
-    String orderId;
-    long shipTimeMs;
-
-    ShipmentRecord(String orderId, long timeMs) {
-      this.orderId = orderId;
-      this.shipTimeMs = timeMs;
-    }
-
-    @Override
-    public String getKey() {
-      return this.orderId;
-    }
-
-    @Override
-    public ShipmentRecord getMessage() {
-      return this;
-    }
-  }
-
-  class FulFilledOrderRecord implements MessageEnvelope<String, FulFilledOrderRecord> {
-    String orderId;
-    long orderTimeMs;
-    long shipTimeMs;
-
-    FulFilledOrderRecord(String orderId, long orderTimeMs, long shipTimeMs) {
-      this.orderId = orderId;
-      this.orderTimeMs = orderTimeMs;
-      this.shipTimeMs = shipTimeMs;
-    }
-
-
-    @Override
-    public String getKey() {
-      return this.orderId;
-    }
-
-    @Override
-    public FulFilledOrderRecord getMessage() {
-      return this;
-    }
-  }
-
-  FulFilledOrderRecord myJoinResult(OrderRecord m1, ShipmentRecord m2) {
-    return new FulFilledOrderRecord(m1.getKey(), m1.orderTimeMs, m2.shipTimeMs);
-  }
-
-  class MyJoinFunction implements JoinFunction<String, OrderRecord, ShipmentRecord, FulFilledOrderRecord> {
-
-    @Override
-    public FulFilledOrderRecord apply(OrderRecord message, ShipmentRecord otherMessage) {
-      return OrderShipmentJoinExample.this.myJoinResult(message, otherMessage);
-    }
-
-    @Override
-    public String getFirstKey(OrderRecord message) {
-      return message.getKey();
-    }
-
-    @Override
-    public String getSecondKey(ShipmentRecord message) {
-      return message.getKey();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-operator/src/test/java/org/apache/samza/example/PageViewCounterExample.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/example/PageViewCounterExample.java b/samza-operator/src/test/java/org/apache/samza/example/PageViewCounterExample.java
deleted file mode 100644
index f7d8bda..0000000
--- a/samza-operator/src/test/java/org/apache/samza/example/PageViewCounterExample.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.example;
-
-import org.apache.samza.operators.*;
-import org.apache.samza.operators.StreamGraphBuilder;
-import org.apache.samza.config.Config;
-import org.apache.samza.operators.data.MessageEnvelope;
-import org.apache.samza.operators.triggers.Triggers;
-import org.apache.samza.operators.windows.AccumulationMode;
-import org.apache.samza.operators.windows.WindowPane;
-import org.apache.samza.operators.windows.Windows;
-import org.apache.samza.serializers.JsonSerde;
-import org.apache.samza.serializers.StringSerde;
-import org.apache.samza.system.ExecutionEnvironment;
-import org.apache.samza.system.SystemStream;
-import org.apache.samza.util.CommandLine;
-
-import java.time.Duration;
-import java.util.Properties;
-
-
-/**
- * Example code to implement window-based counter
- */
-public class PageViewCounterExample implements StreamGraphBuilder {
-
-  @Override public void init(StreamGraph graph, Config config) {
-
-    MessageStream<PageViewEvent> pageViewEvents = graph.createInStream(input1, new StringSerde("UTF-8"), new JsonSerde<>());
-    OutputStream<MyStreamOutput> pageViewPerMemberCounters = graph.createOutStream(output, new StringSerde("UTF-8"), new JsonSerde<>());
-
-    pageViewEvents.
-        window(Windows.<PageViewEvent, String, Integer>keyedTumblingWindow(m -> m.getMessage().memberId, Duration.ofSeconds(10), (m, c) -> c + 1).
-            setEarlyTrigger(Triggers.repeat(Triggers.count(5))).
-            setAccumulationMode(AccumulationMode.DISCARDING)).
-        map(MyStreamOutput::new).
-        sendTo(pageViewPerMemberCounters);
-
-  }
-
-  public static void main(String[] args) {
-    CommandLine cmdLine = new CommandLine();
-    Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
-    ExecutionEnvironment standaloneEnv = ExecutionEnvironment.getLocalEnvironment(config);
-    standaloneEnv.run(new PageViewCounterExample(), config);
-  }
-
-  StreamSpec input1 = new StreamSpec() {
-    @Override public SystemStream getSystemStream() {
-      return new SystemStream("kafka", "PageViewEvent");
-    }
-
-    @Override public Properties getProperties() {
-      return null;
-    }
-  };
-
-  StreamSpec output = new StreamSpec() {
-    @Override public SystemStream getSystemStream() {
-      return new SystemStream("kafka", "PageViewPerMember5min");
-    }
-
-    @Override public Properties getProperties() {
-      return null;
-    }
-  };
-
-  class PageViewEvent implements MessageEnvelope<String, PageViewEvent> {
-    String pageId;
-    String memberId;
-    long timestamp;
-
-    PageViewEvent(String pageId, String memberId, long timestamp) {
-      this.pageId = pageId;
-      this.memberId = memberId;
-      this.timestamp = timestamp;
-    }
-
-    @Override
-    public String getKey() {
-      return this.pageId;
-    }
-
-    @Override
-    public PageViewEvent getMessage() {
-      return this;
-    }
-  }
-
-  class MyStreamOutput implements MessageEnvelope<String, MyStreamOutput> {
-    String memberId;
-    long timestamp;
-    int count;
-
-    MyStreamOutput(WindowPane<String, Integer> m) {
-      this.memberId = m.getKey().getKey();
-      this.timestamp = Long.valueOf(m.getKey().getPaneId());
-      this.count = m.getMessage();
-    }
-
-    @Override
-    public String getKey() {
-      return this.memberId;
-    }
-
-    @Override
-    public MyStreamOutput getMessage() {
-      return this;
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-operator/src/test/java/org/apache/samza/example/RepartitionExample.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/example/RepartitionExample.java b/samza-operator/src/test/java/org/apache/samza/example/RepartitionExample.java
deleted file mode 100644
index 6994ac4..0000000
--- a/samza-operator/src/test/java/org/apache/samza/example/RepartitionExample.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.example;
-
-import org.apache.samza.operators.*;
-import org.apache.samza.operators.StreamGraphBuilder;
-import org.apache.samza.config.Config;
-import org.apache.samza.operators.data.MessageEnvelope;
-import org.apache.samza.operators.windows.WindowPane;
-import org.apache.samza.operators.windows.Windows;
-import org.apache.samza.serializers.JsonSerde;
-import org.apache.samza.serializers.StringSerde;
-import org.apache.samza.system.ExecutionEnvironment;
-import org.apache.samza.system.SystemStream;
-import org.apache.samza.util.CommandLine;
-
-import java.time.Duration;
-import java.util.*;
-
-
-/**
- * Example {@link StreamGraphBuilder} code to test the API methods with re-partition operator
- */
-public class RepartitionExample implements StreamGraphBuilder {
-
-  /**
-   * used by remote execution environment to launch the job in remote program. The remote program should follow the similar
-   * invoking context as in standalone:
-   *
-   *   public static void main(String args[]) throws Exception {
-   *     CommandLine cmdLine = new CommandLine();
-   *     Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
-   *     ExecutionEnvironment remoteEnv = ExecutionEnvironment.getRemoteEnvironment(config);
-   *     remoteEnv.run(new UserMainExample(), config);
-   *   }
-   *
-   */
-  @Override public void init(StreamGraph graph, Config config) {
-
-    MessageStream<PageViewEvent> pageViewEvents = graph.createInStream(input1, new StringSerde("UTF-8"), new JsonSerde<>());
-    OutputStream<MyStreamOutput> pageViewPerMemberCounters = graph.createOutStream(output, new StringSerde("UTF-8"), new JsonSerde<>());
-
-    pageViewEvents.
-        partitionBy(m -> m.getMessage().memberId).
-        window(Windows.<PageViewEvent, String, Integer>keyedTumblingWindow(
-            msg -> msg.getMessage().memberId, Duration.ofMinutes(5), (m, c) -> c + 1)).
-        map(MyStreamOutput::new).
-        sendTo(pageViewPerMemberCounters);
-
-  }
-
-  // standalone local program model
-  public static void main(String[] args) throws Exception {
-    CommandLine cmdLine = new CommandLine();
-    Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
-    ExecutionEnvironment standaloneEnv = ExecutionEnvironment.getLocalEnvironment(config);
-    standaloneEnv.run(new RepartitionExample(), config);
-  }
-
-  StreamSpec input1 = new StreamSpec() {
-    @Override public SystemStream getSystemStream() {
-      return new SystemStream("kafka", "PageViewEvent");
-    }
-
-    @Override public Properties getProperties() {
-      return null;
-    }
-  };
-
-  StreamSpec output = new StreamSpec() {
-    @Override public SystemStream getSystemStream() {
-      return new SystemStream("kafka", "PageViewPerMember5min");
-    }
-
-    @Override public Properties getProperties() {
-      return null;
-    }
-  };
-
-  class PageViewEvent implements MessageEnvelope<String, PageViewEvent> {
-    String pageId;
-    String memberId;
-    long timestamp;
-
-    PageViewEvent(String pageId, String memberId, long timestamp) {
-      this.pageId = pageId;
-      this.memberId = memberId;
-      this.timestamp = timestamp;
-    }
-
-    @Override
-    public String getKey() {
-      return this.pageId;
-    }
-
-    @Override
-    public PageViewEvent getMessage() {
-      return this;
-    }
-  }
-
-  class MyStreamOutput implements MessageEnvelope<String, MyStreamOutput> {
-    String memberId;
-    long timestamp;
-    int count;
-
-    MyStreamOutput(WindowPane<String, Integer> m) {
-      this.memberId = m.getKey().getKey();
-      this.timestamp = Long.valueOf(m.getKey().getPaneId());
-      this.count = m.getMessage();
-    }
-
-    @Override
-    public String getKey() {
-      return this.memberId;
-    }
-
-    @Override
-    public MyStreamOutput getMessage() {
-      return this;
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-operator/src/test/java/org/apache/samza/example/TestBasicStreamGraphs.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/example/TestBasicStreamGraphs.java b/samza-operator/src/test/java/org/apache/samza/example/TestBasicStreamGraphs.java
deleted file mode 100644
index 8ecd44f..0000000
--- a/samza-operator/src/test/java/org/apache/samza/example/TestBasicStreamGraphs.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.example;
-
-import java.lang.reflect.Field;
-import org.apache.samza.Partition;
-import org.apache.samza.config.Config;
-import org.apache.samza.operators.impl.OperatorGraph;
-import org.apache.samza.system.SystemStreamPartition;
-import org.apache.samza.task.StreamOperatorTask;
-import org.apache.samza.task.TaskContext;
-import org.junit.Test;
-
-import java.util.HashSet;
-import java.util.Set;
-
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
-
-
-/**
- * Unit test for {@link StreamOperatorTask}
- */
-public class TestBasicStreamGraphs {
-
-  private final Set<SystemStreamPartition> inputPartitions = new HashSet<SystemStreamPartition>() { {
-      for (int i = 0; i < 4; i++) {
-        this.add(new SystemStreamPartition("my-system", String.format("my-topic%d", i), new Partition(i)));
-      }
-    } };
-
-  @Test
-  public void testUserTask() throws Exception {
-    Config mockConfig = mock(Config.class);
-    TaskContext mockContext = mock(TaskContext.class);
-    when(mockContext.getSystemStreamPartitions()).thenReturn(this.inputPartitions);
-    TestWindowExample userTask = new TestWindowExample(this.inputPartitions);
-    StreamOperatorTask adaptorTask = new StreamOperatorTask(userTask);
-    Field pipelineMapFld = StreamOperatorTask.class.getDeclaredField("operatorGraph");
-    pipelineMapFld.setAccessible(true);
-    OperatorGraph opGraph = (OperatorGraph) pipelineMapFld.get(adaptorTask);
-
-    adaptorTask.init(mockConfig, mockContext);
-    this.inputPartitions.forEach(partition -> {
-        assertNotNull(opGraph.get(partition.getSystemStream()));
-      });
-  }
-
-  @Test
-  public void testSplitTask() throws Exception {
-    Config mockConfig = mock(Config.class);
-    TaskContext mockContext = mock(TaskContext.class);
-    when(mockContext.getSystemStreamPartitions()).thenReturn(this.inputPartitions);
-    TestBroadcastExample splitTask = new TestBroadcastExample(this.inputPartitions);
-    StreamOperatorTask adaptorTask = new StreamOperatorTask(splitTask);
-    Field pipelineMapFld = StreamOperatorTask.class.getDeclaredField("operatorGraph");
-    pipelineMapFld.setAccessible(true);
-    OperatorGraph opGraph = (OperatorGraph) pipelineMapFld.get(adaptorTask);
-
-    adaptorTask.init(mockConfig, mockContext);
-    this.inputPartitions.forEach(partition -> {
-        assertNotNull(opGraph.get(partition.getSystemStream()));
-      });
-  }
-
-  @Test
-  public void testJoinTask() throws Exception {
-    Config mockConfig = mock(Config.class);
-    TaskContext mockContext = mock(TaskContext.class);
-    when(mockContext.getSystemStreamPartitions()).thenReturn(this.inputPartitions);
-    TestJoinExample joinTask = new TestJoinExample(this.inputPartitions);
-    StreamOperatorTask adaptorTask = new StreamOperatorTask(joinTask);
-    Field pipelineMapFld = StreamOperatorTask.class.getDeclaredField("operatorGraph");
-    pipelineMapFld.setAccessible(true);
-    OperatorGraph opGraph = (OperatorGraph) pipelineMapFld.get(adaptorTask);
-
-    adaptorTask.init(mockConfig, mockContext);
-    this.inputPartitions.forEach(partition -> {
-        assertNotNull(opGraph.get(partition.getSystemStream()));
-      });
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-operator/src/test/java/org/apache/samza/example/TestBroadcastExample.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/example/TestBroadcastExample.java b/samza-operator/src/test/java/org/apache/samza/example/TestBroadcastExample.java
deleted file mode 100644
index d22324b..0000000
--- a/samza-operator/src/test/java/org/apache/samza/example/TestBroadcastExample.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.example;
-
-import org.apache.samza.config.Config;
-import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.StreamGraph;
-import org.apache.samza.operators.StreamSpec;
-import org.apache.samza.operators.data.InputMessageEnvelope;
-import org.apache.samza.operators.data.JsonIncomingSystemMessageEnvelope;
-import org.apache.samza.operators.data.Offset;
-import org.apache.samza.operators.triggers.Triggers;
-import org.apache.samza.operators.windows.Windows;
-import org.apache.samza.system.SystemStream;
-import org.apache.samza.system.SystemStreamPartition;
-
-import java.time.Duration;
-import java.util.function.BiFunction;
-import java.util.Properties;
-import java.util.Set;
-
-
-/**
- * Example graph builder that fans every input stream out into three filtered branches, each counted per tumbling window.
- */
-public class TestBroadcastExample extends TestExampleBase {
-
-  TestBroadcastExample(Set<SystemStreamPartition> inputs) {
-    super(inputs);
-  }
-
-  /** Message payload; of these fields only {@code parKey} is read by the filters below. */
-  class MessageType {
-    String field1;
-    String field2;
-    String field3;
-    String field4;
-    String parKey;
-    private long timestamp;
-
-    public long getTimestamp() {
-      return this.timestamp;
-    }
-  }
-
-  /** Envelope whose message is a {@link MessageType}. */
-  class JsonMessageEnvelope extends JsonIncomingSystemMessageEnvelope<MessageType> {
-    JsonMessageEnvelope(String key, MessageType data, Offset offset, SystemStreamPartition partition) {
-      super(key, data, offset, partition);
-    }
-  }
-
-  /** Creates one source stream per input system stream and attaches three filtered, windowed branches to it. */
-  @Override
-  public void init(StreamGraph graph, Config config) {
-    BiFunction<JsonMessageEnvelope, Integer, Integer> counter = (msg, count) -> count + 1;
-    for (SystemStream systemStream : inputs.keySet()) {
-      StreamSpec spec = new StreamSpec() {
-        @Override public SystemStream getSystemStream() {
-          return systemStream;
-        }
-        @Override public Properties getProperties() {
-          return null;
-        }
-      };
-      MessageStream<JsonMessageEnvelope> source =
-          graph.<Object, Object, InputMessageEnvelope>createInStream(spec, null, null).map(this::getInputMessage);
-      countInWindow(source.filter(this::myFilter1), counter);
-      countInWindow(source.filter(this::myFilter2), counter);
-      countInWindow(source.filter(this::myFilter3), counter);
-    }
-  }
-
-  /** Windows {@code branch}: 100 ms tumbling window, late trigger = any of count(30000) / timeSinceFirstMessage(10 ms). */
-  private void countInWindow(MessageStream<JsonMessageEnvelope> branch,
-      BiFunction<JsonMessageEnvelope, Integer, Integer> counter) {
-    branch.window(Windows.tumblingWindow(Duration.ofMillis(100), counter)
-        .setLateTrigger(Triggers.any(Triggers.count(30000), Triggers.timeSinceFirstMessage(Duration.ofMillis(10)))));
-  }
-
-  JsonMessageEnvelope getInputMessage(InputMessageEnvelope m1) {
-    return (JsonMessageEnvelope) m1.getMessage();
-  }
-
-  boolean myFilter1(JsonMessageEnvelope m1) {
-    return m1.getMessage().parKey.equals("key1");
-  }
-
-  boolean myFilter2(JsonMessageEnvelope m1) {
-    return m1.getMessage().parKey.equals("key2");
-  }
-
-  boolean myFilter3(JsonMessageEnvelope m1) {
-    return m1.getMessage().parKey.equals("key3");
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-operator/src/test/java/org/apache/samza/example/TestExampleBase.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/example/TestExampleBase.java b/samza-operator/src/test/java/org/apache/samza/example/TestExampleBase.java
deleted file mode 100644
index c4df9d4..0000000
--- a/samza-operator/src/test/java/org/apache/samza/example/TestExampleBase.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.example;
-
-import org.apache.samza.operators.StreamGraphBuilder;
-import org.apache.samza.system.SystemStream;
-import org.apache.samza.system.SystemStreamPartition;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-/**
- * Base class for test examples: groups the given input partitions by their {@link SystemStream}
- * and exposes the grouping to subclasses through {@link #inputs}.
- */
-public abstract class TestExampleBase implements StreamGraphBuilder {
-  /** Input partitions keyed by the system stream they belong to. */
-  protected final Map<SystemStream, Set<SystemStreamPartition>> inputs;
-
-  /** @param inputs the partitions to group; each one is filed under its {@code getSystemStream()} */
-  TestExampleBase(Set<SystemStreamPartition> inputs) {
-    this.inputs = new HashMap<>();
-    for (SystemStreamPartition partition : inputs) {
-      // computeIfAbsent creates the per-stream set only when a stream is first seen, in a single lookup.
-      this.inputs.computeIfAbsent(partition.getSystemStream(), stream -> new HashSet<>()).add(partition);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-operator/src/test/java/org/apache/samza/example/TestJoinExample.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/example/TestJoinExample.java b/samza-operator/src/test/java/org/apache/samza/example/TestJoinExample.java
deleted file mode 100644
index fe6e7e7..0000000
--- a/samza-operator/src/test/java/org/apache/samza/example/TestJoinExample.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.example;
-
-import org.apache.samza.config.Config;
-import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.StreamGraph;
-import org.apache.samza.operators.StreamSpec;
-import org.apache.samza.operators.data.InputMessageEnvelope;
-import org.apache.samza.operators.data.JsonIncomingSystemMessageEnvelope;
-import org.apache.samza.operators.data.Offset;
-import org.apache.samza.operators.functions.JoinFunction;
-import org.apache.samza.serializers.JsonSerde;
-import org.apache.samza.serializers.StringSerde;
-import org.apache.samza.system.SystemStream;
-import org.apache.samza.system.SystemStreamPartition;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-import java.util.Set;
-
-
-/**
- * Example graph builder that chains all input streams into one stream-stream join on the envelope key
- * and sends the joined stream to an output stream.
- */
-public class TestJoinExample extends TestExampleBase {
-
-  TestJoinExample(Set<SystemStreamPartition> inputs) {
-    super(inputs);
-  }
-
-  /** Payload holding the join key and the list of joined fields. */
-  class MessageType {
-    String joinKey;
-    List<String> joinFields = new ArrayList<>();
-  }
-
-  /** Envelope whose message is a {@link MessageType}. */
-  class JsonMessageEnvelope extends JsonIncomingSystemMessageEnvelope<MessageType> {
-    JsonMessageEnvelope(String key, MessageType data, Offset offset, SystemStreamPartition partition) {
-      super(key, data, offset, partition);
-    }
-  }
-
-  /** Running result of the join chain; {@code null} until {@code init} has seen its first input. */
-  MessageStream<JsonMessageEnvelope> joinOutput = null;
-
-  /** Joins the input streams one after another and sends the final stream to an output stream. */
-  @Override
-  public void init(StreamGraph graph, Config config) {
-    for (SystemStream systemStream : inputs.keySet()) {
-      StreamSpec inputSpec = new StreamSpec() {
-        @Override public SystemStream getSystemStream() {
-          return systemStream;
-        }
-        @Override public Properties getProperties() {
-          return null;
-        }
-      };
-      MessageStream<JsonMessageEnvelope> source =
-          graph.<Object, Object, InputMessageEnvelope>createInStream(inputSpec, null, null).map(this::getInputMessage);
-      // The first stream seeds the chain; each later stream is joined onto the result so far.
-      joinOutput = (joinOutput == null) ? source : joinOutput.join(source, new MyJoinFunction());
-    }
-
-    StreamSpec outputSpec = new StreamSpec() {
-      @Override public SystemStream getSystemStream() {
-        return null;
-      }
-      @Override public Properties getProperties() {
-        return null;
-      }
-    };
-    joinOutput.sendTo(graph.createOutStream(outputSpec, new StringSerde("UTF-8"), new JsonSerde<>()));
-  }
-
-  /** Re-wraps an incoming envelope, using the payload's {@code joinKey} as the envelope key. */
-  private JsonMessageEnvelope getInputMessage(InputMessageEnvelope ism) {
-    String key = ((MessageType) ism.getMessage()).joinKey;
-    MessageType payload = (MessageType) ism.getMessage();
-    return new JsonMessageEnvelope(key, payload, ism.getOffset(), ism.getSystemStreamPartition());
-  }
-
-  /** Joins two envelopes by concatenating their {@code joinFields}; both sides are keyed by the envelope key. */
-  class MyJoinFunction implements JoinFunction<String, JsonMessageEnvelope, JsonMessageEnvelope, JsonMessageEnvelope> {
-    JsonMessageEnvelope myJoinResult(JsonMessageEnvelope m1, JsonMessageEnvelope m2) {
-      MessageType joined = new MessageType();
-      joined.joinKey = m1.getKey();
-      joined.joinFields.addAll(m1.getMessage().joinFields);
-      joined.joinFields.addAll(m2.getMessage().joinFields);
-      // The joined envelope carries no offset or partition of its own.
-      return new JsonMessageEnvelope(m1.getMessage().joinKey, joined, null, null);
-    }
-
-    @Override
-    public JsonMessageEnvelope apply(JsonMessageEnvelope message, JsonMessageEnvelope otherMessage) {
-      return myJoinResult(message, otherMessage);
-    }
-
-    @Override
-    public String getFirstKey(JsonMessageEnvelope message) {
-      return message.getKey();
-    }
-
-    @Override
-    public String getSecondKey(JsonMessageEnvelope message) {
-      return message.getKey();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-operator/src/test/java/org/apache/samza/example/TestWindowExample.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/example/TestWindowExample.java b/samza-operator/src/test/java/org/apache/samza/example/TestWindowExample.java
deleted file mode 100644
index e08ca20..0000000
--- a/samza-operator/src/test/java/org/apache/samza/example/TestWindowExample.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.example;
-
-import org.apache.samza.config.Config;
-import org.apache.samza.operators.StreamGraph;
-import org.apache.samza.operators.StreamSpec;
-import org.apache.samza.operators.data.InputMessageEnvelope;
-import org.apache.samza.operators.data.JsonIncomingSystemMessageEnvelope;
-import org.apache.samza.operators.data.MessageEnvelope;
-import org.apache.samza.operators.data.Offset;
-import org.apache.samza.operators.windows.Windows;
-import org.apache.samza.system.SystemStream;
-import org.apache.samza.system.SystemStreamPartition;
-
-import java.time.Duration;
-import java.util.function.BiFunction;
-import java.util.Properties;
-import java.util.Set;
-
-
-/** Example graph builder: wraps each input message in a JsonMessageEnvelope and counts per 200 ms tumbling window. */
-public class TestWindowExample extends TestExampleBase {
-  class MessageType {
-    String field1;
-    String field2;
-  }
-
-  TestWindowExample(Set<SystemStreamPartition> inputs) {
-    super(inputs);
-  }
-
-  /** Envelope whose message is a {@link MessageType}. */
-  class JsonMessageEnvelope extends JsonIncomingSystemMessageEnvelope<MessageType> {
-    JsonMessageEnvelope(String key, MessageType data, Offset offset, SystemStreamPartition partition) {
-      super(key, data, offset, partition);
-    }
-  }
-
-  /** For each input system stream: creates the source, wraps its messages, and applies the tumbling window. */
-  @Override
-  public void init(StreamGraph graph, Config config) {
-    BiFunction<JsonMessageEnvelope, Integer, Integer> counter = (msg, count) -> count + 1;
-    for (SystemStream systemStream : inputs.keySet()) {
-      StreamSpec spec = new StreamSpec() {
-        @Override public SystemStream getSystemStream() {
-          return systemStream;
-        }
-        @Override public Properties getProperties() {
-          return null;
-        }
-      };
-      graph.<Object, Object, InputMessageEnvelope>createInStream(spec, null, null)
-          .map(env -> new JsonMessageEnvelope(
-              this.myMessageKeyFunction(env), (MessageType) env.getMessage(), env.getOffset(), env.getSystemStreamPartition()))
-          .window(Windows.tumblingWindow(Duration.ofMillis(200), counter));
-    }
-  }
-
-  /** Uses the string form of the incoming envelope's key as the new envelope key. */
-  String myMessageKeyFunction(MessageEnvelope<Object, Object> m) {
-    return m.getKey().toString();
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-operator/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java b/samza-operator/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
deleted file mode 100644
index 160a47a..0000000
--- a/samza-operator/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
+++ /dev/null
@@ -1,204 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators;
-
-import org.apache.samza.operators.functions.FilterFunction;
-import org.apache.samza.operators.functions.FlatMapFunction;
-import org.apache.samza.operators.functions.JoinFunction;
-import org.apache.samza.operators.functions.MapFunction;
-import org.apache.samza.operators.functions.SinkFunction;
-import org.apache.samza.operators.spec.OperatorSpec;
-import org.apache.samza.operators.spec.PartialJoinOperatorSpec;
-import org.apache.samza.operators.spec.SinkOperatorSpec;
-import org.apache.samza.operators.spec.StreamOperatorSpec;
-import org.apache.samza.system.OutgoingMessageEnvelope;
-import org.apache.samza.system.SystemStream;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskCoordinator;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Set;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-
-/** Unit tests for the operator specs that MessageStreamImpl registers for map, flatMap, filter, sink, join and merge. */
-public class TestMessageStreamImpl {
-  private StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
-
-  @Test
-  public void testMap() {
-    MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph);
-    MapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> xMap = (TestMessageEnvelope m) ->
-        new TestOutputMessageEnvelope(m.getKey(), m.getMessage().getValue().length() + 1);
-    MessageStream<TestOutputMessageEnvelope> outputStream = inputStream.map(xMap);
-    Collection<OperatorSpec> subs = inputStream.getRegisteredOperatorSpecs();
-    assertEquals(1, subs.size());
-    OperatorSpec<TestOutputMessageEnvelope> mapOp = subs.iterator().next();
-    assertTrue(mapOp instanceof StreamOperatorSpec);
-    assertEquals(outputStream, mapOp.getNextStream());
-    // Feed a mocked message through the registered transform to confirm it is the map function defined above
-    TestMessageEnvelope xTestMsg = mock(TestMessageEnvelope.class);
-    TestMessageEnvelope.MessageType mockInnerTestMessage = mock(TestMessageEnvelope.MessageType.class);
-    when(xTestMsg.getKey()).thenReturn("test-msg-key");
-    when(xTestMsg.getMessage()).thenReturn(mockInnerTestMessage);
-    when(mockInnerTestMessage.getValue()).thenReturn("123456789");
-    // The spec exposes the transform as a collection-returning function; a map must yield exactly one element
-    Collection<TestOutputMessageEnvelope> cOutputMsg = ((StreamOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope>) mapOp).getTransformFn().apply(xTestMsg);
-    assertEquals(1, cOutputMsg.size());
-    TestOutputMessageEnvelope outputMessage = cOutputMsg.iterator().next();
-    assertEquals(xTestMsg.getKey(), outputMessage.getKey());
-    assertEquals(Integer.valueOf(xTestMsg.getMessage().getValue().length() + 1), outputMessage.getMessage());
-  }
-
-  @Test
-  public void testFlatMap() {
-    MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph);
-    Set<TestOutputMessageEnvelope> flatOuts = new HashSet<TestOutputMessageEnvelope>() { {
-        this.add(mock(TestOutputMessageEnvelope.class));
-        this.add(mock(TestOutputMessageEnvelope.class));
-        this.add(mock(TestOutputMessageEnvelope.class));
-      } };
-    FlatMapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> xFlatMap = (TestMessageEnvelope message) -> flatOuts;
-    MessageStream<TestOutputMessageEnvelope> outputStream = inputStream.flatMap(xFlatMap);
-    Collection<OperatorSpec> subs = inputStream.getRegisteredOperatorSpecs();
-    assertEquals(1, subs.size());
-    OperatorSpec<TestOutputMessageEnvelope> flatMapOp = subs.iterator().next();
-    assertTrue(flatMapOp instanceof StreamOperatorSpec);
-    assertEquals(outputStream, flatMapOp.getNextStream());
-    // The registered transform must equal the FlatMapFunction that was passed to flatMap()
-    assertEquals(xFlatMap, ((StreamOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope>) flatMapOp).getTransformFn());
-  }
-
-  @Test
-  public void testFilter() {
-    MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph);
-    FilterFunction<TestMessageEnvelope> xFilter = (TestMessageEnvelope m) -> m.getMessage().getEventTime() > 123456L;
-    MessageStream<TestMessageEnvelope> outputStream = inputStream.filter(xFilter);
-    Collection<OperatorSpec> subs = inputStream.getRegisteredOperatorSpecs();
-    assertEquals(1, subs.size());
-    OperatorSpec<TestMessageEnvelope> filterOp = subs.iterator().next();
-    assertTrue(filterOp instanceof StreamOperatorSpec);
-    assertEquals(outputStream, filterOp.getNextStream());
-    // The registered transform must drop a message that fails the filter and pass through one that satisfies it
-    FlatMapFunction<TestMessageEnvelope, TestMessageEnvelope> txfmFn = ((StreamOperatorSpec<TestMessageEnvelope, TestMessageEnvelope>) filterOp).getTransformFn();
-    TestMessageEnvelope mockMsg = mock(TestMessageEnvelope.class);
-    TestMessageEnvelope.MessageType mockInnerTestMessage = mock(TestMessageEnvelope.MessageType.class);
-    when(mockMsg.getMessage()).thenReturn(mockInnerTestMessage);
-    when(mockInnerTestMessage.getEventTime()).thenReturn(11111L);
-    Collection<TestMessageEnvelope> output = txfmFn.apply(mockMsg);
-    assertTrue(output.isEmpty());
-    when(mockMsg.getMessage()).thenReturn(mockInnerTestMessage);
-    when(mockInnerTestMessage.getEventTime()).thenReturn(999999L);
-    output = txfmFn.apply(mockMsg);
-    assertEquals(1, output.size());
-    assertEquals(mockMsg, output.iterator().next());
-  }
-
-  @Test
-  public void testSink() {
-    MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph);
-    SinkFunction<TestMessageEnvelope> xSink = (TestMessageEnvelope m, MessageCollector mc, TaskCoordinator tc) -> {
-      mc.send(new OutgoingMessageEnvelope(new SystemStream("test-sys", "test-stream"), m.getMessage()));
-      tc.commit(TaskCoordinator.RequestScope.CURRENT_TASK);
-    };
-    inputStream.sink(xSink);
-    Collection<OperatorSpec> subs = inputStream.getRegisteredOperatorSpecs();
-    assertEquals(1, subs.size());
-    OperatorSpec<TestMessageEnvelope> sinkOp = subs.iterator().next();
-    assertTrue(sinkOp instanceof SinkOperatorSpec);
-    assertEquals(xSink, ((SinkOperatorSpec) sinkOp).getSinkFn());
-    assertNull(((SinkOperatorSpec) sinkOp).getNextStream());
-  }
-
-  @Test
-  public void testJoin() {
-    MessageStreamImpl<TestMessageEnvelope> source1 = new MessageStreamImpl<>(mockGraph);
-    MessageStreamImpl<TestMessageEnvelope> source2 = new MessageStreamImpl<>(mockGraph);
-    JoinFunction<String, TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope> joiner =
-      new JoinFunction<String, TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope>() {
-        @Override
-        public TestOutputMessageEnvelope apply(TestMessageEnvelope m1, TestMessageEnvelope m2) {
-          return new TestOutputMessageEnvelope(m1.getKey(), m1.getMessage().getValue().length() + m2.getMessage().getValue().length());
-        }
-
-        @Override
-        public String getFirstKey(TestMessageEnvelope message) {
-          return message.getKey();
-        }
-
-        @Override
-        public String getSecondKey(TestMessageEnvelope message) {
-          return message.getKey();
-        }
-      };
-
-    MessageStream<TestOutputMessageEnvelope> joinOutput = source1.join(source2, joiner);
-    Collection<OperatorSpec> subs = source1.getRegisteredOperatorSpecs();
-    assertEquals(1, subs.size());
-    OperatorSpec<TestMessageEnvelope> joinOp1 = subs.iterator().next();
-    assertTrue(joinOp1 instanceof PartialJoinOperatorSpec);
-    assertEquals(joinOutput, ((PartialJoinOperatorSpec) joinOp1).getNextStream());
-    subs = source2.getRegisteredOperatorSpecs();
-    assertEquals(1, subs.size());
-    OperatorSpec<TestMessageEnvelope> joinOp2 = subs.iterator().next();
-    assertTrue(joinOp2 instanceof PartialJoinOperatorSpec);
-    assertEquals(joinOutput, ((PartialJoinOperatorSpec) joinOp2).getNextStream());
-    TestMessageEnvelope joinMsg1 = new TestMessageEnvelope("test-join-1", "join-msg-001", 11111L);
-    TestMessageEnvelope joinMsg2 = new TestMessageEnvelope("test-join-2", "join-msg-002", 22222L);
-    TestOutputMessageEnvelope xOut = (TestOutputMessageEnvelope) ((PartialJoinOperatorSpec) joinOp1).getTransformFn().apply(joinMsg1, joinMsg2);
-    assertEquals("test-join-1", xOut.getKey());
-    assertEquals(Integer.valueOf(24), xOut.getMessage());
-    xOut = (TestOutputMessageEnvelope) ((PartialJoinOperatorSpec) joinOp2).getTransformFn().apply(joinMsg2, joinMsg1);
-    assertEquals("test-join-1", xOut.getKey());
-    assertEquals(Integer.valueOf(24), xOut.getMessage());
-  }
-
-  @Test
-  public void testMerge() {
-    MessageStream<TestMessageEnvelope> merge1 = new MessageStreamImpl<>(mockGraph);
-    Collection<MessageStream<TestMessageEnvelope>> others = new ArrayList<MessageStream<TestMessageEnvelope>>() { {
-        this.add(new MessageStreamImpl<>(mockGraph));
-        this.add(new MessageStreamImpl<>(mockGraph));
-      } };
-    MessageStream<TestMessageEnvelope> mergeOutput = merge1.merge(others);
-    validateMergeOperator(merge1, mergeOutput);
-
-    others.forEach(merge -> validateMergeOperator(merge, mergeOutput));
-  }
-
-  private void validateMergeOperator(MessageStream<TestMessageEnvelope> mergeSource, MessageStream<TestMessageEnvelope> mergeOutput) {
-    Collection<OperatorSpec> subs = ((MessageStreamImpl<TestMessageEnvelope>) mergeSource).getRegisteredOperatorSpecs();
-    assertEquals(1, subs.size());
-    OperatorSpec<TestMessageEnvelope> mergeOp = subs.iterator().next();
-    assertTrue(mergeOp instanceof StreamOperatorSpec);
-    assertEquals(mergeOutput, ((StreamOperatorSpec) mergeOp).getNextStream());
-    TestMessageEnvelope mockMsg = mock(TestMessageEnvelope.class);
-    Collection<TestMessageEnvelope> outputs = ((StreamOperatorSpec<TestMessageEnvelope, TestMessageEnvelope>) mergeOp).getTransformFn().apply(mockMsg);
-    assertEquals(1, outputs.size());
-    assertEquals(mockMsg, outputs.iterator().next());
-  }
-}