You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2016/10/04 23:23:04 UTC

[4/5] samza git commit: SAMZA-914: Creating the fluent programming APIs w/ operators

http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-operator/src/main/java/org/apache/samza/operators/api/internal/WindowOutput.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/api/internal/WindowOutput.java b/samza-operator/src/main/java/org/apache/samza/operators/api/internal/WindowOutput.java
new file mode 100644
index 0000000..e202c20
--- /dev/null
+++ b/samza-operator/src/main/java/org/apache/samza/operators/api/internal/WindowOutput.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.api.internal;
+
+import org.apache.samza.operators.api.data.Message;
+
+
+/**
+ * This class defines the specific type of output messages from a {@link Operators.WindowOperator} function.
+ *
+ * <p>Instances are immutable and are created via {@link #of(Object, Object)}.
+ *
+ * @param <K>  the type of key in the output window result
+ * @param <M>  the type of value in the output window result
+ */
+public final class WindowOutput<K, M> implements Message<K, M> {
+  private final K key;
+  private final M value;
+
+  WindowOutput(K key, M aggregated) {
+    this.key = key;
+    this.value = aggregated;
+  }
+
+  /**
+   * @return the aggregated window result
+   */
+  @Override public M getMessage() {
+    return this.value;
+  }
+
+  /**
+   * @return the key of the window result
+   */
+  @Override public K getKey() {
+    return this.key;
+  }
+
+  /**
+   * Window outputs do not currently carry a timestamp.
+   *
+   * @return always {@code 0}
+   */
+  @Override public long getTimestamp() {
+    return 0;
+  }
+
+  /**
+   * Static factory for a {@link WindowOutput}.
+   *
+   * @param key  the key of the window result
+   * @param result  the aggregated window result
+   * @param <K>  the type of key
+   * @param <M>  the type of value
+   * @return a new {@link WindowOutput} holding {@code key} and {@code result}
+   */
+  public static <K, M> WindowOutput<K, M> of(K key, M result) {
+    return new WindowOutput<>(key, result);
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-operator/src/main/java/org/apache/samza/operators/impl/ChainedOperators.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/ChainedOperators.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/ChainedOperators.java
new file mode 100644
index 0000000..49cfdeb
--- /dev/null
+++ b/samza-operator/src/main/java/org/apache/samza/operators/impl/ChainedOperators.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.operators.impl;
+
+import org.apache.samza.operators.api.MessageStream;
+import org.apache.samza.operators.api.data.Message;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskContext;
+import org.apache.samza.task.TaskCoordinator;
+
+
+/**
+ * Implementation class for a chain of operators from the single input {@code source}.
+ *
+ * @param <M>  type of message in the input stream {@code source}
+ */
+public class ChainedOperators<M extends Message> {
+
+  /**
+   * Private constructor; use {@link #create(MessageStream, TaskContext)} instead.
+   *
+   * @param source  the input source {@link MessageStream}
+   * @param context  the {@link TaskContext} object that we need to instantiate the state stores
+   */
+  private ChainedOperators(MessageStream<M> source, TaskContext context) {
+    // TODO: create the pipeline/topology starting from source, and pass in the context
+    // s.t. stateful stream operators can initialize their stores. Both arguments are
+    // currently unused.
+  }
+
+  /**
+   * Static method to create a {@link ChainedOperators} from the {@code source} stream.
+   *
+   * @param source  the input source {@link MessageStream}
+   * @param context  the {@link TaskContext} object used to initialize the {@link StateStoreImpl}
+   * @param <M>  the type of input {@link Message}
+   * @return a {@link ChainedOperators} object that takes the {@code source} as input
+   */
+  public static <M extends Message> ChainedOperators<M> create(MessageStream<M> source, TaskContext context) {
+    return new ChainedOperators<>(source, context);
+  }
+
+  /**
+   * Method to navigate the incoming {@code message} through the processing chains.
+   * Currently a no-op.
+   *
+   * @param message  the incoming message to this {@link ChainedOperators}
+   * @param collector  the {@link MessageCollector} object within the process context
+   * @param coordinator  the {@link TaskCoordinator} object within the process context
+   */
+  public void onNext(M message, MessageCollector collector, TaskCoordinator coordinator) {
+    // TODO: add implementation of onNext() that actually triggers the process pipeline
+  }
+
+  /**
+   * Method to handle a timer tick for the processing chains. Currently a no-op.
+   *
+   * @param collector  the {@link MessageCollector} object within the process context
+   * @param coordinator  the {@link TaskCoordinator} object within the process context
+   */
+  public void onTimer(MessageCollector collector, TaskCoordinator coordinator) {
+    // TODO: add implementation of onTimer() that actually calls the corresponding window operator's onTimer() methods
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
new file mode 100644
index 0000000..81a7ede
--- /dev/null
+++ b/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl;
+
+import org.apache.samza.operators.api.data.Message;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskContext;
+import org.apache.samza.task.TaskCoordinator;
+import org.reactivestreams.Processor;
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
+
+import java.util.HashSet;
+import java.util.Set;
+
+
+/**
+ * Abstract base class for all stream operator implementation classes.
+ *
+ * <p>Each operator is a reactive-streams {@link Processor}. It receives {@link ProcessorContext}s wrapping input
+ * messages via {@link #onNext(ProcessorContext)}. It forwards output messages, wrapped in new
+ * {@link ProcessorContext}s, to every registered {@link Subscriber} via
+ * {@link #nextProcessors(Message, MessageCollector, TaskCoordinator)}.
+ *
+ * @param <M>  type of message in the input stream
+ * @param <RM>  type of message in the output stream
+ */
+public abstract class OperatorImpl<M extends Message, RM extends Message>
+    implements Processor<ProcessorContext<M>, ProcessorContext<RM>> {
+
+  // Insertion-ordered so downstream operators are invoked in the order they subscribed, instead of
+  // an arbitrary hash order; set semantics still keep each subscriber registered only once.
+  private final Set<Subscriber<? super ProcessorContext<RM>>> subscribers = new java.util.LinkedHashSet<>();
+
+  /**
+   * Registers a downstream {@link Subscriber} of this operator's output. Registering the same subscriber more
+   * than once has no additional effect.
+   *
+   * <p>NOTE(review): this does not call {@code s.onSubscribe(...)} as Reactive Streams rule 1.9 requires.
+   *
+   * @param s  the downstream subscriber
+   * @throws NullPointerException if {@code s} is null
+   */
+  @Override public void subscribe(Subscriber<? super ProcessorContext<RM>> s) {
+    java.util.Objects.requireNonNull(s, "subscriber");
+    subscribers.add(s);
+  }
+
+  /** Intentionally a no-op: subscriptions are not used for flow control. */
+  @Override public void onSubscribe(Subscription s) {
+
+  }
+
+  /**
+   * Unwraps the incoming {@link ProcessorContext} and delegates to
+   * {@link #onNext(Message, MessageCollector, TaskCoordinator)}.
+   *
+   * @param o  the wrapped input message and its task context
+   */
+  @Override public void onNext(ProcessorContext<M> o) {
+
+    onNext(o.getMessage(), o.getCollector(), o.getCoordinator());
+  }
+
+  /** Currently a no-op: upstream errors are ignored. */
+  @Override public void onError(Throwable t) {
+
+  }
+
+  /** Currently a no-op. */
+  @Override public void onComplete() {
+
+  }
+
+  /**
+   * Each sub-class will implement this method to actually perform the transformation and call the downstream subscribers.
+   *
+   * @param message  the input {@link Message}
+   * @param collector  the {@link MessageCollector} in the context
+   * @param coordinator  the {@link TaskCoordinator} in the context
+   */
+  protected abstract void onNext(M message, MessageCollector collector, TaskCoordinator coordinator);
+
+  /**
+   * Stateful operators will need to override this method to initialize the operators. The default does nothing.
+   *
+   * @param context  the task context to initialize the operators within
+   */
+  protected void init(TaskContext context) {}
+
+  /**
+   * Method to trigger all downstream operators that consume the output {@link org.apache.samza.operators.api.MessageStream}
+   * from this operator.
+   *
+   * @param omsg  output {@link Message}
+   * @param collector  the {@link MessageCollector} in the context
+   * @param coordinator  the {@link TaskCoordinator} in the context
+   */
+  protected void nextProcessors(RM omsg, MessageCollector collector, TaskCoordinator coordinator) {
+    subscribers.forEach(sub ->
+      sub.onNext(new ProcessorContext<>(omsg, collector, coordinator))
+    );
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-operator/src/main/java/org/apache/samza/operators/impl/ProcessorContext.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/ProcessorContext.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/ProcessorContext.java
new file mode 100644
index 0000000..5a375bc
--- /dev/null
+++ b/samza-operator/src/main/java/org/apache/samza/operators/impl/ProcessorContext.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl;
+
+import org.apache.samza.operators.api.data.Message;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskCoordinator;
+
+
+/**
+ * Wrapper used by {@link OperatorImpl} to pass a message downstream together with the
+ * {@link MessageCollector} and {@link TaskCoordinator} of the task invocation that produced it.
+ * All three references are fixed at construction time.
+ *
+ * @param <M>  Type of input stream {@link Message}
+ */
+public class ProcessorContext<M extends Message> {
+  private final M message;
+  private final MessageCollector collector;
+  private final TaskCoordinator coordinator;
+
+  ProcessorContext(M msg, MessageCollector msgCollector, TaskCoordinator taskCoordinator) {
+    message = msg;
+    collector = msgCollector;
+    coordinator = taskCoordinator;
+  }
+
+  /** @return the wrapped message */
+  M getMessage() {
+    return message;
+  }
+
+  /** @return the collector to which output messages may be sent */
+  MessageCollector getCollector() {
+    return collector;
+  }
+
+  /** @return the coordinator of the current task invocation */
+  TaskCoordinator getCoordinator() {
+    return coordinator;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-operator/src/main/java/org/apache/samza/operators/impl/SimpleOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/SimpleOperatorImpl.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/SimpleOperatorImpl.java
new file mode 100644
index 0000000..b29d9c8
--- /dev/null
+++ b/samza-operator/src/main/java/org/apache/samza/operators/impl/SimpleOperatorImpl.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl;
+
+import org.apache.samza.operators.api.data.Message;
+import org.apache.samza.operators.api.internal.Operators.StreamOperator;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskCoordinator;
+
+import java.util.Collection;
+import java.util.function.Function;
+
+
+/**
+ * Operator implementation for a {@link StreamOperator}. It applies the operator's transformation
+ * function to each input message and forwards every resulting message to the downstream operators.
+ *
+ * @param <M>  type of message in the input stream
+ * @param <RM>  type of message in the output stream
+ */
+public class SimpleOperatorImpl<M extends Message, RM extends Message> extends OperatorImpl<M, RM> {
+
+  /** Maps one input message to a collection of zero or more output messages. */
+  private final Function<M, Collection<RM>> transformFn;
+
+  SimpleOperatorImpl(StreamOperator<M, RM> op) {
+    transformFn = op.getFunction();
+  }
+
+  @Override protected void onNext(M inputMessage, MessageCollector collector, TaskCoordinator coordinator) {
+    Collection<RM> outputs = transformFn.apply(inputMessage);
+    // each transformed output is pushed to all downstream operators in turn
+    for (RM output : outputs) {
+      nextProcessors(output, collector, coordinator);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-operator/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java
new file mode 100644
index 0000000..5d25cfa
--- /dev/null
+++ b/samza-operator/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl;
+
+import org.apache.samza.operators.api.internal.Operators.SinkOperator;
+import org.apache.samza.operators.api.MessageStream;
+import org.apache.samza.operators.api.data.Message;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskCoordinator;
+
+
+/**
+ * Implementation for {@link SinkOperator}. It hands each incoming message, together with the task's
+ * {@link MessageCollector} and {@link TaskCoordinator}, to the sink function supplied by the operator.
+ * It never calls {@code nextProcessors}, so nothing is forwarded further downstream.
+ *
+ * @param <M>  type of message consumed by the sink
+ */
+public class SinkOperatorImpl<M extends Message> extends OperatorImpl<M, Message> {
+  private final MessageStream.VoidFunction3<M, MessageCollector, TaskCoordinator> sinkFn;
+
+  SinkOperatorImpl(SinkOperator<M> sinkOperator) {
+    super();
+    sinkFn = sinkOperator.getFunction();
+  }
+
+  @Override protected void onNext(M message, MessageCollector collector, TaskCoordinator coordinator) {
+    sinkFn.apply(message, collector, coordinator);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-operator/src/main/java/org/apache/samza/operators/impl/StateStoreImpl.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/StateStoreImpl.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/StateStoreImpl.java
new file mode 100644
index 0000000..f573fd0
--- /dev/null
+++ b/samza-operator/src/main/java/org/apache/samza/operators/impl/StateStoreImpl.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl;
+
+import org.apache.samza.operators.api.data.Message;
+import org.apache.samza.operators.api.internal.Operators.StoreFunctions;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.storage.kv.KeyValueStore;
+import org.apache.samza.task.TaskContext;
+
+
+/**
+ * The base class for all state stores.
+ *
+ * <p>Wraps a {@link KeyValueStore} that is looked up by name from the {@link TaskContext} in
+ * {@link #init(TaskContext)}. The supplied {@link StoreFunctions} derive the store key from a message and
+ * compute the updated state value.
+ *
+ * @param <M>  type of the input {@link Message}
+ * @param <SK>  type of the store key
+ * @param <SS>  type of the stored state value
+ */
+public class StateStoreImpl<M extends Message, SK, SS> {
+  private final String storeName;
+  private final StoreFunctions<M, SK, SS> storeFunctions;
+  private KeyValueStore<SK, SS> kvStore = null;
+
+  /**
+   * @param store  functions that find the store key for a message and compute the updated state
+   * @param storeName  name of the store to look up in the {@link TaskContext}
+   */
+  public StateStoreImpl(StoreFunctions<M, SK, SS> store, String storeName) {
+    this.storeFunctions = store;
+    this.storeName = storeName;
+  }
+
+  /**
+   * Looks up the backing {@link KeyValueStore} named {@code storeName}. Must be called before
+   * {@link #getState} or {@link #updateState}.
+   *
+   * @param context  the task context that provides the store
+   */
+  public void init(TaskContext context) {
+    // The store comes back without SK/SS type information, so the cast is unchecked; it assumes the
+    // store configured under this name really holds SK keys and SS values.
+    @SuppressWarnings("unchecked")
+    KeyValueStore<SK, SS> store = (KeyValueStore<SK, SS>) context.getStore(this.storeName);
+    this.kvStore = store;
+  }
+
+  /**
+   * Reads the current state for message {@code m}.
+   *
+   * @param m  the input message used to find the store key
+   * @return an {@link Entry} of the store key and the value the store returned for it
+   * @throws IllegalStateException if the store is not available (see {@link #init(TaskContext)})
+   */
+  public Entry<SK, SS> getState(M m) {
+    SK key = this.storeFunctions.getStoreKeyFinder().apply(m);
+    SS state = requireStore().get(key);
+    return new Entry<>(key, state);
+  }
+
+  /**
+   * Computes the new state from {@code m} and the old entry's value, writes it under the old entry's key,
+   * and returns the updated entry.
+   *
+   * @param m  the input message
+   * @param oldEntry  the entry previously returned by {@link #getState}
+   * @return an {@link Entry} of the same key and the newly stored value
+   * @throws IllegalStateException if the store is not available (see {@link #init(TaskContext)})
+   */
+  public Entry<SK, SS> updateState(M m, Entry<SK, SS> oldEntry) {
+    SS newValue = this.storeFunctions.getStateUpdater().apply(m, oldEntry.getValue());
+    requireStore().put(oldEntry.getKey(), newValue);
+    return new Entry<>(oldEntry.getKey(), newValue);
+  }
+
+  /** Returns the backing store, failing with a descriptive error instead of an NPE when it is missing. */
+  private KeyValueStore<SK, SS> requireStore() {
+    if (this.kvStore == null) {
+      throw new IllegalStateException("State store '" + this.storeName
+          + "' is not available: init(TaskContext) was not called or the context has no such store");
+    }
+    return this.kvStore;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-operator/src/main/java/org/apache/samza/operators/impl/data/avro/AvroData.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/data/avro/AvroData.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/data/avro/AvroData.java
new file mode 100644
index 0000000..e4f5d79
--- /dev/null
+++ b/samza-operator/src/main/java/org/apache/samza/operators/impl/data/avro/AvroData.java
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.operators.impl.data.avro;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.avro.generic.GenericArray;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.samza.operators.api.data.Data;
+import org.apache.samza.operators.api.data.Schema;
+
+
+public class AvroData implements Data {
+  protected final Object datum;
+  protected final AvroSchema schema;
+
+  private AvroData(AvroSchema schema, Object datum) {
+    this.datum = datum;
+    this.schema = schema;
+  }
+
+  @Override
+  public Schema schema() {
+    return this.schema;
+  }
+
+  @Override
+  public Object value() {
+    return this.datum;
+  }
+
+  @Override
+  public int intValue() {
+    throw new UnsupportedOperationException("Can't get value for an unknown data type.");
+  }
+
+  @Override
+  public long longValue() {
+    throw new UnsupportedOperationException("Can't get value for an unknown data type.");
+  }
+
+  @Override
+  public float floatValue() {
+    throw new UnsupportedOperationException("Can't get value for an unknown data type.");
+  }
+
+  @Override
+  public double doubleValue() {
+    throw new UnsupportedOperationException("Can't get value for an unknown data type.");
+  }
+
+  @Override
+  public boolean booleanValue() {
+    throw new UnsupportedOperationException("Can't get value for an unknown data type.");
+  }
+
+  @Override
+  public String strValue() {
+    throw new UnsupportedOperationException("Can't get value for an unknown data type.");
+  }
+
+  @Override
+  public byte[] bytesValue() {
+    throw new UnsupportedOperationException("Can't get value for an unknown data type.");
+  }
+
+  @Override
+  public List<Object> arrayValue() {
+    throw new UnsupportedOperationException("Can't get value for an unknown data type.");
+  }
+
+  @Override
+  public Map<Object, Object> mapValue() {
+    throw new UnsupportedOperationException("Can't get value for an unknown data type.");
+  }
+
+  @Override
+  public Data getElement(int index) {
+    throw new UnsupportedOperationException("Can't get value for an unknown data type.");
+  }
+
+  @Override
+  public Data getFieldData(String fldName) {
+    throw new UnsupportedOperationException("Can't get value for an unknown data type.");
+  }
+
+  public static AvroData getArray(AvroSchema schema, Object datum) {
+    if (schema.getType() != Schema.Type.ARRAY) {
+      throw new IllegalArgumentException("Can't create an array object with non-array schema:" + schema.getType());
+    }
+    return new AvroData(schema, datum) {
+      @SuppressWarnings("unchecked")
+      private final GenericArray<Object> array = (GenericArray<Object>) this.datum;
+
+      @Override
+      public List<Object> arrayValue() {
+        return this.array;
+      }
+
+      @Override
+      public Data getElement(int index) {
+        return this.schema.getElementType().read(array.get(index));
+      }
+
+    };
+  }
+
+  public static AvroData getMap(AvroSchema schema, Object datum) {
+    if (schema.getType() != Schema.Type.MAP) {
+      throw new IllegalArgumentException("Can't create a map object with non-map schema:" + schema.getType());
+    }
+    return new AvroData(schema, datum) {
+      @SuppressWarnings("unchecked")
+      private final Map<Object, Object> map = (Map<Object, Object>) datum;
+
+      @Override
+      public Map<Object, Object> mapValue() {
+        return this.map;
+      }
+
+      @Override
+      public Data getFieldData(String fldName) {
+        // TODO Auto-generated method stub
+        return this.schema.getValueType().read(map.get(fldName));
+      }
+
+    };
+  }
+
+  public static AvroData getStruct(AvroSchema schema, Object datum) {
+    if (schema.getType() != Schema.Type.STRUCT) {
+      throw new IllegalArgumentException("Can't create a struct object with non-struct schema:" + schema.getType());
+    }
+    return new AvroData(schema, datum) {
+      private final GenericRecord record = (GenericRecord) datum;
+
+      @Override
+      public Data getFieldData(String fldName) {
+        // TODO Auto-generated method stub
+        return this.schema.getFieldType(fldName).read(record.get(fldName));
+      }
+
+    };
+  }
+
+  public static AvroData getInt(AvroSchema schema, Object datum) {
+    if (schema.getType() != Schema.Type.INTEGER || !(datum instanceof Integer)) {
+      throw new IllegalArgumentException("data object and schema mismatch. schema:" + schema.getType() + ", data: "
+          + datum.getClass().getName());
+    }
+    return new AvroData(schema, datum) {
+      @Override
+      public int intValue() {
+        return ((Integer) datum).intValue();
+      }
+
+    };
+  }
+
+  public static AvroData getLong(AvroSchema schema, Object datum) {
+    if (schema.getType() != Schema.Type.LONG || !(datum instanceof Long)) {
+      throw new IllegalArgumentException("data object and schema mismatch. schema:" + schema.getType() + ", data: "
+          + datum.getClass().getName());
+    }
+    return new AvroData(schema, datum) {
+      @Override
+      public long longValue() {
+        return ((Long) datum).longValue();
+      }
+
+    };
+  }
+
+  public static AvroData getFloat(AvroSchema schema, Object datum) {
+    if (schema.getType() != Schema.Type.FLOAT || !(datum instanceof Float)) {
+      throw new IllegalArgumentException("data object and schema mismatch. schema:" + schema.getType() + ", data: "
+          + datum.getClass().getName());
+    }
+    return new AvroData(schema, datum) {
+      @Override
+      public float floatValue() {
+        return ((Float) datum).floatValue();
+      }
+
+    };
+  }
+
+  public static AvroData getDouble(AvroSchema schema, Object datum) {
+    if (schema.getType() != Schema.Type.DOUBLE || !(datum instanceof Double)) {
+      throw new IllegalArgumentException("data object and schema mismatch. schema:" + schema.getType() + ", data: "
+          + datum.getClass().getName());
+    }
+    return new AvroData(schema, datum) {
+      @Override
+      public double doubleValue() {
+        return ((Double) datum).doubleValue();
+      }
+
+    };
+  }
+
+  public static AvroData getBoolean(AvroSchema schema, Object datum) {
+    if (schema.getType() != Schema.Type.BOOLEAN || !(datum instanceof Boolean)) {
+      throw new IllegalArgumentException("data object and schema mismatch. schema:" + schema.getType() + ", data: "
+          + datum.getClass().getName());
+    }
+    return new AvroData(schema, datum) {
+      @Override
+      public boolean booleanValue() {
+        return ((Boolean) datum).booleanValue();
+      }
+
+    };
+  }
+
+  public static AvroData getString(AvroSchema schema, Object datum) {
+    if (schema.getType() != Schema.Type.STRING || !(datum instanceof CharSequence)) {
+      throw new IllegalArgumentException("data object and schema mismatch. schema:" + schema.getType() + ", data: "
+          + datum.getClass().getName());
+    }
+    return new AvroData(schema, datum) {
+      @Override
+      public String strValue() {
+        return ((CharSequence) datum).toString();
+      }
+
+    };
+  }
+
+  public static AvroData getBytes(AvroSchema schema, Object datum) {
+    if (schema.getType() != Schema.Type.BYTES || !(datum instanceof ByteBuffer)) {
+      throw new IllegalArgumentException("data object and schema mismatch. schema:" + schema.getType() + ", data: "
+          + datum.getClass().getName());
+    }
+    return new AvroData(schema, datum) {
+      @Override
+      public byte[] bytesValue() {
+        return ((ByteBuffer) datum).array();
+      }
+
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-operator/src/main/java/org/apache/samza/operators/impl/data/avro/AvroSchema.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/data/avro/AvroSchema.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/data/avro/AvroSchema.java
new file mode 100644
index 0000000..c04e4f6
--- /dev/null
+++ b/samza-operator/src/main/java/org/apache/samza/operators/impl/data/avro/AvroSchema.java
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.operators.impl.data.avro;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.avro.Schema.Field;
+import org.apache.samza.operators.api.data.Data;
+import org.apache.samza.operators.api.data.Schema;
+
+
+/**
+ * {@link Schema} implementation backed by an Avro {@link org.apache.avro.Schema}.
+ *
+ * <p>The primitive Avro types int, long, float, double, boolean, string and bytes are served from one shared,
+ * pre-built instance per type. Array, map and record schemas get a new wrapper on every {@link #getSchema} call.
+ * Avro types with no mapping in {@code mapType} (e.g. enum, fixed, union, null) are rejected with an
+ * {@link IllegalArgumentException}.
+ */
+public class AvroSchema implements Schema {
+
+  /** The wrapped Avro schema. */
+  protected final org.apache.avro.Schema avroSchema;
+  /** The operator-API type that {@link #avroSchema} maps to. */
+  protected final Schema.Type type;
+
+  /** Shared wrappers for the supported primitive Avro types, keyed by Avro type. */
+  private static final Map<org.apache.avro.Schema.Type, AvroSchema> primSchemas =
+      new HashMap<org.apache.avro.Schema.Type, AvroSchema>();
+
+  static {
+    primSchemas.put(org.apache.avro.Schema.Type.INT,
+        new AvroSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.INT)) {
+      @Override
+      public Data read(Object datum) {
+        return AvroData.getInt(this, datum);
+      }
+    });
+    primSchemas.put(org.apache.avro.Schema.Type.LONG,
+        new AvroSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.LONG)) {
+      @Override
+      public Data read(Object datum) {
+        return AvroData.getLong(this, datum);
+      }
+    });
+    primSchemas.put(org.apache.avro.Schema.Type.FLOAT,
+        new AvroSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.FLOAT)) {
+      @Override
+      public Data read(Object datum) {
+        return AvroData.getFloat(this, datum);
+      }
+    });
+    primSchemas.put(org.apache.avro.Schema.Type.DOUBLE,
+        new AvroSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.DOUBLE)) {
+      @Override
+      public Data read(Object datum) {
+        return AvroData.getDouble(this, datum);
+      }
+    });
+    primSchemas.put(org.apache.avro.Schema.Type.BOOLEAN,
+        new AvroSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.BOOLEAN)) {
+      @Override
+      public Data read(Object datum) {
+        return AvroData.getBoolean(this, datum);
+      }
+    });
+    primSchemas.put(org.apache.avro.Schema.Type.STRING,
+        new AvroSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.STRING)) {
+      @Override
+      public Data read(Object datum) {
+        return AvroData.getString(this, datum);
+      }
+    });
+    primSchemas.put(org.apache.avro.Schema.Type.BYTES,
+        new AvroSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.BYTES)) {
+      @Override
+      public Data read(Object datum) {
+        return AvroData.getBytes(this, datum);
+      }
+    });
+  }
+
+  /**
+   * Returns the {@link AvroSchema} wrapper for an Avro schema.
+   *
+   * <p>NOTE(review): record field schemas are resolved eagerly and recursively, so a self-referencing Avro record
+   * would recurse without bound. Confirm such schemas are never passed in.
+   *
+   * @param schema  the Avro schema to wrap
+   * @return the shared instance for a primitive type, or a newly created wrapper for an array, map or record
+   * @throws IllegalArgumentException if the Avro type has no mapping in {@code mapType}
+   */
+  public static AvroSchema getSchema(final org.apache.avro.Schema schema) {
+    Schema.Type type = mapType(schema.getType());
+    if (type != Schema.Type.ARRAY && type != Schema.Type.MAP && type != Schema.Type.STRUCT) {
+      return primSchemas.get(schema.getType());
+    }
+    // otherwise, construct the new schema
+    // TODO: It would be possible to assign each complex schema an ID and cache it w/o repeated create in-memory schema objects
+    switch (type) {
+      case ARRAY:
+        return new AvroSchema(schema) {
+          @Override
+          public Data transform(Data input) {
+            // Accept only an array input whose element schema equals this array's element schema
+            if (input.schema().getType() != Schema.Type.ARRAY) {
+              throw new IllegalArgumentException("Schema mismatch. Can't transfer data. input schema: "
+                  + input.schema().getType());
+            }
+            if (!input.schema().getElementType().equals(this.getElementType())) {
+              throw new IllegalArgumentException("Element schema mismatch. Can't transfer data. input schema: "
+                  + input.schema().getElementType().getType());
+            }
+            // input type matches array type
+            return AvroData.getArray(this, input.value());
+          }
+        };
+      case MAP:
+        return new AvroSchema(schema) {
+          @Override
+          public Data transform(Data input) {
+            // Accept only a map input whose value schema equals this map's value schema
+            if (input.schema().getType() != Schema.Type.MAP) {
+              throw new IllegalArgumentException("Schema mismatch. Can't transfer data. input schema: "
+                  + input.schema().getType());
+            }
+            if (!input.schema().getValueType().equals(this.getValueType())) {
+              throw new IllegalArgumentException("Element schema mismatch. Can't transfer data. input schema: "
+                  + input.schema().getValueType().getType());
+            }
+            // input type matches map type
+            return AvroData.getMap(this, input.value());
+          }
+        };
+      case STRUCT:
+        return new AvroSchema(schema) {
+          // Field name -> wrapped field schema, populated when this struct schema is created
+          private final Map<String, Schema> fldSchemas = new HashMap<String, Schema>();
+
+          {
+            for (Field field : schema.getFields()) {
+              fldSchemas.put(field.name(), AvroSchema.getSchema(field.schema()));
+            }
+          }
+
+          @Override
+          public Map<String, Schema> getFields() {
+            return this.fldSchemas;
+          }
+
+          @Override
+          public Schema getFieldType(String fldName) {
+            return this.fldSchemas.get(fldName);
+          }
+
+          @Override
+          public Data transform(Data input) {
+            if (input.schema().getType() != Schema.Type.STRUCT) {
+              throw new IllegalArgumentException("Schema mismatch. Can't transfer data. input schema: "
+                  + input.schema().getType());
+            }
+            // Note: this particular transform function only implements "projection to a sub-set" concept.
+            // More complex function is needed if some other concepts such as "merge from two sets of data", "allow null if does not exist" are needed
+            for (Map.Entry<String, Schema> fld : this.fldSchemas.entrySet()) {
+              String fldName = fld.getKey();
+              Schema fldSchema = fld.getValue();
+              Schema inputFld = input.schema().getFieldType(fldName);
+              // inputFld is null when the input struct lacks this field; report it instead of throwing an NPE
+              if (inputFld == null || !fldSchema.equals(inputFld)) {
+                throw new IllegalArgumentException("Field schema mismatch. Can't transfer data for field " + fldName
+                    + ". input field schema:" + (inputFld == null ? "null" : inputFld.getType())
+                    + ", this field schema: " + fldSchema.getType());
+              }
+            }
+            // input type matches struct type
+            return AvroData.getStruct(this, input.value());
+          }
+        };
+      default:
+        throw new IllegalArgumentException("Un-recognized complext data type:" + type);
+    }
+  }
+
+  private AvroSchema(org.apache.avro.Schema schema) {
+    this.avroSchema = schema;
+    this.type = mapType(schema.getType());
+  }
+
+  /**
+   * Maps an Avro type to the operator-API {@link Schema.Type}.
+   *
+   * @throws IllegalArgumentException for Avro types without a mapping
+   */
+  private static Type mapType(org.apache.avro.Schema.Type type) {
+    switch (type) {
+      case ARRAY:
+        return Schema.Type.ARRAY;
+      case RECORD:
+        return Schema.Type.STRUCT;
+      case MAP:
+        return Schema.Type.MAP;
+      case INT:
+        return Schema.Type.INTEGER;
+      case LONG:
+        return Schema.Type.LONG;
+      case BOOLEAN:
+        return Schema.Type.BOOLEAN;
+      case FLOAT:
+        return Schema.Type.FLOAT;
+      case DOUBLE:
+        return Schema.Type.DOUBLE;
+      case STRING:
+        return Schema.Type.STRING;
+      case BYTES:
+        return Schema.Type.BYTES;
+      default:
+        throw new IllegalArgumentException("Avro schema: " + type + " is not supported");
+    }
+  }
+
+  @Override
+  public Type getType() {
+    return this.type;
+  }
+
+  /** Returns a wrapper for the element schema; only valid for array schemas. */
+  @Override
+  public Schema getElementType() {
+    if (this.type != Schema.Type.ARRAY) {
+      throw new UnsupportedOperationException("Can't getElmentType with non-array schema: " + this.type);
+    }
+    return getSchema(this.avroSchema.getElementType());
+  }
+
+  /** Returns a wrapper for the value schema; only valid for map schemas. */
+  @Override
+  public Schema getValueType() {
+    if (this.type != Schema.Type.MAP) {
+      throw new UnsupportedOperationException("Can't getValueType with non-map schema: " + this.type);
+    }
+    return getSchema(this.avroSchema.getValueType());
+  }
+
+  /** Unsupported here; overridden by the struct wrapper created in {@link #getSchema}. */
+  @Override
+  public Map<String, Schema> getFields() {
+    throw new UnsupportedOperationException("Can't get field types with unknown schema type:" + this.type);
+  }
+
+  /** Unsupported here; overridden by the struct wrapper created in {@link #getSchema}. */
+  @Override
+  public Schema getFieldType(String fldName) {
+    throw new UnsupportedOperationException("Can't getFieldType with non-map/non-struct schema: " + this.type);
+  }
+
+  /**
+   * Wraps a raw Avro value of a complex type. The primitive wrappers in {@code primSchemas} override this method.
+   */
+  @Override
+  public Data read(Object object) {
+    if (this.avroSchema.getType() == org.apache.avro.Schema.Type.ARRAY) {
+      return AvroData.getArray(this, object);
+    } else if (this.avroSchema.getType() == org.apache.avro.Schema.Type.MAP) {
+      return AvroData.getMap(this, object);
+    } else if (this.avroSchema.getType() == org.apache.avro.Schema.Type.RECORD) {
+      return AvroData.getStruct(this, object);
+    }
+    throw new UnsupportedOperationException("Reading unknown complext type:" + this.type + " is not supported");
+  }
+
+  /**
+   * Default transform for primitive schemas: returns the input unchanged when its type matches this schema's type.
+   * Complex schemas override this method.
+   */
+  @Override
+  public Data transform(Data inputData) {
+    if (inputData.schema().getType() == Schema.Type.ARRAY || inputData.schema().getType() == Schema.Type.MAP
+        || inputData.schema().getType() == Schema.Type.STRUCT) {
+      throw new IllegalArgumentException("Complex schema should have overriden the default transform() function.");
+    }
+    if (inputData.schema().getType() != this.type) {
+      throw new IllegalArgumentException("Can't transform a mismatched primitive type. this type:" + this.type
+          + ", input type:" + inputData.schema().getType());
+    }
+    return inputData;
+  }
+
+  /**
+   * Compares schemas by type and, for complex types, by nested schemas.
+   *
+   * <p>For structs only the fields of {@code this} are checked, which is consistent with the
+   * "projection to subset" semantics of {@code transform()}. As a result the relation is not symmetric: a
+   * struct equals any struct that has at least the same fields with equal schemas.
+   *
+   * @return {@code false} if {@code other} is {@code null}
+   */
+  @Override
+  public boolean equals(Schema other) {
+    if (other == null || this.type != other.getType()) {
+      return false;
+    }
+    switch (this.type) {
+      case ARRAY:
+        // check if element types are the same
+        return this.getElementType().equals(other.getElementType());
+      case MAP:
+        // check if value types are the same
+        return this.getValueType().equals(other.getValueType());
+      case STRUCT:
+        // a field missing in other yields null, which equals(Schema) treats as "not equal"
+        for (String fieldName : this.getFields().keySet()) {
+          if (!this.getFieldType(fieldName).equals(other.getFieldType(fieldName))) {
+            return false;
+          }
+        }
+        return true;
+      default:
+        return true;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlAvroSerde.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlAvroSerde.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlAvroSerde.java
new file mode 100644
index 0000000..2432aca
--- /dev/null
+++ b/samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlAvroSerde.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl.data.serializers;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.samza.SamzaException;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.operators.impl.data.avro.AvroData;
+import org.apache.samza.operators.impl.data.avro.AvroSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+/**
+ * {@link Serde} that converts between Avro binary encoding and {@link AvroData}, using the single Avro schema
+ * supplied at construction time for both reading and writing.
+ */
+public class SqlAvroSerde implements Serde<AvroData> {
+  private static final Logger log = LoggerFactory.getLogger(SqlAvroSerde.class);
+
+  private final Schema avroSchema;
+  // Typed as Object rather than GenericRecord: for a non-record top-level schema (int, string, array, ...) the
+  // decoded value is not a GenericRecord, and a GenericRecord-typed read would fail with a ClassCastException
+  // before getAvroData() could dispatch on the schema type.
+  private final GenericDatumReader<Object> reader;
+  private final GenericDatumWriter<Object> writer;
+
+  /**
+   * @param avroSchema  the Avro schema used to decode and encode every message
+   */
+  public SqlAvroSerde(Schema avroSchema) {
+    this.avroSchema = avroSchema;
+    this.reader = new GenericDatumReader<Object>(avroSchema);
+    this.writer = new GenericDatumWriter<Object>(avroSchema);
+  }
+
+  /**
+   * Decodes Avro binary data into {@link AvroData}.
+   *
+   * @throws SamzaException if the bytes cannot be decoded
+   */
+  @Override
+  public AvroData fromBytes(byte[] bytes) {
+    try {
+      Object data = reader.read(null, DecoderFactory.defaultFactory().createBinaryDecoder(bytes, null));
+      return getAvroData(data, avroSchema);
+    } catch (IOException e) {
+      String errMsg = "Cannot decode message.";
+      log.error(errMsg, e);
+      throw new SamzaException(errMsg, e);
+    }
+  }
+
+  /**
+   * Encodes the raw value of {@code object} as Avro binary data.
+   *
+   * @throws SamzaException if encoding fails
+   */
+  @Override
+  public byte[] toBytes(AvroData object) {
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    Encoder encoder = new BinaryEncoder(out);
+
+    try {
+      writer.write(object.value(), encoder);
+      encoder.flush();
+      return out.toByteArray();
+    } catch (IOException e) {
+      String errMsg = "Cannot perform Avro binary encode.";
+      log.error(errMsg, e);
+      throw new SamzaException(errMsg, e);
+    }
+  }
+
+  /** Wraps a decoded Avro value in the {@link AvroData} variant that matches the top-level schema type. */
+  private AvroData getAvroData(Object data, Schema type){
+    AvroSchema schema = AvroSchema.getSchema(type);
+    switch (type.getType()){
+      case RECORD:
+        return AvroData.getStruct(schema, data);
+      case ARRAY:
+        return AvroData.getArray(schema, data);
+      case MAP:
+        return AvroData.getMap(schema, data);
+      case INT:
+        return AvroData.getInt(schema, data);
+      case LONG:
+        return AvroData.getLong(schema, data);
+      case BOOLEAN:
+        return AvroData.getBoolean(schema, data);
+      case FLOAT:
+        return AvroData.getFloat(schema, data);
+      case DOUBLE:
+        return AvroData.getDouble(schema, data);
+      case STRING:
+        return AvroData.getString(schema, data);
+      case BYTES:
+        return AvroData.getBytes(schema, data);
+      default:
+        throw new IllegalArgumentException("Avro schema: " + type + " is not supported");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlAvroSerdeFactory.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlAvroSerdeFactory.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlAvroSerdeFactory.java
new file mode 100644
index 0000000..edd8859
--- /dev/null
+++ b/samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlAvroSerdeFactory.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl.data.serializers;
+
+import org.apache.avro.Schema;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.serializers.SerdeFactory;
+import org.apache.samza.operators.impl.data.avro.AvroData;
+
+/**
+ * {@link SerdeFactory} that builds a {@link SqlAvroSerde} from the Avro schema text stored under the config key
+ * {@code serializers.<name>.schema}.
+ */
+public class SqlAvroSerdeFactory implements SerdeFactory<AvroData> {
+  public static final String PROP_AVRO_SCHEMA = "serializers.%s.schema";
+
+  /**
+   * Creates the serde registered as {@code name}.
+   *
+   * @param name  the serde name, substituted into {@link #PROP_AVRO_SCHEMA}
+   * @param config  the job config holding the schema text
+   * @return a serde for the parsed schema
+   * @throws SamzaException if the schema property is missing or empty
+   */
+  @Override
+  public Serde<AvroData> getSerde(String name, Config config) {
+    final String schemaKey = String.format(PROP_AVRO_SCHEMA, name);
+    final String schemaText = config.get(schemaKey);
+    final boolean hasSchema = schemaText != null && !schemaText.isEmpty();
+    if (hasSchema) {
+      return new SqlAvroSerde(Schema.parse(schemaText));
+    }
+    throw new SamzaException("Cannot find avro schema for SerdeFactory '" + name + "'.");
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlStringSerde.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlStringSerde.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlStringSerde.java
new file mode 100644
index 0000000..1267ab6
--- /dev/null
+++ b/samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlStringSerde.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.operators.impl.data.serializers;
+
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.operators.impl.data.string.StringData;
+
+
+public class SqlStringSerde implements Serde<StringData> {
+
+    private final Serde<String> serde;
+
+    public SqlStringSerde(String encoding) {
+        this.serde = new StringSerde(encoding);
+    }
+
+    @Override
+    public StringData fromBytes(byte[] bytes) {
+          return new StringData(serde.fromBytes(bytes));
+    }
+
+    @Override
+    public byte[] toBytes(StringData object) {
+        return serde.toBytes(object.strValue());
+    }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlStringSerdeFactory.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlStringSerdeFactory.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlStringSerdeFactory.java
new file mode 100644
index 0000000..3b6a3e0
--- /dev/null
+++ b/samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlStringSerdeFactory.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.operators.impl.data.serializers;
+
+
+import org.apache.samza.config.Config;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.serializers.SerdeFactory;
+import org.apache.samza.operators.impl.data.string.StringData;
+
+/**
+ * {@link SerdeFactory} producing {@link SqlStringSerde} instances. The character encoding is read from the
+ * {@code encoding} config key and defaults to {@code UTF-8}.
+ *
+ * <p>NOTE(review): the key is not scoped by the serde {@code name} (unlike {@code SqlAvroSerdeFactory}'s
+ * {@code serializers.%s.schema}), so all string serdes in a job share one encoding. Confirm this is intended.
+ */
+public class SqlStringSerdeFactory implements SerdeFactory<StringData> {
+  private static final String ENCODING_KEY = "encoding";
+  private static final String DEFAULT_ENCODING = "UTF-8";
+
+  @Override
+  public Serde<StringData> getSerde(String name, Config config) {
+    final String encoding = config.get(ENCODING_KEY, DEFAULT_ENCODING);
+    return new SqlStringSerde(encoding);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-operator/src/main/java/org/apache/samza/operators/impl/data/string/StringData.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/data/string/StringData.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/data/string/StringData.java
new file mode 100644
index 0000000..86e9917
--- /dev/null
+++ b/samza-operator/src/main/java/org/apache/samza/operators/impl/data/string/StringData.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.operators.impl.data.string;
+
+import org.apache.samza.operators.api.data.Data;
+import org.apache.samza.operators.api.data.Schema;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * {@link Data} wrapper around a single string-typed value. Each instance carries its own {@link StringSchema}.
+ * Only {@link #schema()}, {@link #value()} and {@link #strValue()} are supported; every other accessor throws
+ * {@link UnsupportedOperationException}.
+ */
+public class StringData implements Data {
+  private final Schema schema = new StringSchema();
+  private final Object datum;
+
+  /**
+   * @param datum  the wrapped value; {@link #strValue()} renders it with {@code toString()} ("null" for null)
+   */
+  public StringData(Object datum) {
+    this.datum = datum;
+  }
+
+  @Override
+  public Schema schema() {
+    return schema;
+  }
+
+  @Override
+  public Object value() {
+    return datum;
+  }
+
+  @Override
+  public String strValue() {
+    return datum == null ? "null" : datum.toString();
+  }
+
+  // ---- accessors that do not apply to string data ----
+
+  @Override
+  public int intValue() {
+    throw new UnsupportedOperationException("Can't get int value for a string type data");
+  }
+
+  @Override
+  public long longValue() {
+    throw new UnsupportedOperationException("Can't get long value for a string type data");
+  }
+
+  @Override
+  public float floatValue() {
+    throw new UnsupportedOperationException("Can't get float value for a string type data");
+  }
+
+  @Override
+  public double doubleValue() {
+    throw new UnsupportedOperationException("Can't get double value for a string type data");
+  }
+
+  @Override
+  public boolean booleanValue() {
+    throw new UnsupportedOperationException("Can't get boolean value for a string type data");
+  }
+
+  @Override
+  public byte[] bytesValue() {
+    throw new UnsupportedOperationException("Can't get bytesValue for a string type data");
+  }
+
+  @Override
+  public List<Object> arrayValue() {
+    throw new UnsupportedOperationException("Can't get arrayValue for a string type data");
+  }
+
+  @Override
+  public Map<Object, Object> mapValue() {
+    throw new UnsupportedOperationException("Can't get mapValue for a string type data");
+  }
+
+  @Override
+  public Data getElement(int index) {
+    throw new UnsupportedOperationException("Can't getElement(index) on a string type data");
+  }
+
+  @Override
+  public Data getFieldData(String fldName) {
+    throw new UnsupportedOperationException("Can't getFieldData(fieldName) for a string type data");
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-operator/src/main/java/org/apache/samza/operators/impl/data/string/StringSchema.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/data/string/StringSchema.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/data/string/StringSchema.java
new file mode 100644
index 0000000..b19dfeb
--- /dev/null
+++ b/samza-operator/src/main/java/org/apache/samza/operators/impl/data/string/StringSchema.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.operators.impl.data.string;
+
+import org.apache.samza.operators.api.data.Data;
+import org.apache.samza.operators.api.data.Schema;
+
+import java.util.Map;
+
+/**
+ * {@link Schema} describing plain string data, read into {@link StringData}. It has no element, value or field
+ * types; the corresponding accessors throw {@link UnsupportedOperationException}.
+ */
+public class StringSchema implements Schema {
+  private final Type type = Type.STRING;
+
+  @Override
+  public Type getType() {
+    return this.type;
+  }
+
+  @Override
+  public Schema getElementType() {
+    throw new UnsupportedOperationException("Can't getElmentType with non-array schema: " + this.type);
+  }
+
+  @Override
+  public Schema getValueType() {
+    throw new UnsupportedOperationException("Can't getValueType with non-map schema: " + this.type);
+  }
+
+  @Override
+  public Map<String, Schema> getFields() {
+    throw new UnsupportedOperationException("Can't get field types with unknown schema type:" + this.type);
+  }
+
+  @Override
+  public Schema getFieldType(String fldName) {
+    throw new UnsupportedOperationException("Can't getFieldType with non-map/non-struct schema: " + this.type);
+  }
+
+  /** Wraps {@code object} as {@link StringData}. */
+  @Override
+  public Data read(Object object) {
+    return new StringData(object);
+  }
+
+  /**
+   * Returns {@code inputData} unchanged if it is string-typed.
+   *
+   * @throws IllegalArgumentException if the input's schema type is not {@code STRING}
+   */
+  @Override
+  public Data transform(Data inputData) {
+    if (inputData.schema().getType() != this.type) {
+      throw new IllegalArgumentException("Can't transform a mismatched primitive type. this type:" + this.type
+          + ", input type:" + inputData.schema().getType());
+    }
+    return inputData;
+  }
+
+  /** Two schemas are equal when both are string-typed; a {@code null} schema is never equal. */
+  @Override
+  public boolean equals(Schema other) {
+    return other != null && other.getType() == this.type;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-operator/src/main/java/org/apache/samza/operators/impl/window/SessionWindowImpl.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/window/SessionWindowImpl.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/window/SessionWindowImpl.java
new file mode 100644
index 0000000..2de53aa
--- /dev/null
+++ b/samza-operator/src/main/java/org/apache/samza/operators/impl/window/SessionWindowImpl.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl.window;
+
+import org.apache.samza.operators.api.MessageStream;
+import org.apache.samza.operators.api.WindowState;
+import org.apache.samza.operators.api.data.Message;
+import org.apache.samza.operators.api.internal.Operators.WindowOperator;
+import org.apache.samza.operators.api.internal.WindowOutput;
+import org.apache.samza.operators.impl.OperatorImpl;
+import org.apache.samza.operators.impl.StateStoreImpl;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskContext;
+import org.apache.samza.task.TaskCoordinator;
+
+import java.util.function.BiFunction;
+
+
+/**
+ * Default implementation class of a {@link WindowOperator} for a session window.
+ *
+ * @param <M>  the type of input {@link Message}
+ * @param <RK>  the type of window key
+ * @param <RM>  the type of aggregated value of the window
+ */
+public class SessionWindowImpl<M extends Message, RK, WS extends WindowState, RM extends WindowOutput<RK, ?>> extends
+    OperatorImpl<M, RM> {
+  private final BiFunction<M, Entry<RK, WS>, RM> txfmFunction;
+  private final StateStoreImpl<M, RK, WS> wndStore;
+
+  SessionWindowImpl(WindowOperator<M, RK, WS, RM> sessWnd, MessageStream<M> input) {
+    this.txfmFunction = sessWnd.getFunction();
+    this.wndStore = new StateStoreImpl<>(sessWnd.getStoreFunctions(), sessWnd.getStoreName(input));
+  }
+
+  @Override protected void onNext(M message, MessageCollector collector, TaskCoordinator coordinator) {
+    Entry<RK, WS> state = this.wndStore.getState(message);
+    this.nextProcessors(this.txfmFunction.apply(message, state), collector, coordinator);
+    this.wndStore.updateState(message, state);
+  }
+
+  public void onTimer(MessageCollector collector, TaskCoordinator coordinator) {
+    // This is to periodically check the timeout triggers to get the list of window states to be updated
+  }
+
+  @Override protected void init(TaskContext context) {
+    this.wndStore.init(context);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-operator/src/main/java/org/apache/samza/task/StreamOperatorAdaptorTask.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/task/StreamOperatorAdaptorTask.java b/samza-operator/src/main/java/org/apache/samza/task/StreamOperatorAdaptorTask.java
new file mode 100644
index 0000000..e340fe8
--- /dev/null
+++ b/samza-operator/src/main/java/org/apache/samza/task/StreamOperatorAdaptorTask.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.task;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.api.MessageStream;
+import org.apache.samza.operators.api.MessageStreams;
+import org.apache.samza.operators.api.MessageStreams.SystemMessageStream;
+import org.apache.samza.operators.api.data.IncomingSystemMessage;
+import org.apache.samza.operators.impl.ChainedOperators;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemStreamPartition;
+
+import java.util.HashMap;
+import java.util.Map;
+
+
+/**
+ * An adaptor task that invokes a user-implemented {@link StreamOperatorTask} written with the {@link MessageStream}
+ * programming APIs.
+ *
+ * <p>During {@link #init}, one {@link SystemMessageStream} is created per input {@link SystemStreamPartition} of the
+ * task and handed to {@link StreamOperatorTask#initOperators}. Afterwards a {@link ChainedOperators} is created for
+ * each of those streams. {@link #process} routes every incoming message to the chain of its partition, and
+ * {@link #window} calls {@code onTimer} on every chain before delegating to the user task when it is a
+ * {@link WindowableTask}.
+ */
+public final class StreamOperatorAdaptorTask implements StreamTask, InitableTask, WindowableTask {
+  /**
+   * Maps each input {@link SystemStreamPartition} to the {@link ChainedOperators} that consumes it.
+   */
+  private final Map<SystemStreamPartition, ChainedOperators> operatorChains = new HashMap<>();
+
+  /**
+   * The wrapped user-defined {@link StreamOperatorTask}.
+   */
+  private final StreamOperatorTask userTask;
+
+  /**
+   * Constructor that wraps the user-defined {@link StreamOperatorTask}.
+   *
+   * @param userTask  the user-defined {@link StreamOperatorTask}; must not be {@code null}
+   * @throws NullPointerException if {@code userTask} is {@code null}
+   */
+  public StreamOperatorAdaptorTask(StreamOperatorTask userTask) {
+    // fail fast here rather than with an obscure NPE later in init()
+    this.userTask = java.util.Objects.requireNonNull(userTask, "userTask");
+  }
+
+  /**
+   * Initializes the user task (if it is an {@link InitableTask}), lets it define its operators on one
+   * {@link SystemMessageStream} per input partition, and then creates the operator chain for each partition.
+   */
+  @Override
+  public final void init(Config config, TaskContext context) throws Exception {
+    if (this.userTask instanceof InitableTask) {
+      ((InitableTask) this.userTask).init(config, context);
+    }
+    Map<SystemStreamPartition, SystemMessageStream> sources = new HashMap<>();
+    context.getSystemStreamPartitions().forEach(ssp -> sources.put(ssp, MessageStreams.input(ssp)));
+    // the user must attach its operators before the chains are created from the streams
+    this.userTask.initOperators(sources.values());
+    sources.forEach((ssp, ds) -> operatorChains.put(ssp, ChainedOperators.create(ds, context)));
+  }
+
+  /**
+   * Forwards the incoming message to the operator chain registered for its {@link SystemStreamPartition}.
+   *
+   * @throws IllegalStateException if no operator chain was registered for the message's partition
+   */
+  @Override
+  public final void process(IncomingMessageEnvelope ime, MessageCollector collector, TaskCoordinator coordinator) {
+    SystemStreamPartition ssp = ime.getSystemStreamPartition();
+    ChainedOperators chain = this.operatorChains.get(ssp);
+    if (chain == null) {
+      throw new IllegalStateException("No operator chain registered for input " + ssp);
+    }
+    chain.onNext(new IncomingSystemMessage(ime), collector, coordinator);
+  }
+
+  /**
+   * Fires the timer callback of every operator chain, then delegates to the user task if it is a
+   * {@link WindowableTask}.
+   */
+  @Override
+  public final void window(MessageCollector collector, TaskCoordinator coordinator) throws Exception {
+    this.operatorChains.forEach((ssp, chain) -> chain.onTimer(collector, coordinator));
+    if (this.userTask instanceof WindowableTask) {
+      ((WindowableTask) this.userTask).window(collector, coordinator);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-operator/src/main/java/org/apache/samza/task/StreamOperatorTask.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/task/StreamOperatorTask.java b/samza-operator/src/main/java/org/apache/samza/task/StreamOperatorTask.java
new file mode 100644
index 0000000..cfdb694
--- /dev/null
+++ b/samza-operator/src/main/java/org/apache/samza/task/StreamOperatorTask.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.task;
+
+import org.apache.samza.operators.api.MessageStreams.SystemMessageStream;
+import java.util.Collection;
+
+/**
+ * This interface defines the methods that users need to implement via the operator programming APIs.
+ */
+public interface StreamOperatorTask {
+
+  /**
+   * Defines the operators that consume from all input {@link SystemMessageStream}s.
+   *
+   * <p>Implementations attach their operators to the given streams. Each {@link SystemMessageStream} corresponds to
+   * one input {@link org.apache.samza.system.SystemStreamPartition}. When this task is wrapped by
+   * {@link StreamOperatorAdaptorTask}, the adaptor creates one
+   * {@link org.apache.samza.operators.impl.ChainedOperators} per stream after this method returns. Implementations
+   * therefore do not instantiate the chains themselves.
+   *
+   * @param sources  the collection of {@link SystemMessageStream}s, each taking
+   *                 {@link org.apache.samza.operators.api.data.IncomingSystemMessage}s from one
+   *                 {@link org.apache.samza.system.SystemStreamPartition}
+   */
+  void initOperators(Collection<SystemMessageStream> sources);
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-operator/src/test/java/org/apache/samza/operators/api/TestMessage.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/api/TestMessage.java b/samza-operator/src/test/java/org/apache/samza/operators/api/TestMessage.java
new file mode 100644
index 0000000..0f00fdb
--- /dev/null
+++ b/samza-operator/src/test/java/org/apache/samza/operators/api/TestMessage.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.api;
+
+import org.apache.samza.operators.api.data.Message;
+
+
+/**
+ * Minimal {@link Message} with a {@code String} key and value and a fixed timestamp, for use in operator API tests.
+ *
+ * <p>Deliberately not {@code final}: tests create Mockito mocks of this class.
+ */
+public class TestMessage implements Message<String, String> {
+
+  private final String key;
+  private final String value;
+  private final long timestamp;
+
+  /**
+   * @param key        the message key, returned by {@link #getKey()}
+   * @param value      the message payload, returned by {@link #getMessage()}
+   * @param timestamp  the message timestamp, returned by {@link #getTimestamp()}
+   */
+  TestMessage(String key, String value, long timestamp) {
+    this.timestamp = timestamp;
+    this.value = value;
+    this.key = key;
+  }
+
+  @Override public String getKey() {
+    return key;
+  }
+
+  @Override public String getMessage() {
+    return value;
+  }
+
+  @Override public long getTimestamp() {
+    return timestamp;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-operator/src/test/java/org/apache/samza/operators/api/TestMessageStreams.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/api/TestMessageStreams.java b/samza-operator/src/test/java/org/apache/samza/operators/api/TestMessageStreams.java
new file mode 100644
index 0000000..e6aa692
--- /dev/null
+++ b/samza-operator/src/test/java/org/apache/samza/operators/api/TestMessageStreams.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.api;
+
+import org.apache.samza.Partition;
+import org.apache.samza.system.SystemStreamPartition;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+
+public class TestMessageStreams {
+
+  /**
+   * Verifies that {@link MessageStreams#input} returns a stream bound to the given {@link SystemStreamPartition}.
+   */
+  @Test public void testInput() {
+    SystemStreamPartition ssp = new SystemStreamPartition("my-system", "my-stream", new Partition(0));
+    MessageStreams.SystemMessageStream mSysStream = MessageStreams.input(ssp);
+    // JUnit's assertEquals takes (expected, actual); keep that order so failure messages read correctly.
+    assertEquals(ssp, mSysStream.getSystemStreamPartition());
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-operator/src/test/java/org/apache/samza/operators/api/TestTriggerBuilder.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/api/TestTriggerBuilder.java b/samza-operator/src/test/java/org/apache/samza/operators/api/TestTriggerBuilder.java
new file mode 100644
index 0000000..8faa92c
--- /dev/null
+++ b/samza-operator/src/test/java/org/apache/samza/operators/api/TestTriggerBuilder.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.api;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+/**
+ * Unit tests for {@link TriggerBuilder}.
+ *
+ * <p>The triggers and updaters are private fields of the builder. They are read via reflection and exercised against
+ * a mocked {@link WindowState}.
+ *
+ * <p>NOTE(review): the timer-trigger checks compare against the wall clock and assume the call completes within the
+ * remaining margin before the timeout, so they are timing-sensitive.
+ */
+public class TestTriggerBuilder {
+  // reflective handles on TriggerBuilder's private fields
+  private Field earlyTriggerField;
+  private Field lateTriggerField;
+  private Field timerTriggerField;
+  private Field earlyTriggerUpdaterField;
+  private Field lateTriggerUpdaterField;
+
+  @Before
+  public void testPrep() throws Exception {
+    this.earlyTriggerField = TriggerBuilder.class.getDeclaredField("earlyTrigger");
+    this.lateTriggerField = TriggerBuilder.class.getDeclaredField("lateTrigger");
+    this.timerTriggerField = TriggerBuilder.class.getDeclaredField("timerTrigger");
+    this.earlyTriggerUpdaterField = TriggerBuilder.class.getDeclaredField("earlyTriggerUpdater");
+    this.lateTriggerUpdaterField = TriggerBuilder.class.getDeclaredField("lateTriggerUpdater");
+
+    this.earlyTriggerField.setAccessible(true);
+    this.lateTriggerField.setAccessible(true);
+    this.timerTriggerField.setAccessible(true);
+    this.earlyTriggerUpdaterField.setAccessible(true);
+    this.lateTriggerUpdaterField.setAccessible(true);
+  }
+
+  /**
+   * Verifies the trigger installed by each static factory method of {@link TriggerBuilder}.
+   */
+  @SuppressWarnings("unchecked") // reflective reads of generic fields and Mockito mocks of generic types
+  @Test public void testStaticCreators() throws NoSuchFieldException, IllegalAccessException {
+    TriggerBuilder<TestMessage, Collection<TestMessage>> builder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000);
+    BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean> triggerField =
+        (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean>) this.earlyTriggerField.get(builder);
+    WindowState<Collection<TestMessage>> mockState = mock(WindowState.class);
+    when(mockState.getNumberMessages()).thenReturn(200L);
+    assertFalse(triggerField.apply(null, mockState));
+    when(mockState.getNumberMessages()).thenReturn(2000L);
+    assertTrue(triggerField.apply(null, mockState));
+
+    Function<TestMessage, Boolean> tokenFunc = m -> true;
+    builder = TriggerBuilder.earlyTriggerOnTokenMsg(tokenFunc);
+    triggerField = (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean>) this.earlyTriggerField.get(builder);
+    TestMessage m = mock(TestMessage.class);
+    assertTrue(triggerField.apply(m, mockState));
+
+    builder = TriggerBuilder.earlyTriggerOnEventTime(TestMessage::getTimestamp, 30000L);
+    triggerField = (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean>) this.earlyTriggerField.get(builder);
+    when(mockState.getEarliestEventTimeNs()).thenReturn(1000000000L);
+    when(mockState.getLatestEventTimeNs()).thenReturn(20000000000L);
+    when(m.getTimestamp()).thenReturn(19999000000L);
+    assertFalse(triggerField.apply(m, mockState));
+    when(m.getTimestamp()).thenReturn(32000000000L);
+    assertTrue(triggerField.apply(m, mockState));
+    when(m.getTimestamp()).thenReturn(1001000000L);
+    when(mockState.getLatestEventTimeNs()).thenReturn(32000000000L);
+    assertTrue(triggerField.apply(m, mockState));
+
+    BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean> mockFunc = mock(BiFunction.class);
+    builder = TriggerBuilder.earlyTrigger(mockFunc);
+    triggerField = (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean>) this.earlyTriggerField.get(builder);
+    assertEquals(mockFunc, triggerField);
+
+    builder = TriggerBuilder.timeoutSinceFirstMessage(10000L);
+    Function<WindowState<Collection<TestMessage>>, Boolean> timerTrigger =
+        (Function<WindowState<Collection<TestMessage>>, Boolean>) this.timerTriggerField.get(builder);
+    when(mockState.getFirstMessageTimeNs()).thenReturn(0L);
+    assertTrue(timerTrigger.apply(mockState));
+    // set firstMessageTimeNs to 9 seconds ago, leaving 1 second before the 10-second timeout would fire
+    when(mockState.getFirstMessageTimeNs()).thenReturn(TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis() - 9000L));
+    assertFalse(timerTrigger.apply(mockState));
+
+    builder = TriggerBuilder.timeoutSinceLastMessage(10000L);
+    timerTrigger = (Function<WindowState<Collection<TestMessage>>, Boolean>) this.timerTriggerField.get(builder);
+    when(mockState.getLastMessageTimeNs()).thenReturn(0L);
+    assertTrue(timerTrigger.apply(mockState));
+    // set lastMessageTimeNs to 9 seconds ago, leaving 1 second before the 10-second timeout would fire
+    when(mockState.getLastMessageTimeNs()).thenReturn(TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis() - 9000));
+    assertFalse(timerTrigger.apply(mockState));
+  }
+
+  /**
+   * Verifies that adding a timer trigger keeps the early trigger and installs the timer trigger.
+   */
+  @SuppressWarnings("unchecked") // reflective reads of generic fields and Mockito mocks of generic types
+  @Test public void testAddTimerTriggers() throws IllegalAccessException {
+    TriggerBuilder<TestMessage, Collection<TestMessage>> builder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000);
+    builder.addTimeoutSinceFirstMessage(10000L);
+    // verify that both the early trigger and the timer trigger are set up
+    BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean> triggerField =
+        (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean>) this.earlyTriggerField.get(builder);
+    WindowState<Collection<TestMessage>> mockState = mock(WindowState.class);
+    when(mockState.getNumberMessages()).thenReturn(200L);
+    assertFalse(triggerField.apply(null, mockState));
+    // check the timer trigger
+    Function<WindowState<Collection<TestMessage>>, Boolean> timerTrigger =
+        (Function<WindowState<Collection<TestMessage>>, Boolean>) this.timerTriggerField.get(builder);
+    when(mockState.getFirstMessageTimeNs()).thenReturn(0L);
+    assertTrue(timerTrigger.apply(mockState));
+    // set firstMessageTimeNs to 9 seconds ago, leaving 1 second before the 10-second timeout would fire
+    when(mockState.getFirstMessageTimeNs()).thenReturn(TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis() - 9000L));
+    assertFalse(timerTrigger.apply(mockState));
+
+    // verify that both the early trigger and the timer trigger are set up
+    builder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000);
+    triggerField = (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean>) this.earlyTriggerField.get(builder);
+    mockState = mock(WindowState.class);
+    when(mockState.getNumberMessages()).thenReturn(200L);
+    assertFalse(triggerField.apply(null, mockState));
+    builder.addTimeoutSinceLastMessage(20000L);
+    // check the timer trigger
+    timerTrigger = (Function<WindowState<Collection<TestMessage>>, Boolean>) this.timerTriggerField.get(builder);
+    when(mockState.getLastMessageTimeNs()).thenReturn(0L);
+    assertTrue(timerTrigger.apply(mockState));
+    // set lastMessageTimeNs to 9 seconds ago, well within the 20-second timeout
+    when(mockState.getLastMessageTimeNs()).thenReturn(TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis() - 9000L));
+    assertFalse(timerTrigger.apply(mockState));
+  }
+
+  /**
+   * Verifies that adding a late trigger keeps the early trigger and installs the late trigger.
+   */
+  @SuppressWarnings("unchecked") // reflective reads of generic fields and Mockito mocks of generic types
+  @Test public void testAddLateTriggers() throws IllegalAccessException {
+    TriggerBuilder<TestMessage, Collection<TestMessage>> builder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000);
+    builder.addLateTriggerOnSizeLimit(10000L);
+    // verify that both the early trigger and the late trigger are set up
+    BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean> earlyTrigger =
+        (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean>) this.earlyTriggerField.get(builder);
+    WindowState<Collection<TestMessage>> mockState = mock(WindowState.class);
+    when(mockState.getNumberMessages()).thenReturn(200L);
+    assertFalse(earlyTrigger.apply(null, mockState));
+    // check the late trigger
+    BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean> lateTrigger =
+        (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean>) this.lateTriggerField.get(builder);
+    assertFalse(lateTrigger.apply(null, mockState));
+    // set the number of messages to 10001 to fire the late trigger
+    when(mockState.getNumberMessages()).thenReturn(10001L);
+    assertTrue(lateTrigger.apply(null, mockState));
+
+    builder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000);
+    builder.addLateTrigger((msg, s) -> s.getOutputValue().size() > 0);
+    // verify that both the early trigger and the late trigger are set up
+    earlyTrigger = (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean>) this.earlyTriggerField.get(builder);
+    mockState = mock(WindowState.class);
+    when(mockState.getNumberMessages()).thenReturn(200L);
+    assertFalse(earlyTrigger.apply(null, mockState));
+    // check the late trigger
+    when(mockState.getOutputValue()).thenReturn(new ArrayList<>());
+    lateTrigger = (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean>) this.lateTriggerField.get(builder);
+    assertFalse(lateTrigger.apply(null, mockState));
+    List<TestMessage> mockList = mock(ArrayList.class);
+    when(mockList.size()).thenReturn(200);
+    when(mockState.getOutputValue()).thenReturn(mockList);
+    assertTrue(lateTrigger.apply(null, mockState));
+  }
+
+  /**
+   * Verifies that the early and late trigger updaters are installed and applied to the window's output value.
+   */
+  @SuppressWarnings("unchecked") // reflective reads of generic fields and Mockito mocks of generic types
+  @Test public void testAddTriggerUpdater() throws IllegalAccessException {
+    TriggerBuilder<TestMessage, Collection<TestMessage>> builder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000);
+    builder.onEarlyTrigger(c -> {
+      c.clear();
+      return c;
+    });
+    List<TestMessage> collection = new ArrayList<>();
+    for (int i = 0; i < 10; i++) {
+      collection.add(new TestMessage(String.format("key-%d", i), "string-value", System.nanoTime()));
+    }
+    // verify that the early trigger updater is set up
+    Function<WindowState<Collection<TestMessage>>, WindowState<Collection<TestMessage>>> earlyTriggerUpdater =
+        (Function<WindowState<Collection<TestMessage>>, WindowState<Collection<TestMessage>>>) this.earlyTriggerUpdaterField.get(builder);
+    WindowState<Collection<TestMessage>> mockState = mock(WindowState.class);
+    when(mockState.getOutputValue()).thenReturn(collection);
+    earlyTriggerUpdater.apply(mockState);
+    assertTrue(collection.isEmpty());
+
+    collection.add(new TestMessage("key-to-stay", "string-to-stay", System.nanoTime()));
+    collection.add(new TestMessage("key-to-remove", "string-to-remove", System.nanoTime()));
+    builder.onLateTrigger(c -> {
+      c.removeIf(t -> t.getKey().equals("key-to-remove"));
+      return c;
+    });
+    // check the late trigger updater
+    Function<WindowState<Collection<TestMessage>>, WindowState<Collection<TestMessage>>> lateTriggerUpdater =
+        (Function<WindowState<Collection<TestMessage>>, WindowState<Collection<TestMessage>>>) this.lateTriggerUpdaterField.get(builder);
+    when(mockState.getOutputValue()).thenReturn(collection);
+    lateTriggerUpdater.apply(mockState);
+    assertEquals(1, collection.size());
+    assertFalse(collection.get(0).isDelete());
+    assertEquals("key-to-stay", collection.get(0).getKey());
+  }
+}