You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2016/12/01 22:51:05 UTC

[5/6] samza git commit: SAMZA-1054: Refactor Operator APIs

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/main/java/org/apache/samza/operators/internal/Operators.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/internal/Operators.java b/samza-api/src/main/java/org/apache/samza/operators/internal/Operators.java
deleted file mode 100644
index f06387c..0000000
--- a/samza-api/src/main/java/org/apache/samza/operators/internal/Operators.java
+++ /dev/null
@@ -1,469 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.operators.internal;
-
-import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.WindowState;
-import org.apache.samza.operators.data.Message;
-import org.apache.samza.storage.kv.Entry;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskCoordinator;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.UUID;
-import java.util.function.BiFunction;
-import java.util.function.Function;
-
-
-/**
- * This class defines all basic stream operator classes used by internal implementation only. All classes defined in
- * this file are immutable.
- *
- * NOTE: Programmers should not use the operators defined in this class directly. All {@link Operator} objects
- * should be initiated via {@link MessageStream} API methods
- */
-public class Operators {
-  /**
-   * Private constructor to prevent instantiation of the {@link Operators} class
-   */
-  private Operators() {}
-
-  private static String getOperatorId() {
-    // TODO: need to change the IDs to be consistent, durable IDs that can be recovered across container and job restarts
-    return UUID.randomUUID().toString();
-  }
-
-  /**
-   * Interface for stream operator functions. The interface defines the output stream of the stream operator function.
-   *
-   */
-  public interface Operator<OM extends Message> {
-    MessageStream<OM> getOutputStream();
-  }
-
-  /**
-   * Linear stream operator function that takes 1 input {@link Message} and outputs a collection of output {@link Message}s.
-   *
-   * @param <M>  the type of input {@link Message}
-   * @param <OM>  the type of output {@link Message}
-   */
-  public static class StreamOperator<M extends Message, OM extends Message> implements Operator<OM> {
-    /**
-     * The output {@link MessageStream}
-     */
-    private final MessageStream<OM> outputStream;
-
-    /**
-     * The transformation function
-     */
-    private final Function<M, Collection<OM>> txfmFunction;
-
-    /**
-     * Constructor of {@link StreamOperator}. Make it private s.t. it can only be created within {@link Operators}.
-     *
-     * @param transformFn  the transformation function to be applied that transforms 1 input {@link Message} into a collection
-     *                     of output {@link Message}s
-     */
-    private StreamOperator(Function<M, Collection<OM>> transformFn) {
-      this(transformFn, new MessageStream<>());
-    }
-
-    /**
-     * Constructor of {@link StreamOperator} which allows the user to define the output {@link MessageStream}
-     *
-     * @param transformFn  the transformation function
-     * @param outputStream  the output {@link MessageStream}
-     */
-    private StreamOperator(Function<M, Collection<OM>> transformFn, MessageStream<OM> outputStream) {
-      this.outputStream = outputStream;
-      this.txfmFunction = transformFn;
-    }
-
-    @Override
-    public MessageStream<OM> getOutputStream() {
-      return this.outputStream;
-    }
-
-    /**
-     * Method to get the transformation function.
-     *
-     * @return the {@code txfmFunction}
-     */
-    public Function<M, Collection<OM>> getFunction() {
-      return this.txfmFunction;
-    }
-
-  }
-
-  /**
-   * A sink operator function that allows customized code to send the output to external system. This is the terminal
-   * operator that does not have any output {@link MessageStream} that allows further processing in the same
-   * {@link org.apache.samza.operators.task.StreamOperatorTask}
-   *
-   * @param <M>  the type of input {@link Message}
-   */
-  public static class SinkOperator<M extends Message> implements Operator {
-
-    /**
-     * The user-defined sink function
-     */
-    private final MessageStream.VoidFunction3<M, MessageCollector, TaskCoordinator> sink;
-
-    /**
-     * Default constructor for {@link SinkOperator}. Make it private s.t. it can only be created within {@link Operators}.
-     *
-     * @param sink  the user-defined sink function
-     */
-    private SinkOperator(MessageStream.VoidFunction3<M, MessageCollector, TaskCoordinator> sink) {
-      this.sink = sink;
-    }
-
-    @Override
-    public MessageStream getOutputStream() {
-      return null;
-    }
-
-    /**
-     * Method to get the user-defined function that implements the {@link SinkOperator}
-     *
-     * @return a {@link MessageStream.VoidFunction3} function that allows the caller to pass in an input message, {@link MessageCollector}
-     *         and {@link TaskCoordinator} to the sink function
-     */
-    public MessageStream.VoidFunction3<M, MessageCollector, TaskCoordinator> getFunction() {
-      return this.sink;
-    }
-  }
-
-  /**
-   * The store functions that are used by {@link WindowOperator} and {@link PartialJoinOperator} to store and retrieve
-   * buffered messages and partial aggregation results
-   *
-   * @param <SK>  the type of key used to store the operator states
-   * @param <SS>  the type of operator state. e.g. could be the partial aggregation result for a window, or a buffered
-   *             input message from the join stream for a join
-   */
-  public static class StoreFunctions<M extends Message, SK, SS> {
-    /**
-     * Function to define the key to query in the operator state store, according to the incoming {@link Message}
-     * This method only supports finding the unique key for the incoming message, which supports use case of non-overlapping
-     * windows and unique-key-based join.
-     *
-     * TODO: for windows that overlap (i.e. sliding windows and hopping windows) and non-unique-key-based join, the query
-     * to the state store is usually a range scan. We need to add a rangeKeyFinder function to map from a single input
-     * message to a range of keys in the store.
-     */
-    private final Function<M, SK> storeKeyFinder;
-
-    /**
-     * Function to update the store entry based on the current state and the incoming {@link Message}
-     *
-     * TODO: this is assuming a 1:1 mapping from the input message to the store entry. When implementing sliding/hopping
-     * windows and non-unique-key-based join, we may need to include the corresponding state key, in addition to the
-     * state value.
-     */
-    private final BiFunction<M, SS, SS> stateUpdater;
-
-    /**
-     * Constructor of state store functions.
-     *
-     */
-    private StoreFunctions(Function<M, SK> keyFinder,
-        BiFunction<M, SS, SS> stateUpdater) {
-      this.storeKeyFinder = keyFinder;
-      this.stateUpdater = stateUpdater;
-    }
-
-    /**
-     * Method to get the {@code storeKeyFinder} function
-     *
-     * @return  the function to calculate the key from an input {@link Message}
-     */
-    public Function<M, SK> getStoreKeyFinder() {
-      return this.storeKeyFinder;
-    }
-
-    /**
-     * Method to get the {@code stateUpdater} function
-     *
-     * @return  the function to update the corresponding state according to an input {@link Message}
-     */
-    public BiFunction<M, SS, SS> getStateUpdater() {
-      return this.stateUpdater;
-    }
-  }
-
-  /**
-   * Defines a window operator function that takes one {@link MessageStream} as an input, accumulates the window state, and generates
-   * an output {@link MessageStream} w/ output type {@code WM} which extends {@link WindowOutput}
-   *
-   * @param <M>  the type of input {@link Message}
-   * @param <WK>  the type of key in the output {@link Message} from the {@link WindowOperator} function
-   * @param <WS>  the type of window state in the {@link WindowOperator} function
-   * @param <WM>  the type of window output {@link Message}
-   */
-  public static class WindowOperator<M extends Message, WK, WS extends WindowState, WM extends WindowOutput<WK, ?>> implements Operator<WM> {
-    /**
-     * The output {@link MessageStream}
-     */
-    private final MessageStream<WM> outputStream;
-
-    /**
-     * The main window transformation function that takes {@link Message}s from one input stream, aggregates w/ the window
-     * state(s) from the window state store, and generate output {@link Message}s to the output stream.
-     */
-    private final BiFunction<M, Entry<WK, WS>, WM> txfmFunction;
-
-    /**
-     * The state store functions for the {@link WindowOperator}
-     */
-    private final StoreFunctions<M, WK, WS> storeFunctions;
-
-    /**
-     * The window trigger function
-     */
-    private final Trigger<M, WS> trigger;
-
-    /**
-     * The unique ID of stateful operators
-     */
-    private final String opId;
-
-    /**
-     * Constructor for {@link WindowOperator}. Make it private s.t. it can only be created within {@link Operators}.
-     *
-     * @param windowFn  description of the window function
-     * @param operatorId  auto-generated unique ID of the operator
-     */
-    private WindowOperator(WindowFn<M, WK, WS, WM> windowFn, String operatorId) {
-      this.outputStream = new MessageStream<>();
-      this.txfmFunction = windowFn.getTransformFunc();
-      this.storeFunctions = windowFn.getStoreFuncs();
-      this.trigger = windowFn.getTrigger();
-      this.opId = operatorId;
-    }
-
-    @Override
-    public String toString() {
-      return this.opId;
-    }
-
-    @Override
-    public MessageStream<WM> getOutputStream() {
-      return this.outputStream;
-    }
-
-    /**
-     * Method to get the window's {@link StoreFunctions}.
-     *
-     * @return  the window operator's {@code storeFunctions}
-     */
-    public StoreFunctions<M, WK, WS> getStoreFunctions() {
-      return this.storeFunctions;
-    }
-
-    /**
-     * Method to get the window operator's main function
-     *
-     * @return   the window operator's {@code txfmFunction}
-     */
-    public BiFunction<M, Entry<WK, WS>, WM> getFunction() {
-      return this.txfmFunction;
-    }
-
-    /**
-     * Method to get the trigger functions
-     *
-     * @return  the {@link Trigger} for this {@link WindowOperator}
-     */
-    public Trigger<M, WS> getTrigger() {
-      return this.trigger;
-    }
-
-    /**
-     * Method to generate the window operator's state store name
-     *
-     * @param inputStream the input {@link MessageStream} to this state store
-     * @return   the persistent store name of the window operator
-     */
-    public String getStoreName(MessageStream<M> inputStream) {
-      //TODO: need to get the persistent name of ds and the operator in a serialized form
-      return String.format("input-%s-wndop-%s", inputStream.toString(), this.toString());
-    }
-  }
-
-  /**
-   * The partial join operator that takes {@link Message}s from one input stream and join w/ buffered {@link Message}s from
-   * another stream and generate join output to {@code output}
-   *
-   * @param <M>  the type of input {@link Message}
-   * @param <K>  the type of join key
-   * @param <JM>  the type of message of {@link Message} in the other join stream
-   * @param <RM>  the type of message of {@link Message} in the join output stream
-   */
-  public static class PartialJoinOperator<M extends Message<K, ?>, K, JM extends Message<K, ?>, RM extends Message> implements Operator<RM> {
-
-    private final MessageStream<RM> joinOutput;
-
-    /**
-     * The main transformation function of {@link PartialJoinOperator} that takes a type {@code M} input message,
-     * join w/ a stream of buffered {@link Message}s from another stream w/ type {@code JM}, and generate joined type {@code RM}.
-     */
-    private final BiFunction<M, JM, RM> txfmFunction;
-
-    /**
-     * The message store functions that read the buffered messages from the other stream in the join
-     */
-    private final StoreFunctions<JM, K, JM> joinStoreFunctions;
-
-    /**
-     * The message store functions that save the buffered messages of this {@link MessageStream} in the join
-     */
-    private final StoreFunctions<M, K, M> selfStoreFunctions;
-
-    /**
-     * The unique ID for the stateful operator
-     */
-    private final String opId;
-
-    /**
-     * Default constructor to create a {@link PartialJoinOperator} object
-     *
-     * @param partialJoin  partial join function that take type {@code M} of input {@link Message} and join w/ type
-     *                     {@code JM} of buffered {@link Message} from another stream
-     * @param joinOutput  the output {@link MessageStream} of the join results
-     */
-    private PartialJoinOperator(BiFunction<M, JM, RM> partialJoin, MessageStream<RM> joinOutput, String opId) {
-      this.joinOutput = joinOutput;
-      this.txfmFunction = partialJoin;
-      // Read-only join store, no creator/updater functions specified
-      this.joinStoreFunctions = new StoreFunctions<>(m -> m.getKey(), null);
-      // Buffered message store for this input stream
-      this.selfStoreFunctions = new StoreFunctions<>(m -> m.getKey(), (m, s1) -> m);
-      this.opId = opId;
-    }
-
-    @Override
-    public String toString() {
-      return this.opId;
-    }
-
-    @Override
-    public MessageStream<RM> getOutputStream() {
-      return this.joinOutput;
-    }
-
-    /**
-     * Method to get {@code joinStoreFunctions}
-     *
-     * @return  {@code joinStoreFunctions}
-     */
-    public StoreFunctions<JM, K, JM> getJoinStoreFunctions() {
-      return this.joinStoreFunctions;
-    }
-
-    /**
-     * Method to get {@code selfStoreFunctions}
-     *
-     * @return  {@code selfStoreFunctions}
-     */
-    public StoreFunctions<M, K, M> getSelfStoreFunctions() {
-      return this.selfStoreFunctions;
-    }
-
-    /**
-     * Method to get {@code txfmFunction}
-     *
-     * @return  {@code txfmFunction}
-     */
-    public BiFunction<M, JM, RM> getFunction() {
-      return this.txfmFunction;
-    }
-  }
-
-  /**
-   * The method only to be used internally in {@link MessageStream} to create {@link StreamOperator}
-   *
-   * @param transformFn  the corresponding transformation function
-   * @param <M>  type of input {@link Message}
-   * @param <OM>  type of output {@link Message}
-   * @return  the {@link StreamOperator}
-   */
-  public static <M extends Message, OM extends Message> StreamOperator<M, OM> getStreamOperator(Function<M, Collection<OM>> transformFn) {
-    return new StreamOperator<>(transformFn);
-  }
-
-  /**
-   * The method only to be used internally in {@link MessageStream} to create {@link SinkOperator}
-   *
-   * @param sinkFn  the sink function
-   * @param <M>  type of input {@link Message}
-   * @return   the {@link SinkOperator}
-   */
-  public static <M extends Message> SinkOperator<M> getSinkOperator(MessageStream.VoidFunction3<M, MessageCollector, TaskCoordinator> sinkFn) {
-    return new SinkOperator<>(sinkFn);
-  }
-
-  /**
-   * The method only to be used internally in {@link MessageStream} to create {@link WindowOperator}
-   *
-   * @param windowFn  the {@link WindowFn} function
-   * @param <M>  type of input {@link Message}
-   * @param <WK>  type of window key
-   * @param <WS>  type of {@link WindowState}
-   * @param <WM>  type of output {@link WindowOutput}
-   * @return  the {@link WindowOperator}
-   */
-  public static <M extends Message, WK, WS extends WindowState, WM extends WindowOutput<WK, ?>> WindowOperator<M, WK, WS, WM> getWindowOperator(
-      WindowFn<M, WK, WS, WM> windowFn) {
-    return new WindowOperator<>(windowFn, Operators.getOperatorId());
-  }
-
-  /**
-   * The method only to be used internally in {@link MessageStream} to create {@link PartialJoinOperator}
-   *
-   * @param joiner  the partial join function that joins an input {@link Message} w/ a {@link Message} from the other join stream
-   * @param joinOutput  the output {@link MessageStream}
-   * @param <M>  type of input {@link Message}
-   * @param <K>  type of join key
-   * @param <JM>  the type of message in the {@link Message} from the other join stream
-   * @param <RM>  the type of message in the {@link Message} from the join function
-   * @return  the {@link PartialJoinOperator}
-   */
-  public static <M extends Message<K, ?>, K, JM extends Message<K, ?>, RM extends Message> PartialJoinOperator<M, K, JM, RM> getPartialJoinOperator(
-      BiFunction<M, JM, RM> joiner, MessageStream<RM> joinOutput) {
-    return new PartialJoinOperator<>(joiner, joinOutput, Operators.getOperatorId());
-  }
-
-  /**
-   * The method only to be used internally in {@link MessageStream} to create {@link StreamOperator} as a merger function
-   *
-   * @param mergeOutput  the common output {@link MessageStream} from the merger
-   * @param <M>  the type of input {@link Message}
-   * @return  the {@link StreamOperator} for merge
-   */
-  public static <M extends Message> StreamOperator<M, M> getMergeOperator(MessageStream<M> mergeOutput) {
-    return new StreamOperator<M, M>(t ->
-      new ArrayList<M>() { {
-          this.add(t);
-        } },
-      mergeOutput);
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/main/java/org/apache/samza/operators/internal/Trigger.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/internal/Trigger.java b/samza-api/src/main/java/org/apache/samza/operators/internal/Trigger.java
deleted file mode 100644
index 3b50e2b..0000000
--- a/samza-api/src/main/java/org/apache/samza/operators/internal/Trigger.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.internal;
-
-import org.apache.samza.operators.WindowState;
-import org.apache.samza.operators.data.Message;
-
-import java.util.function.BiFunction;
-import java.util.function.Function;
-
-/**
- * Defines the trigger functions for {@link Operators.WindowOperator}. This class is immutable.
- *
- * @param <M>  the type of message from the input stream
- * @param <S>  the type of state variable in the window's state store
- */
-public class Trigger<M extends Message, S extends WindowState> {
-
-  /**
-   * System timer based trigger condition. This is the only guarantee that the {@link Operators.WindowOperator} will proceed forward
-   */
-  private final Function<S, Boolean> timerTrigger;
-
-  /**
-   * early trigger condition that determines when to send the first output from the {@link Operators.WindowOperator}
-   */
-  private final BiFunction<M, S, Boolean> earlyTrigger;
-
-  /**
-   * late trigger condition that determines when to send the updated output after the first one from a {@link Operators.WindowOperator}
-   */
-  private final BiFunction<M, S, Boolean> lateTrigger;
-
-  /**
-   * the function to update the window state when the first output is triggered
-   */
-  private final Function<S, S> earlyTriggerUpdater;
-
-  /**
-   * the function to update the window state when the late output is triggered
-   */
-  private final Function<S, S> lateTriggerUpdater;
-
-  /**
-   * Private constructor to prevent instantiation
-   *
-   * @param timerTrigger  system timer trigger condition
-   * @param earlyTrigger  early trigger condition
-   * @param lateTrigger   late trigger condition
-   * @param earlyTriggerUpdater  early trigger state updater
-   * @param lateTriggerUpdater   late trigger state updater
-   */
-  private Trigger(Function<S, Boolean> timerTrigger, BiFunction<M, S, Boolean> earlyTrigger, BiFunction<M, S, Boolean> lateTrigger,
-      Function<S, S> earlyTriggerUpdater, Function<S, S> lateTriggerUpdater) {
-    this.timerTrigger = timerTrigger;
-    this.earlyTrigger = earlyTrigger;
-    this.lateTrigger = lateTrigger;
-    this.earlyTriggerUpdater = earlyTriggerUpdater;
-    this.lateTriggerUpdater = lateTriggerUpdater;
-  }
-
-  /**
-   * Static method to create a {@link Trigger} object
-   *
-   * @param timerTrigger  system timer trigger condition
-   * @param earlyTrigger  early trigger condition
-   * @param lateTrigger  late trigger condition
-   * @param earlyTriggerUpdater  early trigger state updater
-   * @param lateTriggerUpdater  late trigger state updater
-   * @param <M>  the type of input {@link Message}
-   * @param <S>  the type of window state extends {@link WindowState}
-   * @return  the {@link Trigger} function
-   */
-  public static <M extends Message, S extends WindowState> Trigger<M, S> createTrigger(Function<S, Boolean> timerTrigger,
-      BiFunction<M, S, Boolean> earlyTrigger, BiFunction<M, S, Boolean> lateTrigger, Function<S, S> earlyTriggerUpdater,
-      Function<S, S> lateTriggerUpdater) {
-    return new Trigger(timerTrigger, earlyTrigger, lateTrigger, earlyTriggerUpdater, lateTriggerUpdater);
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/main/java/org/apache/samza/operators/internal/WindowFn.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/internal/WindowFn.java b/samza-api/src/main/java/org/apache/samza/operators/internal/WindowFn.java
deleted file mode 100644
index 489e5b8..0000000
--- a/samza-api/src/main/java/org/apache/samza/operators/internal/WindowFn.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.internal;
-
-import org.apache.samza.operators.WindowState;
-import org.apache.samza.operators.data.Message;
-import org.apache.samza.storage.kv.Entry;
-
-import java.util.function.BiFunction;
-
-
-/**
- * Defines an internal representation of a window function. This class SHOULD NOT be used by the programmer directly. It is used
- * by the internal representation and implementation classes in operators.
- *
- * @param <M> type of input stream {@link Message} for window
- * @param <WK>  type of window key in the output {@link Message}
- * @param <WS>  type of {@link WindowState} variable in the state store
- * @param <WM>  type of the message in the output stream
- */
-public interface WindowFn<M extends Message, WK, WS extends WindowState, WM extends WindowOutput<WK, ?>> {
-
-  /**
-   * get the transformation function of the {@link WindowFn}
-   *
-   * @return  the transformation function that takes a type {@code M} message and the window state entry, then transforms them to a {@link WindowOutput}
-   */
-  BiFunction<M, Entry<WK, WS>, WM> getTransformFunc();
-
-  /**
-   * get the state store functions for this {@link WindowFn}
-   *
-   * @return  the collection of state store methods
-   */
-  Operators.StoreFunctions<M, WK, WS> getStoreFuncs();
-
-  /**
-   * get the trigger conditions for this {@link WindowFn}
-   *
-   * @return  the trigger condition for the {@link WindowFn} function
-   */
-  Trigger<M, WS> getTrigger();
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/main/java/org/apache/samza/operators/internal/WindowOutput.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/internal/WindowOutput.java b/samza-api/src/main/java/org/apache/samza/operators/internal/WindowOutput.java
deleted file mode 100644
index 643b703..0000000
--- a/samza-api/src/main/java/org/apache/samza/operators/internal/WindowOutput.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.internal;
-
-import org.apache.samza.operators.data.Message;
-
-
-/**
- * This class defines the specific type of output messages from a {@link Operators.WindowOperator} function
- *
- * @param <K>  the type of key in the output window result
- * @param <M>  the type of value in the output window result
- */
-public final class WindowOutput<K, M> implements Message<K, M> {
-  private final K key;
-  private final M value;
-
-  WindowOutput(K key, M aggregated) {
-    this.key = key;
-    this.value = aggregated;
-  }
-
-  @Override public M getMessage() {
-    return this.value;
-  }
-
-  @Override public K getKey() {
-    return this.key;
-  }
-
-  @Override public long getTimestamp() {
-    return 0;
-  }
-
-  static public <K, M> WindowOutput<K, M> of(K key, M result) {
-    return new WindowOutput<>(key, result);
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/main/java/org/apache/samza/operators/task/StreamOperatorTask.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/task/StreamOperatorTask.java b/samza-api/src/main/java/org/apache/samza/operators/task/StreamOperatorTask.java
deleted file mode 100644
index 42c8f74..0000000
--- a/samza-api/src/main/java/org/apache/samza/operators/task/StreamOperatorTask.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.task;
-
-import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.operators.MessageStreams.SystemMessageStream;
-
-import java.util.Collection;
-
-/**
- * This interface defines the methods that users need to implement via the operator programming APIs.
- */
-@InterfaceStability.Unstable
-public interface StreamOperatorTask {
-
-  /**
-   * Defines the method for users to initialize the operator chains consuming from all {@link SystemMessageStream}s.
-   * Users have to implement this function to define their transformation logic on each of the incoming
-   * {@link SystemMessageStream}.
-   *
-   * Note that each {@link SystemMessageStream} corresponds to an input {@link org.apache.samza.system.SystemStreamPartition}
-   *
-   * @param sources  the collection of {@link SystemMessageStream}s that take {@link org.apache.samza.operators.data.IncomingSystemMessage}
-   *                 from a {@link org.apache.samza.system.SystemStreamPartition}
-   */
-  void initOperators(Collection<SystemMessageStream> sources);
-
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/main/java/org/apache/samza/operators/windows/SessionWindow.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/windows/SessionWindow.java b/samza-api/src/main/java/org/apache/samza/operators/windows/SessionWindow.java
new file mode 100644
index 0000000..287025c
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/operators/windows/SessionWindow.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.windows;
+
+import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.storage.kv.Entry;
+
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+/**
+ * This class defines a session window function.
+ *
+ * @param <M>  the type of input {@link MessageEnvelope}
+ * @param <WK>  the type of session key in the session window
+ * @param <WV>  the type of output value in each session window
+ */
+public class SessionWindow<M extends MessageEnvelope, WK, WV> implements Window<M, WK, WV, WindowOutput<WK, WV>> {
+
+  /**
+   * Constructor. Made package-private so that it can only be instantiated via the static API methods in {@link Windows}
+   *
+   * @param sessionKeyFunction  function to get the session key from the input {@link MessageEnvelope}
+   * @param aggregator  function to calculate the output value based on the input {@link MessageEnvelope} and current output value
+   */
+  SessionWindow(Function<M, WK> sessionKeyFunction, BiFunction<M, WV, WV> aggregator) {
+    this.wndKeyFunction = sessionKeyFunction;
+    this.aggregator = aggregator;
+  }
+
+  /**
+   * function to calculate the window key from input {@link MessageEnvelope}
+   */
+  private final Function<M, WK> wndKeyFunction;
+
+  /**
+   * function to calculate the output value from the input {@link MessageEnvelope} and the current output value
+   */
+  private final BiFunction<M, WV, WV> aggregator;
+
+  /**
+   * trigger condition that determines when to send the {@link WindowOutput}
+   */
+  private Trigger<M, WindowState<WV>> trigger = null;
+
+  //TODO: need to create a set of {@link StoreFunctions} that is default to input {@link MessageEnvelope} type for {@link Window}
+  private StoreFunctions<M, WK, WindowState<WV>> storeFunctions = null;
+
+  /**
+   * Public API methods start here
+   */
+
+  /**
+   * Public API method to define the watermark trigger for the window operator
+   *
+   * @param wndTrigger {@link Trigger} function defines the watermark trigger for this {@link SessionWindow}
+   * @return The window operator w/ the defined watermark trigger
+   */
+  @Override
+  public Window<M, WK, WV, WindowOutput<WK, WV>> setTriggers(TriggerBuilder<M, WV> wndTrigger) {
+    this.trigger = wndTrigger.build();
+    return this;
+  }
+
+  private BiFunction<M, Entry<WK, WindowState<WV>>, WindowOutput<WK, WV>> getTransformFunc() {
+    // TODO: actual implementation of the main session window logic, based on the wndKeyFunction, aggregator, and triggers;
+    return null;
+  }
+
+  public WindowFn<M, WK, WindowState<WV>, WindowOutput<WK, WV>> getInternalWindowFn() {
+    return new WindowFn<M, WK, WindowState<WV>, WindowOutput<WK, WV>>() {
+
+      @Override public BiFunction<M, Entry<WK, WindowState<WV>>, WindowOutput<WK, WV>> getTransformFn() {
+        return SessionWindow.this.getTransformFunc();
+      }
+
+      @Override public StoreFunctions<M, WK, WindowState<WV>> getStoreFns() {
+        return SessionWindow.this.storeFunctions;
+      }
+
+      @Override public Trigger<M, WindowState<WV>> getTrigger() {
+        return SessionWindow.this.trigger;
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/main/java/org/apache/samza/operators/windows/StoreFunctions.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/windows/StoreFunctions.java b/samza-api/src/main/java/org/apache/samza/operators/windows/StoreFunctions.java
new file mode 100644
index 0000000..0d40761
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/operators/windows/StoreFunctions.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.windows;
+
+import org.apache.samza.operators.data.MessageEnvelope;
+
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+/**
+ * The store functions that are used by window and partial join operators to store and retrieve buffered {@link MessageEnvelope}s
+ * and partial aggregation results.
+ *
+ * @param <SK>  the type of key used to store the operator state
+ * @param <SS>  the type of operator state. E.g. could be the partial aggregation result for a window, or a buffered
+ *             input {@link MessageEnvelope} from the join stream for a join
+ */
+public class StoreFunctions<M extends MessageEnvelope, SK, SS> {
+  /**
+   * Function that returns the key to query in the operator state store for a particular {@link MessageEnvelope}.
+   * This 1:1 function only returns a single key for the incoming {@link MessageEnvelope}. This is sufficient to support
+   * non-overlapping windows and unique-key based joins.
+   *
+   * TODO: for windows that overlap (i.e. sliding windows and hopping windows) and non-unique-key-based joins,
+   * the query to the state store is usually a range scan. We need to add a rangeKeyFinder function
+   * (or make this function return a collection) to map from a single input {@link MessageEnvelope} to a range of keys in the store.
+   */
+  private final Function<M, SK> storeKeyFn;
+
+  /**
+   * Function to update the store entry based on the current operator state and the incoming {@link MessageEnvelope}.
+   *
+   * TODO: this is assuming a 1:1 mapping from the input {@link MessageEnvelope} to the store entry. When implementing sliding/hopping
+   * windows and non-unique-key-based join, we may need to include the corresponding state key in addition to the
+   * state value. Alternatively this can be called once for each store key for the {@link MessageEnvelope}.
+   */
+  private final BiFunction<M, SS, SS> stateUpdaterFn;
+
+  public StoreFunctions(Function<M, SK> storeKeyFn, BiFunction<M, SS, SS> stateUpdaterFn) {
+    this.storeKeyFn = storeKeyFn;
+    this.stateUpdaterFn = stateUpdaterFn;
+  }
+
+  public Function<M, SK> getStoreKeyFn() {
+    return this.storeKeyFn;
+  }
+
+  public BiFunction<M, SS, SS> getStateUpdaterFn() {
+    return this.stateUpdaterFn;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/main/java/org/apache/samza/operators/windows/Trigger.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/windows/Trigger.java b/samza-api/src/main/java/org/apache/samza/operators/windows/Trigger.java
new file mode 100644
index 0000000..c8b0edb
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/operators/windows/Trigger.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.windows;
+
+import org.apache.samza.operators.data.MessageEnvelope;
+
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+/**
+ * Defines the trigger functions for the window operator. This class is immutable.
+ *
+ * @param <M>  the type of {@link MessageEnvelope} in the input stream
+ * @param <S>  the type of state variable in the window's state store
+ */
+public class Trigger<M extends MessageEnvelope, S extends WindowState> {
+
+  /**
+   * System timer based trigger condition. This is the only guarantee that the window operator will proceed forward
+   */
+  private final Function<S, Boolean> timerTrigger;
+
+  /**
+   * early trigger condition that determines when to send the first output from the window operator
+   */
+  private final BiFunction<M, S, Boolean> earlyTrigger;
+
+  /**
+   * late trigger condition that determines when to send the updated output after the first one from a window operator
+   */
+  private final BiFunction<M, S, Boolean> lateTrigger;
+
+  /**
+   * the function to update the window state when the first output is triggered
+   */
+  private final Function<S, S> earlyTriggerUpdater;
+
+  /**
+   * the function to update the window state when the late output is triggered
+   */
+  private final Function<S, S> lateTriggerUpdater;
+
+  /**
+   * Private constructor to prevent instantiation
+   *
+   * @param timerTrigger  system timer trigger condition
+   * @param earlyTrigger  early trigger condition
+   * @param lateTrigger   late trigger condition
+   * @param earlyTriggerUpdater  early trigger state updater
+   * @param lateTriggerUpdater   late trigger state updater
+   */
+  private Trigger(Function<S, Boolean> timerTrigger, BiFunction<M, S, Boolean> earlyTrigger, BiFunction<M, S, Boolean> lateTrigger,
+      Function<S, S> earlyTriggerUpdater, Function<S, S> lateTriggerUpdater) {
+    this.timerTrigger = timerTrigger;
+    this.earlyTrigger = earlyTrigger;
+    this.lateTrigger = lateTrigger;
+    this.earlyTriggerUpdater = earlyTriggerUpdater;
+    this.lateTriggerUpdater = lateTriggerUpdater;
+  }
+
+  /**
+   * Static factory method that creates a {@link Trigger} object from the given conditions and state updaters
+   *
+   * @param timerTrigger  system timer trigger condition
+   * @param earlyTrigger  early trigger condition
+   * @param lateTrigger  late trigger condition
+   * @param earlyTriggerUpdater  early trigger state updater
+   * @param lateTriggerUpdater  late trigger state updater
+   * @param <M>  the type of input {@link MessageEnvelope}
+   * @param <S>  the type of window state extends {@link WindowState}
+   * @return  the {@link Trigger} object, parameterized with {@code M} and {@code S} (no raw-type conversion)
+   */
+  public static <M extends MessageEnvelope, S extends WindowState> Trigger<M, S> createTrigger(Function<S, Boolean> timerTrigger,
+      BiFunction<M, S, Boolean> earlyTrigger, BiFunction<M, S, Boolean> lateTrigger, Function<S, S> earlyTriggerUpdater,
+      Function<S, S> lateTriggerUpdater) {
+    return new Trigger<>(timerTrigger, earlyTrigger, lateTrigger, earlyTriggerUpdater, lateTriggerUpdater);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/main/java/org/apache/samza/operators/windows/TriggerBuilder.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/windows/TriggerBuilder.java b/samza-api/src/main/java/org/apache/samza/operators/windows/TriggerBuilder.java
new file mode 100644
index 0000000..6336a50
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/operators/windows/TriggerBuilder.java
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.windows;
+
+
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.operators.data.MessageEnvelope;
+
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+
+/**
+ * This class defines a builder of {@link Trigger} object for a {@link Window}. The triggers are categorized into
+ * three types:
+ *
+ * <p>
+ *   early trigger: defines the condition when the first output from the window function is sent.
+ *   late trigger: defines the condition when the updated output after the first output is sent.
+ *   timer trigger: defines a system timeout condition to trigger output if no more inputs are received to enable early/late triggers
+ * </p>
+ *
+ * If multiple conditions are defined for a specific type of trigger, the aggregated trigger is the disjunction
+ * of each individual trigger (i.e. OR).
+ *
+ * @param <M>  the type of input {@link MessageEnvelope} to the {@link Window}
+ * @param <V>  the type of output value from the {@link Window}
+ */
+@InterfaceStability.Unstable
+public final class TriggerBuilder<M extends MessageEnvelope, V> {
+
+  /**
+   * Predicate helper to OR multiple trigger conditions
+   */
+  static class PredicateHelper {
+    static <M, S> BiFunction<M, S, Boolean> or(BiFunction<M, S, Boolean> lhs, BiFunction<M, S, Boolean> rhs) {
+      return (m, s) -> lhs.apply(m, s) || rhs.apply(m, s);
+    }
+
+    static <S> Function<S, Boolean> or(Function<S, Boolean> lhs, Function<S, Boolean> rhs) {
+      return s -> lhs.apply(s) || rhs.apply(s);
+    }
+  }
+
+  /**
+   * The early trigger condition that determines the first output from the {@link Window}
+   */
+  private BiFunction<M, WindowState<V>, Boolean> earlyTrigger = null;
+
+  /**
+   * The late trigger condition that determines the late output(s) from the {@link Window}
+   */
+  private BiFunction<M, WindowState<V>, Boolean> lateTrigger = null;
+
+  /**
+   * The system timer based trigger conditions that guarantee the {@link Window} proceeds forward
+   */
+  private Function<WindowState<V>, Boolean> timerTrigger = null;
+
+  /**
+   * The state updater function to be applied after the first output is triggered
+   */
+  private Function<WindowState<V>, WindowState<V>> earlyTriggerUpdater = Function.identity();
+
+  /**
+   * The state updater function to be applied after the late output is triggered
+   */
+  private Function<WindowState<V>, WindowState<V>> lateTriggerUpdater = Function.identity();
+
+  /**
+   * Helper method to add a trigger condition
+   *
+   * @param currentTrigger  current trigger condition
+   * @param newTrigger  new trigger condition
+   * @return  combined trigger condition that is {@code currentTrigger} OR {@code newTrigger}
+   */
+  private BiFunction<M, WindowState<V>, Boolean> addTrigger(BiFunction<M, WindowState<V>, Boolean> currentTrigger,
+      BiFunction<M, WindowState<V>, Boolean> newTrigger) {
+    if (currentTrigger == null) {
+      return newTrigger;
+    }
+
+    return PredicateHelper.or(currentTrigger, newTrigger);
+  }
+
+  /**
+   * Helper method to add a system timer trigger
+   *
+   * @param currentTimer  current timer condition
+   * @param newTimer  new timer condition
+   * @return  combined timer condition that is {@code currentTimer} OR {@code newTimer}
+   */
+  private Function<WindowState<V>, Boolean> addTimerTrigger(Function<WindowState<V>, Boolean> currentTimer,
+      Function<WindowState<V>, Boolean> newTimer) {
+    if (currentTimer == null) {
+      return newTimer;
+    }
+
+    return PredicateHelper.or(currentTimer, newTimer);
+  }
+
+  /**
+   * default constructor to prevent instantiation
+   */
+  private TriggerBuilder() {}
+
+  /**
+   * Constructor that sets the size limit as the early trigger for a window
+   *
+   * @param sizeLimit  the number of {@link MessageEnvelope}s in a window that would trigger the first output
+   */
+  private TriggerBuilder(long sizeLimit) {
+    this.earlyTrigger = (m, s) -> s.getNumberMessages() > sizeLimit;
+  }
+
+  /**
+   * Constructor that sets the event time length as the early trigger
+   *
+   * @param eventTimeFunction  the function that calculates the event time in nanoseconds from the input {@link MessageEnvelope}
+   * @param wndLenMs  the window length in event time in milliseconds
+   */
+  private TriggerBuilder(Function<M, Long> eventTimeFunction, long wndLenMs) {
+    this.earlyTrigger = (m, s) ->
+        TimeUnit.NANOSECONDS.toMillis(Math.max(s.getLatestEventTimeNs() - s.getEarliestEventTimeNs(),
+            eventTimeFunction.apply(m) - s.getEarliestEventTimeNs())) > wndLenMs;
+  }
+
+  /**
+   * Constructor that sets the special token {@link MessageEnvelope} as the early trigger
+   *
+   * @param tokenFunc  the function that checks whether an input {@link MessageEnvelope} is a token {@link MessageEnvelope} that triggers window output
+   */
+  private TriggerBuilder(Function<M, Boolean> tokenFunc) {
+    this.earlyTrigger = (m, s) -> tokenFunc.apply(m);
+  }
+
+  /**
+   * Build method that creates a {@link Trigger} object based on the trigger conditions set in {@link TriggerBuilder}
+   * This is kept package private and only used by {@link Windows} to convert the mutable {@link TriggerBuilder} object to an immutable {@link Trigger} object
+   *
+   * @return  the final {@link Trigger} object
+   */
+  Trigger<M, WindowState<V>> build() {
+    return Trigger.createTrigger(this.timerTrigger, this.earlyTrigger, this.lateTrigger, this.earlyTriggerUpdater, this.lateTriggerUpdater);
+  }
+
+  /**
+   * Public API methods start here
+   */
+
+
+  /**
+   * API method to allow users to set an update method to update the output value after the first window output is triggered
+   * by the early trigger condition
+   *
+   * @param onTriggerFunc  the method to update the output value after the early trigger
+   * @return  the {@link TriggerBuilder} object
+   */
+  public TriggerBuilder<M, V> onEarlyTrigger(Function<V, V> onTriggerFunc) {
+    this.earlyTriggerUpdater = s -> {
+      s.setOutputValue(onTriggerFunc.apply(s.getOutputValue()));
+      return s;
+    };
+    return this;
+  }
+
+  /**
+   * API method to allow users to set an update method to update the output value after a late window output is triggered
+   * by the late trigger condition
+   *
+   * @param onTriggerFunc  the method to update the output value after the late trigger
+   * @return  the {@link TriggerBuilder} object
+   */
+  public TriggerBuilder<M, V> onLateTrigger(Function<V, V> onTriggerFunc) {
+    this.lateTriggerUpdater = s -> {
+      s.setOutputValue(onTriggerFunc.apply(s.getOutputValue()));
+      return s;
+    };
+    return this;
+  }
+
+  /**
+   * API method to allow users to add a system timer trigger based on timeout after the last {@link MessageEnvelope} received in the window
+   *
+   * @param timeoutMs  the timeout in ms after the last {@link MessageEnvelope} received in the window
+   * @return  the {@link TriggerBuilder} object
+   */
+  public TriggerBuilder<M, V> addTimeoutSinceLastMessage(long timeoutMs) {
+    this.timerTrigger = this.addTimerTrigger(this.timerTrigger,
+        s -> TimeUnit.NANOSECONDS.toMillis(s.getLastMessageTimeNs()) + timeoutMs < System.currentTimeMillis());
+    return this;
+  }
+
+  /**
+   * API method to allow users to add a system timer trigger based on the timeout after the first {@link MessageEnvelope} received in the window
+   *
+   * @param timeoutMs  the timeout in ms after the first {@link MessageEnvelope} received in the window
+   * @return  the {@link TriggerBuilder} object
+   */
+  public TriggerBuilder<M, V> addTimeoutSinceFirstMessage(long timeoutMs) {
+    this.timerTrigger = this.addTimerTrigger(this.timerTrigger, s ->
+        TimeUnit.NANOSECONDS.toMillis(s.getFirstMessageTimeNs()) + timeoutMs < System.currentTimeMillis());
+    return this;
+  }
+
+  /**
+   * API method to allow users to add a late trigger based on the window size limit
+   *
+   * @param sizeLimit  limit on the number of {@link MessageEnvelope}s in window
+   * @return  the {@link TriggerBuilder} object
+   */
+  public TriggerBuilder<M, V> addLateTriggerOnSizeLimit(long sizeLimit) {
+    this.lateTrigger = this.addTrigger(this.lateTrigger, (m, s) -> s.getNumberMessages() > sizeLimit);
+    return this;
+  }
+
+  /**
+   * API method to allow users to define a customized late trigger function based on input {@link MessageEnvelope} and the window state
+   *
+   * @param lateTrigger  the late trigger condition based on input {@link MessageEnvelope} and the current {@link WindowState}
+   * @return  the {@link TriggerBuilder} object
+   */
+  public TriggerBuilder<M, V> addLateTrigger(BiFunction<M, WindowState<V>, Boolean> lateTrigger) {
+    this.lateTrigger = this.addTrigger(this.lateTrigger, lateTrigger);
+    return this;
+  }
+
+  /**
+   * Static API method to create a {@link TriggerBuilder} w/ early trigger condition based on window size limit
+   *
+   * @param sizeLimit  window size limit
+   * @param <M>  the type of input {@link MessageEnvelope}
+   * @param <V>  the type of {@link Window} output value
+   * @return  the {@link TriggerBuilder} object
+   */
+  public static <M extends MessageEnvelope, V> TriggerBuilder<M, V> earlyTriggerWhenExceedWndLen(long sizeLimit) {
+    return new TriggerBuilder<M, V>(sizeLimit);
+  }
+
+  /**
+   * Static API method to create a {@link TriggerBuilder} w/ early trigger condition based on event time window
+   *
+   *
+   * @param eventTimeFunc  the function to get the event time from the input {@link MessageEnvelope}
+   * @param eventTimeWndSizeMs  the event time window size in Ms
+   * @param <M>  the type of input {@link MessageEnvelope}
+   * @param <V>  the type of {@link Window} output value
+   * @return  the {@link TriggerBuilder} object
+   */
+  public static <M extends MessageEnvelope, V> TriggerBuilder<M, V> earlyTriggerOnEventTime(Function<M, Long> eventTimeFunc, long eventTimeWndSizeMs) {
+    return new TriggerBuilder<M, V>(eventTimeFunc, eventTimeWndSizeMs);
+  }
+
+  /**
+   * Static API method to create a {@link TriggerBuilder} w/ early trigger condition based on token {@link MessageEnvelope}s
+   *
+   * @param tokenFunc  the function to determine whether an input {@link MessageEnvelope} is a window token or not
+   * @param <M>  the type of input {@link MessageEnvelope}
+   * @param <V>  the type of {@link Window} output value
+   * @return  the {@link TriggerBuilder} object
+   */
+  public static <M extends MessageEnvelope, V> TriggerBuilder<M, V> earlyTriggerOnTokenMsg(Function<M, Boolean> tokenFunc) {
+    return new TriggerBuilder<M, V>(tokenFunc);
+  }
+
+  /**
+   * Static API method to allow customized early trigger condition based on input {@link MessageEnvelope} and the corresponding {@link WindowState}
+   *
+   * @param earlyTrigger  the user defined early trigger condition
+   * @param <M>   the input {@link MessageEnvelope} type
+   * @param <V>   the output value from the window
+   * @return   the {@link TriggerBuilder} object
+   */
+  public static <M extends MessageEnvelope, V> TriggerBuilder<M, V> earlyTrigger(BiFunction<M, WindowState<V>, Boolean> earlyTrigger) {
+    TriggerBuilder<M, V> newTriggers =  new TriggerBuilder<M, V>();
+    newTriggers.earlyTrigger = newTriggers.addTrigger(newTriggers.earlyTrigger, earlyTrigger);
+    return newTriggers;
+  }
+
+  /**
+   * Static API method to create a {@link TriggerBuilder} w/ system timeout after the last {@link MessageEnvelope} received in the window
+   *
+   * @param timeoutMs  timeout in ms after the last {@link MessageEnvelope} received
+   * @param <M>  the type of input {@link MessageEnvelope}
+   * @param <V>  the type of {@link Window} output value
+   * @return  the {@link TriggerBuilder} object
+   */
+  public static <M extends MessageEnvelope, V> TriggerBuilder<M, V> timeoutSinceLastMessage(long timeoutMs) {
+    return new TriggerBuilder<M, V>().addTimeoutSinceLastMessage(timeoutMs);
+  }
+
+  /**
+   * Static API method to create a {@link TriggerBuilder} w/ system timeout after the first {@link MessageEnvelope} received in the window
+   *
+   * @param timeoutMs  timeout in ms after the first {@link MessageEnvelope} received
+   * @param <M>  the type of input {@link MessageEnvelope}
+   * @param <V>  the type of {@link Window} output value
+   * @return  the {@link TriggerBuilder} object
+   */
+  public static <M extends MessageEnvelope, V> TriggerBuilder<M, V> timeoutSinceFirstMessage(long timeoutMs) {
+    return new TriggerBuilder<M, V>().addTimeoutSinceFirstMessage(timeoutMs);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/main/java/org/apache/samza/operators/windows/Window.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/windows/Window.java b/samza-api/src/main/java/org/apache/samza/operators/windows/Window.java
new file mode 100644
index 0000000..56a307d
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/operators/windows/Window.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.windows;
+
+import org.apache.samza.operators.data.MessageEnvelope;
+
+/**
+ * The public programming interface class for window function
+ *
+ * @param <M>  the type of input {@link MessageEnvelope}
+ * @param <WK>  the type of key to the {@link Window}
+ * @param <WV>  the type of output value in the {@link WindowOutput}
+ * @param <WM>  the type of {@link MessageEnvelope} in the window output stream
+ */
+public interface Window<M extends MessageEnvelope, WK, WV, WM extends WindowOutput<WK, WV>> {
+
+  /**
+   * Set the triggers for this {@link Window}
+   *
+   * @param wndTrigger  trigger conditions set by the programmers
+   * @return  the {@link Window} function w/ the trigger {@code wndTrigger}
+   */
+  Window<M, WK, WV, WM> setTriggers(TriggerBuilder<M, WV> wndTrigger);
+
+  /**
+   * Internal implementation helper to get the functions associated with this Window.
+   *
+   * <b>NOTE:</b> This is purely an internal API and should not be used directly by users.
+   *
+   * @return the functions associated with this Window.
+   */
+  WindowFn<M, WK, WindowState<WV>, WindowOutput<WK, WV>> getInternalWindowFn();
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/main/java/org/apache/samza/operators/windows/WindowFn.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/windows/WindowFn.java b/samza-api/src/main/java/org/apache/samza/operators/windows/WindowFn.java
new file mode 100644
index 0000000..8878bf9
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/operators/windows/WindowFn.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.windows;
+
+import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.storage.kv.Entry;
+
+import java.util.function.BiFunction;
+
+
+/**
+ * Defines an internal representation of a window function.
+ *
+ * @param <M>  type of the input {@link MessageEnvelope} for the window
+ * @param <WK>  type of the window key in the output {@link MessageEnvelope}
+ * @param <WS>  type of the {@link WindowState} in the state store
+ * @param <WM>  type of the {@link MessageEnvelope} in the output stream
+ */
+public interface WindowFn<M extends MessageEnvelope, WK, WS extends WindowState, WM extends WindowOutput<WK, ?>> {
+
+  /**
+   * Get the transformation function of the {@link WindowFn}.
+   *
+   * @return  the transformation function which takes a {@link MessageEnvelope} of type {@code M} and its window state entry,
+   *          and transforms it to a {@link WindowOutput}
+   */
+  BiFunction<M, Entry<WK, WS>, WM> getTransformFn();
+
+  /**
+   * Get the state store functions for this {@link WindowFn}.
+   *
+   * @return  the state store functions
+   */
+  StoreFunctions<M, WK, WS> getStoreFns();
+
+  /**
+   * Get the trigger conditions for this {@link WindowFn}.
+   *
+   * @return  the trigger condition for this {@link WindowFn}
+   */
+  Trigger<M, WS> getTrigger();
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/main/java/org/apache/samza/operators/windows/WindowOutput.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/windows/WindowOutput.java b/samza-api/src/main/java/org/apache/samza/operators/windows/WindowOutput.java
new file mode 100644
index 0000000..63e34c8
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/operators/windows/WindowOutput.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.windows;
+
+import org.apache.samza.operators.data.MessageEnvelope;
+
+
+/**
+ * The type of output {@link MessageEnvelope}s in a window operator output stream.
+ *
+ * @param <K>  the type of key in the window output
+ * @param <M>  the type of value in the window output
+ */
+public final class WindowOutput<K, M> implements MessageEnvelope<K, M> {
+  private final K key;
+  private final M value;
+
+  WindowOutput(K key, M value) {
+    this.key = key;
+    this.value = value;
+  }
+
+  @Override public M getMessage() {
+    return this.value;
+  }
+
+  @Override public K getKey() {
+    return this.key;
+  }
+
+  /**
+   * Static factory method to create a {@link WindowOutput} from a key and a value.
+   *
+   * @param key  the key of the window output
+   * @param result  the value of the window output
+   * @param <K>  the type of key in the window output
+   * @param <M>  the type of value in the window output
+   * @return  a new {@link WindowOutput} holding {@code key} and {@code result}
+   */
+  public static <K, M> WindowOutput<K, M> of(K key, M result) {
+    return new WindowOutput<>(key, result);
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/main/java/org/apache/samza/operators/windows/WindowState.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/windows/WindowState.java b/samza-api/src/main/java/org/apache/samza/operators/windows/WindowState.java
new file mode 100644
index 0000000..835d749
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/operators/windows/WindowState.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.windows;
+
+import org.apache.samza.annotation.InterfaceStability;
+
+
+/**
+ * This interface defines the methods a window state class has to implement. Programmers can implement this
+ * interface to provide a customized window state to be stored in window state stores.
+ *
+ * @param <WV>  the type of the window output value
+ */
+@InterfaceStability.Unstable
+public interface WindowState<WV> {
+  /**
+   * Method to get the system time when the first {@link org.apache.samza.operators.data.MessageEnvelope}
+   * in the window is received
+   *
+   * @return  system time in nanoseconds when the first {@link org.apache.samza.operators.data.MessageEnvelope}
+   *          in the window was received
+   */
+  long getFirstMessageTimeNs();
+
+  /**
+   * Method to get the system time when the last {@link org.apache.samza.operators.data.MessageEnvelope}
+   * in the window is received
+   *
+   * @return  system time in nanoseconds when the last {@link org.apache.samza.operators.data.MessageEnvelope}
+   *          in the window was received
+   */
+  long getLastMessageTimeNs();
+
+  /**
+   * Method to get the earliest event time in the window
+   *
+   * @return  the earliest event time in the window, in nanoseconds
+   */
+  long getEarliestEventTimeNs();
+
+  /**
+   * Method to get the latest event time in the window
+   *
+   * @return  the latest event time in the window, in nanoseconds
+   */
+  long getLatestEventTimeNs();
+
+  /**
+   * Method to get the total number of {@link org.apache.samza.operators.data.MessageEnvelope}s received in the window
+   *
+   * @return  the number of {@link org.apache.samza.operators.data.MessageEnvelope}s received in the window
+   */
+  long getNumberMessages();
+
+  /**
+   * Method to get the corresponding window's output value
+   *
+   * @return  the corresponding window's output value
+   */
+  WV getOutputValue();
+
+  /**
+   * Method to set the corresponding window's output value
+   *
+   * @param value  the new output value for the corresponding window
+   */
+  void setOutputValue(WV value);
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/main/java/org/apache/samza/operators/windows/Windows.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/windows/Windows.java b/samza-api/src/main/java/org/apache/samza/operators/windows/Windows.java
new file mode 100644
index 0000000..1a4ed8f
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/operators/windows/Windows.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.windows;
+
+import org.apache.samza.operators.data.MessageEnvelope;
+
+import java.util.Collection;
+import java.util.function.Function;
+
+
+/**
+ * This class defines a collection of {@link Window} functions. The public classes and methods here are intended to be
+ * used by users (i.e. programmers) to create {@link Window} functions directly.
+ *
+ */
+public final class Windows {
+
+  /**
+   * Private constructor to prevent instantiation.
+   */
+  private Windows() {}
+
+  static <M extends MessageEnvelope, WK, WV, WS extends WindowState<WV>, WM extends WindowOutput<WK, WV>> WindowFn<M, WK, WS, WM> getInternalWindowFn(
+      Window<M, WK, WV, WM> window) {
+    if (window instanceof SessionWindow) {
+      SessionWindow<M, WK, WV> sessionWindow = (SessionWindow<M, WK, WV>) window;
+      return (WindowFn<M, WK, WS, WM>) sessionWindow.getInternalWindowFn();
+    }
+    throw new IllegalArgumentException("Input window type not supported.");
+  }
+
+  /**
+   * Public static API methods start here.
+   *
+   */
+
+  /**
+   * Static API method to create a {@link SessionWindow} in which the output value is simply the collection of input {@link MessageEnvelope}s.
+   *
+   * @param sessionKeyFunction  function to calculate the session window key
+   * @param <M>  type of the input {@link MessageEnvelope}
+   * @param <WK>  type of the session window key
+   * @return  the {@link Window} function for the session
+   */
+  public static <M extends MessageEnvelope, WK> Window<M, WK, Collection<M>, WindowOutput<WK, Collection<M>>> intoSessions(Function<M, WK> sessionKeyFunction) {
+    return new SessionWindow<>(sessionKeyFunction, (m, c) -> {
+        c.add(m);
+        return c;
+      }
+    );
+  }
+
+  /**
+   * Static API method to create a {@link SessionWindow} in which the output value is a collection of {@code SI} from the input {@link MessageEnvelope}s.
+   *
+   * @param sessionKeyFunction  function to calculate the session window key
+   * @param sessionInfoExtractor  function to retrieve session info of type {@code SI} from the input {@link MessageEnvelope} of type {@code M}
+   * @param <M>  type of the input {@link MessageEnvelope}
+   * @param <WK>  type of the session window key
+   * @param <SI>  type of the session information retrieved from each input {@link MessageEnvelope} of type {@code M}
+   * @return  the {@link Window} function for the session
+   */
+  public static <M extends MessageEnvelope, WK, SI> Window<M, WK, Collection<SI>, WindowOutput<WK, Collection<SI>>> intoSessions(Function<M, WK> sessionKeyFunction,
+      Function<M, SI> sessionInfoExtractor) {
+    return new SessionWindow<>(sessionKeyFunction, (m, c) -> {
+        c.add(sessionInfoExtractor.apply(m));
+        return c;
+      }
+    );
+  }
+
+  /**
+   * Static API method to create a {@link SessionWindow} as a counter of input {@link MessageEnvelope}s.
+   *
+   * @param sessionKeyFunction  function to calculate the session window key
+   * @param <M>  type of the input {@link MessageEnvelope}
+   * @param <WK>  type of the session window key
+   * @return  the {@link Window} function for the session
+   */
+  public static <M extends MessageEnvelope, WK> Window<M, WK, Integer, WindowOutput<WK, Integer>> intoSessionCounter(Function<M, WK> sessionKeyFunction) {
+    return new SessionWindow<>(sessionKeyFunction, (m, c) -> c + 1);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/test/java/org/apache/samza/operators/TestMessage.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/operators/TestMessage.java b/samza-api/src/test/java/org/apache/samza/operators/TestMessage.java
deleted file mode 100644
index 8c56287..0000000
--- a/samza-api/src/test/java/org/apache/samza/operators/TestMessage.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators;
-
-import org.apache.samza.operators.data.Message;
-
-
-public class TestMessage implements Message<String, String> {
-
-  private final String key;
-  private final String value;
-  private final long timestamp;
-
-  TestMessage(String key, String value, long timestamp) {
-    this.key = key;
-    this.value = value;
-    this.timestamp = timestamp;
-  }
-
-  @Override public String getMessage() {
-    return this.value;
-  }
-
-  @Override public String getKey() {
-    return this.key;
-  }
-
-  @Override public long getTimestamp() {
-    return this.timestamp;
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/test/java/org/apache/samza/operators/TestMessageEnvelope.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/operators/TestMessageEnvelope.java b/samza-api/src/test/java/org/apache/samza/operators/TestMessageEnvelope.java
new file mode 100644
index 0000000..dfa69ac
--- /dev/null
+++ b/samza-api/src/test/java/org/apache/samza/operators/TestMessageEnvelope.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators;
+
+import org.apache.samza.operators.data.MessageEnvelope;
+
+
+/**
+ * A {@link MessageEnvelope} for tests, with a {@link String} key and a {@link MessageType} message.
+ */
+public class TestMessageEnvelope implements MessageEnvelope<String, TestMessageEnvelope.MessageType> {
+
+  private final String key;
+  private final MessageType value;
+
+  public TestMessageEnvelope(String key, String value, long eventTime) {
+    this.key = key;
+    this.value = new MessageType(value, eventTime);
+  }
+
+  @Override
+  public MessageType getMessage() {
+    return this.value;
+  }
+
+  @Override
+  public String getKey() {
+    return this.key;
+  }
+
+  /**
+   * The message payload: a string value together with its event time.
+   */
+  public static class MessageType {
+    private final String value;
+    private final long eventTime;
+
+    public MessageType(String value, long eventTime) {
+      this.value = value;
+      this.eventTime = eventTime;
+    }
+
+    public long getEventTime() {
+      return eventTime;
+    }
+
+    public String getValue() {
+      return value;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/test/java/org/apache/samza/operators/TestMessageStream.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/operators/TestMessageStream.java b/samza-api/src/test/java/org/apache/samza/operators/TestMessageStream.java
deleted file mode 100644
index 4dbe233..0000000
--- a/samza-api/src/test/java/org/apache/samza/operators/TestMessageStream.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators;
-
-import org.apache.samza.operators.internal.Operators.*;
-import org.apache.samza.operators.internal.WindowOutput;
-import org.apache.samza.system.OutgoingMessageEnvelope;
-import org.apache.samza.system.SystemStream;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskCoordinator;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.function.BiFunction;
-import java.util.function.Function;
-
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-
-public class TestMessageStream {
-
-  @Test public void testMap() {
-    MessageStream<TestMessage> inputStream = new MessageStream<>();
-    Function<TestMessage, TestOutputMessage> xMap = m -> new TestOutputMessage(m.getKey(), m.getMessage().length() + 1, m.getTimestamp() + 2);
-    MessageStream<TestOutputMessage> outputStream = inputStream.map(xMap);
-    Collection<Operator> subs = inputStream.getSubscribers();
-    assertEquals(subs.size(), 1);
-    Operator<TestOutputMessage> mapOp = subs.iterator().next();
-    assertTrue(mapOp instanceof StreamOperator);
-    assertEquals(mapOp.getOutputStream(), outputStream);
-    // assert that the transformation function is what we defined above
-    TestMessage xTestMsg = mock(TestMessage.class);
-    when(xTestMsg.getKey()).thenReturn("test-msg-key");
-    when(xTestMsg.getMessage()).thenReturn("123456789");
-    when(xTestMsg.getTimestamp()).thenReturn(12345L);
-    Collection<TestOutputMessage> cOutputMsg = ((StreamOperator<TestMessage, TestOutputMessage>) mapOp).getFunction().apply(xTestMsg);
-    assertEquals(cOutputMsg.size(), 1);
-    TestOutputMessage outputMessage = cOutputMsg.iterator().next();
-    assertEquals(outputMessage.getKey(), xTestMsg.getKey());
-    assertEquals(outputMessage.getMessage(), Integer.valueOf(xTestMsg.getMessage().length() + 1));
-    assertEquals(outputMessage.getTimestamp(), xTestMsg.getTimestamp() + 2);
-  }
-
-  @Test public void testFlatMap() {
-    MessageStream<TestMessage> inputStream = new MessageStream<>();
-    Set<TestOutputMessage> flatOuts = new HashSet<TestOutputMessage>() { {
-        this.add(mock(TestOutputMessage.class));
-        this.add(mock(TestOutputMessage.class));
-        this.add(mock(TestOutputMessage.class));
-      } };
-    Function<TestMessage, Collection<TestOutputMessage>> xFlatMap = m -> flatOuts;
-    MessageStream<TestOutputMessage> outputStream = inputStream.flatMap(xFlatMap);
-    Collection<Operator> subs = inputStream.getSubscribers();
-    assertEquals(subs.size(), 1);
-    Operator<TestOutputMessage> flatMapOp = subs.iterator().next();
-    assertTrue(flatMapOp instanceof StreamOperator);
-    assertEquals(flatMapOp.getOutputStream(), outputStream);
-    // assert that the transformation function is what we defined above
-    assertEquals(((StreamOperator<TestMessage, TestOutputMessage>) flatMapOp).getFunction(), xFlatMap);
-  }
-
-  @Test public void testFilter() {
-    MessageStream<TestMessage> inputStream = new MessageStream<>();
-    Function<TestMessage, Boolean> xFilter = m -> m.getTimestamp() > 123456L;
-    MessageStream<TestMessage> outputStream = inputStream.filter(xFilter);
-    Collection<Operator> subs = inputStream.getSubscribers();
-    assertEquals(subs.size(), 1);
-    Operator<TestMessage> filterOp = subs.iterator().next();
-    assertTrue(filterOp instanceof StreamOperator);
-    assertEquals(filterOp.getOutputStream(), outputStream);
-    // assert that the transformation function is what we defined above
-    Function<TestMessage, Collection<TestMessage>> txfmFn = ((StreamOperator<TestMessage, TestMessage>) filterOp).getFunction();
-    TestMessage mockMsg = mock(TestMessage.class);
-    when(mockMsg.getTimestamp()).thenReturn(11111L);
-    Collection<TestMessage> output = txfmFn.apply(mockMsg);
-    assertTrue(output.isEmpty());
-    when(mockMsg.getTimestamp()).thenReturn(999999L);
-    output = txfmFn.apply(mockMsg);
-    assertEquals(output.size(), 1);
-    assertEquals(output.iterator().next(), mockMsg);
-  }
-
-  @Test public void testSink() {
-    MessageStream<TestMessage> inputStream = new MessageStream<>();
-    MessageStream.VoidFunction3<TestMessage, MessageCollector, TaskCoordinator> xSink = (m, mc, tc) -> {
-      mc.send(new OutgoingMessageEnvelope(new SystemStream("test-sys", "test-stream"), m.getMessage()));
-      tc.commit(TaskCoordinator.RequestScope.CURRENT_TASK);
-    };
-    inputStream.sink(xSink);
-    Collection<Operator> subs = inputStream.getSubscribers();
-    assertEquals(subs.size(), 1);
-    Operator<TestMessage> sinkOp = subs.iterator().next();
-    assertTrue(sinkOp instanceof SinkOperator);
-    assertEquals(((SinkOperator) sinkOp).getFunction(), xSink);
-    assertNull(((SinkOperator) sinkOp).getOutputStream());
-  }
-
-  @Test public void testWindow() {
-    MessageStream<TestMessage> inputStream = new MessageStream<>();
-    Windows.SessionWindow<TestMessage, String, Integer> window = mock(Windows.SessionWindow.class);
-    MessageStream<WindowOutput<String, Integer>> outStream = inputStream.window(window);
-    Collection<Operator> subs = inputStream.getSubscribers();
-    assertEquals(subs.size(), 1);
-    Operator<TestMessage> wndOp = subs.iterator().next();
-    assertTrue(wndOp instanceof WindowOperator);
-    assertEquals(((WindowOperator) wndOp).getOutputStream(), outStream);
-  }
-
-  @Test public void testJoin() {
-    MessageStream<TestMessage> source1 = new MessageStream<>();
-    MessageStream<TestMessage> source2 = new MessageStream<>();
-    BiFunction<TestMessage, TestMessage, TestOutputMessage> joiner = (m1, m2) -> new TestOutputMessage(m1.getKey(), m1.getMessage().length() + m2.getMessage().length(), m1.getTimestamp());
-    MessageStream<TestOutputMessage> joinOutput = source1.join(source2, joiner);
-    Collection<Operator> subs = source1.getSubscribers();
-    assertEquals(subs.size(), 1);
-    Operator<TestMessage> joinOp1 = subs.iterator().next();
-    assertTrue(joinOp1 instanceof PartialJoinOperator);
-    assertEquals(((PartialJoinOperator) joinOp1).getOutputStream(), joinOutput);
-    subs = source2.getSubscribers();
-    assertEquals(subs.size(), 1);
-    Operator<TestMessage> joinOp2 = subs.iterator().next();
-    assertTrue(joinOp2 instanceof PartialJoinOperator);
-    assertEquals(((PartialJoinOperator) joinOp2).getOutputStream(), joinOutput);
-    TestMessage joinMsg1 = new TestMessage("test-join-1", "join-msg-001", 11111L);
-    TestMessage joinMsg2 = new TestMessage("test-join-2", "join-msg-002", 22222L);
-    TestOutputMessage xOut = (TestOutputMessage) ((PartialJoinOperator) joinOp1).getFunction().apply(joinMsg1, joinMsg2);
-    assertEquals(xOut.getKey(), "test-join-1");
-    assertEquals(xOut.getMessage(), Integer.valueOf(24));
-    assertEquals(xOut.getTimestamp(), 11111L);
-    xOut = (TestOutputMessage) ((PartialJoinOperator) joinOp2).getFunction().apply(joinMsg2, joinMsg1);
-    assertEquals(xOut.getKey(), "test-join-1");
-    assertEquals(xOut.getMessage(), Integer.valueOf(24));
-    assertEquals(xOut.getTimestamp(), 11111L);
-  }
-
-  @Test public void testMerge() {
-    MessageStream<TestMessage> merge1 = new MessageStream<>();
-    Collection<MessageStream<TestMessage>> others = new ArrayList<MessageStream<TestMessage>>() { {
-        this.add(new MessageStream<>());
-        this.add(new MessageStream<>());
-      } };
-    MessageStream<TestMessage> mergeOutput = merge1.merge(others);
-    validateMergeOperator(merge1, mergeOutput);
-
-    others.forEach(merge -> validateMergeOperator(merge, mergeOutput));
-  }
-
-  private void validateMergeOperator(MessageStream<TestMessage> mergeSource, MessageStream<TestMessage> mergeOutput) {
-    Collection<Operator> subs = mergeSource.getSubscribers();
-    assertEquals(subs.size(), 1);
-    Operator<TestMessage> mergeOp = subs.iterator().next();
-    assertTrue(mergeOp instanceof StreamOperator);
-    assertEquals(((StreamOperator) mergeOp).getOutputStream(), mergeOutput);
-    TestMessage mockMsg = mock(TestMessage.class);
-    Collection<TestMessage> outputs = ((StreamOperator<TestMessage, TestMessage>) mergeOp).getFunction().apply(mockMsg);
-    assertEquals(outputs.size(), 1);
-    assertEquals(outputs.iterator().next(), mockMsg);
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/test/java/org/apache/samza/operators/TestMessageStreams.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/operators/TestMessageStreams.java b/samza-api/src/test/java/org/apache/samza/operators/TestMessageStreams.java
deleted file mode 100644
index c5fcceb..0000000
--- a/samza-api/src/test/java/org/apache/samza/operators/TestMessageStreams.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators;
-
-import org.apache.samza.Partition;
-import org.apache.samza.system.SystemStreamPartition;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-
-
-public class TestMessageStreams {
-
-  @Test public void testInput() {
-    SystemStreamPartition ssp = new SystemStreamPartition("my-system", "my-stream", new Partition(0));
-    MessageStreams.SystemMessageStream mSysStream = MessageStreams.input(ssp);
-    assertEquals(mSysStream.getSystemStreamPartition(), ssp);
-  }
-}