You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2015/05/16 04:02:56 UTC

[1/2] samza git commit: SAMZA-552 update operator APIs

Repository: samza
Updated Branches:
  refs/heads/samza-sql b47e47fd7 -> 41c4cd012


http://git-wip-us.apache.org/repos/asf/samza/blob/41c4cd01/samza-sql-core/src/main/java/org/apache/samza/sql/operators/relation/JoinSpec.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/relation/JoinSpec.java b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/relation/JoinSpec.java
deleted file mode 100644
index ba8bfb5..0000000
--- a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/relation/JoinSpec.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.sql.operators.relation;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.samza.sql.api.data.EntityName;
-import org.apache.samza.sql.api.operators.spec.OperatorSpec;
-import org.apache.samza.sql.operators.factory.SimpleOperatorSpec;
-
-
-/**
- * This class implements specification class for the build-in <code>Join</code> operator
- */
-public class JoinSpec extends SimpleOperatorSpec implements OperatorSpec {
-  /**
-   * Join keys defined for each input relation
-   */
-  private final List<String> joinKeys = new ArrayList<String>();
-
-  /**
-   * Default ctor for the <code>JoinSpec</code>
-   *
-   * @param id Unique ID of the <code>Join</code> object
-   * @param joinIns The list of input relations
-   * @param joinOut The output relation
-   * @param joinKeys The list of join keys in input relations
-   */
-  public JoinSpec(String id, List<EntityName> joinIns, EntityName joinOut, List<String> joinKeys) {
-    super(id, joinIns, joinOut);
-    this.joinKeys.addAll(joinKeys);
-  }
-
-  /**
-   * Method to get the list of join keys
-   *
-   * @return The list of join keys
-   */
-  public List<String> getJoinKeys() {
-    return this.joinKeys;
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/41c4cd01/samza-sql-core/src/main/java/org/apache/samza/sql/operators/stream/InsertStream.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/stream/InsertStream.java b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/stream/InsertStream.java
deleted file mode 100644
index 7563100..0000000
--- a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/stream/InsertStream.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.sql.operators.stream;
-
-import org.apache.samza.config.Config;
-import org.apache.samza.sql.api.data.EntityName;
-import org.apache.samza.sql.api.data.Relation;
-import org.apache.samza.sql.api.data.Tuple;
-import org.apache.samza.sql.api.operators.RelationOperator;
-import org.apache.samza.sql.operators.factory.SimpleOperator;
-import org.apache.samza.storage.kv.KeyValueIterator;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskContext;
-import org.apache.samza.task.TaskCoordinator;
-import org.apache.samza.task.sql.SqlMessageCollector;
-
-
-/**
- * This class defines an example build-in operator for an istream operator that converts a relation to a stream
- *
- */
-public class InsertStream extends SimpleOperator implements RelationOperator {
-  /**
-   * The <code>InsertStreamSpec</code> for this operator
-   */
-  private final InsertStreamSpec spec;
-
-  /**
-   * The time-varying relation that is to be converted into a stream
-   */
-  private Relation relation = null;
-
-  /**
-   * Ctor that takes the specication of the object as input parameter
-   *
-   * <p>This version of constructor is often used in an implementation of <code>SqlOperatorFactory</code>
-   *
-   * @param spec The <code>InsertStreamSpec</code> specification of this operator
-   */
-  public InsertStream(InsertStreamSpec spec) {
-    super(spec);
-    this.spec = spec;
-  }
-
-  /**
-   * An alternative ctor that allow users to create an <code>InsertStream</code> object randomly
-   *
-   * @param id The identifier of the <code>InsertStream</code> object
-   * @param input The input relation
-   * @param output The output stream
-   */
-  public InsertStream(String id, String input, String output) {
-    super(new InsertStreamSpec(id, EntityName.getRelationName(input), EntityName.getStreamName(output)));
-    this.spec = (InsertStreamSpec) super.getSpec();
-  }
-
-  @Override
-  public void process(Relation deltaRelation, SqlMessageCollector collector) throws Exception {
-    KeyValueIterator<Object, Tuple> iterator = deltaRelation.all();
-    for (; iterator.hasNext();) {
-      Tuple tuple = iterator.next().getValue();
-      if (!tuple.isDelete()) {
-        collector.send(tuple);
-      }
-    }
-  }
-
-  @Override
-  public void init(Config config, TaskContext context) throws Exception {
-    if (this.relation == null) {
-      this.relation = (Relation) context.getStore(this.spec.getInputName().toString());
-    }
-  }
-
-  @Override
-  public void window(MessageCollector collector, TaskCoordinator coordinator) throws Exception {
-    // TODO Auto-generated method stub
-    // assuming this operation does not have pending changes kept in memory
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/41c4cd01/samza-sql-core/src/main/java/org/apache/samza/sql/operators/stream/InsertStreamSpec.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/stream/InsertStreamSpec.java b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/stream/InsertStreamSpec.java
deleted file mode 100644
index 70475ce..0000000
--- a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/stream/InsertStreamSpec.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.sql.operators.stream;
-
-import org.apache.samza.sql.api.data.EntityName;
-import org.apache.samza.sql.api.operators.spec.OperatorSpec;
-import org.apache.samza.sql.operators.factory.SimpleOperatorSpec;
-
-
-/**
- * Example implementation of specification of <code>InsertStream</code> operator
- */
-public class InsertStreamSpec extends SimpleOperatorSpec implements OperatorSpec {
-
-  /**
-   * Default ctor of <code>InsertStreamSpec</code>
-   *
-   * @param id The identifier of the operator
-   * @param input The input relation entity
-   * @param output The output stream entity
-   */
-  public InsertStreamSpec(String id, EntityName input, EntityName output) {
-    super(id, input, output);
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/41c4cd01/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/BoundedTimeWindow.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/BoundedTimeWindow.java b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/BoundedTimeWindow.java
index 935ffc0..d81cc93 100644
--- a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/BoundedTimeWindow.java
+++ b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/BoundedTimeWindow.java
@@ -26,20 +26,19 @@ import org.apache.samza.config.Config;
 import org.apache.samza.sql.api.data.EntityName;
 import org.apache.samza.sql.api.data.Relation;
 import org.apache.samza.sql.api.data.Tuple;
-import org.apache.samza.sql.api.operators.TupleOperator;
-import org.apache.samza.sql.operators.factory.SimpleOperator;
+import org.apache.samza.sql.api.operators.OperatorCallback;
+import org.apache.samza.sql.operators.factory.SimpleOperatorImpl;
 import org.apache.samza.storage.kv.KeyValueIterator;
-import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.TaskContext;
 import org.apache.samza.task.TaskCoordinator;
-import org.apache.samza.task.sql.SqlMessageCollector;
+import org.apache.samza.task.sql.SimpleMessageCollector;
 
 
 /**
  * This class defines an example build-in operator for a fixed size window operator that converts a stream to a relation
  *
  */
-public class BoundedTimeWindow extends SimpleOperator implements TupleOperator {
+public class BoundedTimeWindow extends SimpleOperatorImpl {
 
   /**
    * The specification of this window operator
@@ -59,7 +58,7 @@ public class BoundedTimeWindow extends SimpleOperator implements TupleOperator {
   /**
    * Ctor that takes <code>WindowSpec</code> specification as input argument
    *
-   * <p>This version of constructor is often used in an implementation of <code>SqlOperatorFactory</code>
+   * <p>This version of constructor is often used in an implementation of {@link org.apache.samza.sql.api.operators.SqlOperatorFactory}
    *
    * @param spec The window specification object
    */
@@ -77,20 +76,23 @@ public class BoundedTimeWindow extends SimpleOperator implements TupleOperator {
    * @param output The output relation name
    */
   public BoundedTimeWindow(String wndId, int lengthSec, String input, String output) {
-    super(new WindowSpec(wndId, EntityName.getStreamName(input), EntityName.getRelationName(output), lengthSec));
-    this.spec = (WindowSpec) super.getSpec();
+    this(new WindowSpec(wndId, EntityName.getStreamName(input), EntityName.getStreamName(output), lengthSec));
   }
 
-  @Override
-  public void process(Tuple tuple, SqlMessageCollector collector) throws Exception {
-    // for each tuple, this will evaluate the incoming tuple and update the window states.
-    // If the window states allow generating output, calculate the delta changes in
-    // the window relation and execute the relation operation <code>nextOp</code>
-    updateWindow(tuple);
-    processWindowChanges(collector);
+  /**
+   * A simplified version of ctor that allows users to create a window operator w/ an {@link OperatorCallback} and w/o a spec object
+   *
+   * @param wndId The identifier of this window operator
+   * @param lengthSec The window size in seconds
+   * @param input The input stream name
+   * @param output The output relation name
+   */
+  public BoundedTimeWindow(String wndId, int lengthSec, String input, String output, OperatorCallback callback) {
+    super(new WindowSpec(wndId, EntityName.getStreamName(input), EntityName.getStreamName(output), lengthSec), callback);
+    this.spec = (WindowSpec) super.getSpec();
   }
 
-  private void processWindowChanges(SqlMessageCollector collector) throws Exception {
+  private void processWindowChanges(SimpleMessageCollector collector) throws Exception {
     if (windowStateChange()) {
       collector.send(getWindowChanges());
     }
@@ -119,14 +121,6 @@ public class BoundedTimeWindow extends SimpleOperator implements TupleOperator {
   }
 
   @Override
-  public void window(MessageCollector collector, TaskCoordinator coordinator) throws Exception {
-    SqlMessageCollector sqlCollector = (SqlMessageCollector) collector;
-    updateWindowTimeout();
-    processWindowChanges(sqlCollector);
-    sqlCollector.timeout(this.spec.getOutputNames());
-  }
-
-  @Override
   public void init(Config config, TaskContext context) throws Exception {
     // TODO Auto-generated method stub
     if (this.relation == null) {
@@ -138,4 +132,30 @@ public class BoundedTimeWindow extends SimpleOperator implements TupleOperator {
       }
     }
   }
+
+  @Override
+  protected void realProcess(Tuple tuple, SimpleMessageCollector collector, TaskCoordinator coordinator)
+      throws Exception {
+    // for each tuple, this will evaluate the incoming tuple and update the window states.
+    // If the window states allow generating output, calculate the delta changes in
+    // the window relation and execute the relation operation <code>nextOp</code>
+    updateWindow(tuple);
+    processWindowChanges(collector);
+  }
+
+  @Override
+  protected void realProcess(Relation rel, SimpleMessageCollector collector, TaskCoordinator coordinator)
+      throws Exception {
+    for (KeyValueIterator<Object, Tuple> iter = rel.all(); iter.hasNext();) {
+      updateWindow(iter.next().getValue());
+      processWindowChanges(collector);
+    }
+  }
+
+  @Override
+  protected void realRefresh(long timeNano, SimpleMessageCollector collector, TaskCoordinator coordinator)
+      throws Exception {
+    updateWindowTimeout();
+    processWindowChanges(collector);
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/41c4cd01/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/WindowSpec.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/WindowSpec.java b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/WindowSpec.java
index e2ae3aa..eec32ea 100644
--- a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/WindowSpec.java
+++ b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/WindowSpec.java
@@ -20,12 +20,12 @@
 package org.apache.samza.sql.operators.window;
 
 import org.apache.samza.sql.api.data.EntityName;
-import org.apache.samza.sql.api.operators.spec.OperatorSpec;
+import org.apache.samza.sql.api.operators.OperatorSpec;
 import org.apache.samza.sql.operators.factory.SimpleOperatorSpec;
 
 
 /**
- * This class implements the specification class for the build-in <code>BoundedTimeWindow</code> operator
+ * This class implements the specification class for the built-in {@link org.apache.samza.sql.operators.window.BoundedTimeWindow} operator
  */
 public class WindowSpec extends SimpleOperatorSpec implements OperatorSpec {
 
@@ -35,7 +35,7 @@ public class WindowSpec extends SimpleOperatorSpec implements OperatorSpec {
   private final int wndSizeSec;
 
   /**
-   * Default ctor of the <code>WindowSpec</code> object
+   * Default ctor of the {@code WindowSpec} object
    *
    * @param id The identifier of the operator
    * @param input The input stream entity

http://git-wip-us.apache.org/repos/asf/samza/blob/41c4cd01/samza-sql-core/src/main/java/org/apache/samza/sql/router/SimpleRouter.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/router/SimpleRouter.java b/samza-sql-core/src/main/java/org/apache/samza/sql/router/SimpleRouter.java
deleted file mode 100644
index c6fc673..0000000
--- a/samza-sql-core/src/main/java/org/apache/samza/sql/router/SimpleRouter.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.sql.router;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.samza.sql.api.data.EntityName;
-import org.apache.samza.sql.api.operators.Operator;
-import org.apache.samza.sql.api.operators.RelationOperator;
-import org.apache.samza.sql.api.operators.TupleOperator;
-import org.apache.samza.sql.api.router.OperatorRouter;
-
-
-/**
- * Example implementation of <code>OperatorRouter</code>
- *
- */
-public class SimpleRouter implements OperatorRouter {
-  /**
-   * List of operators added to the <code>OperatorRouter</code>
-   */
-  private List<Operator> operators = new ArrayList<Operator>();
-
-  @SuppressWarnings("rawtypes")
-  /**
-   * Map of <code>EntityName</code> to the list of operators associated with it
-   */
-  private Map<EntityName, List> nextOps = new HashMap<EntityName, List>();
-
-  /**
-   * List of <code>EntityName</code> as system inputs
-   */
-  private List<EntityName> inputEntities = new ArrayList<EntityName>();
-
-  @SuppressWarnings("unchecked")
-  private void addOperator(EntityName output, Operator nextOp) {
-    if (nextOps.get(output) == null) {
-      nextOps.put(output, new ArrayList<Operator>());
-    }
-    nextOps.get(output).add(nextOp);
-    operators.add(nextOp);
-
-  }
-
-  @Override
-  public Iterator<Operator> iterator() {
-    return operators.iterator();
-  }
-
-  @Override
-  public void addTupleOperator(EntityName outputStream, TupleOperator nextOp) throws Exception {
-    if (!outputStream.isStream()) {
-      throw new IllegalArgumentException("Can't attach an TupleOperator " + nextOp.getSpec().getId()
-          + " to a non-stream entity " + outputStream);
-    }
-    addOperator(outputStream, nextOp);
-  }
-
-  @Override
-  public void addRelationOperator(EntityName outputRelation, RelationOperator nextOp) throws Exception {
-    if (!outputRelation.isRelation()) {
-      throw new IllegalArgumentException("Can't attach an RelationOperator " + nextOp.getSpec().getId()
-          + " to a non-relation entity " + outputRelation);
-    }
-    addOperator(outputRelation, nextOp);
-  }
-
-  @SuppressWarnings("unchecked")
-  @Override
-  public List<RelationOperator> getRelationOperators(EntityName outputRelation) {
-    if (!outputRelation.isRelation()) {
-      throw new IllegalArgumentException("Can't get RelationOperators for a non-relation output: " + outputRelation);
-    }
-    return nextOps.get(outputRelation);
-  }
-
-  @SuppressWarnings("unchecked")
-  @Override
-  public List<TupleOperator> getTupleOperators(EntityName outputStream) {
-    if (!outputStream.isStream()) {
-      throw new IllegalArgumentException("Can't get TupleOperators for a non-stream output: " + outputStream);
-    }
-    return nextOps.get(outputStream);
-  }
-
-  @Override
-  public boolean hasNextOperators(EntityName output) {
-    return nextOps.get(output) != null && !nextOps.get(output).isEmpty();
-  }
-
-  @SuppressWarnings("unchecked")
-  @Override
-  public List<Operator> getNextOperators(EntityName output) {
-    return nextOps.get(output);
-  }
-
-  @Override
-  public void addSystemInput(EntityName input) {
-    if (!nextOps.containsKey(input) || nextOps.get(input).isEmpty()) {
-      throw new IllegalStateException("Can't set a system input w/o any next operators. input:" + input);
-    }
-    if (!inputEntities.contains(input)) {
-      inputEntities.add(input);
-    }
-  }
-
-  @Override
-  public List<EntityName> getSystemInputs() {
-    return this.inputEntities;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/41c4cd01/samza-sql-core/src/main/java/org/apache/samza/sql/window/storage/OrderedStoreKey.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/window/storage/OrderedStoreKey.java b/samza-sql-core/src/main/java/org/apache/samza/sql/window/storage/OrderedStoreKey.java
new file mode 100644
index 0000000..e56d3b3
--- /dev/null
+++ b/samza-sql-core/src/main/java/org/apache/samza/sql/window/storage/OrderedStoreKey.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.window.storage;
+
+/**
+ * This defines the base class for all keys used in window operators
+ */
+public abstract class OrderedStoreKey implements Comparable<OrderedStoreKey> {
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/41c4cd01/samza-sql-core/src/main/java/org/apache/samza/system/sql/LongOffset.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/system/sql/LongOffset.java b/samza-sql-core/src/main/java/org/apache/samza/system/sql/LongOffset.java
new file mode 100644
index 0000000..102819d
--- /dev/null
+++ b/samza-sql-core/src/main/java/org/apache/samza/system/sql/LongOffset.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.sql;
+
+/**
+ * An implementation of {@link org.apache.samza.system.sql.Offset}, w/ {@code long} value as the offset
+ */
+public class LongOffset implements Offset {
+
+  /**
+   * The offset value in {@code long}
+   */
+  private final Long offset;
+
+  private LongOffset(long offset) {
+    this.offset = offset;
+  }
+
+  public LongOffset(String offset) {
+    this.offset = Long.valueOf(offset);
+  }
+
+  @Override
+  public int compareTo(Offset o) {
+    if (!(o instanceof LongOffset)) {
+      throw new IllegalArgumentException("Not comparable offset classes. LongOffset vs " + o.getClass().getName());
+    }
+    LongOffset other = (LongOffset) o;
+    return this.offset.compareTo(other.offset);
+  }
+
+  /**
+   * Helper method to get the minimum offset
+   *
+   * @return The minimum offset
+   */
+  public static LongOffset getMinOffset() {
+    return new LongOffset(Long.MIN_VALUE);
+  }
+
+  /**
+   * Helper method to get the maximum offset
+   *
+   * @return The maximum offset
+   */
+  public static LongOffset getMaxOffset() {
+    return new LongOffset(Long.MAX_VALUE);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/41c4cd01/samza-sql-core/src/main/java/org/apache/samza/system/sql/Offset.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/system/sql/Offset.java b/samza-sql-core/src/main/java/org/apache/samza/system/sql/Offset.java
new file mode 100644
index 0000000..98547e2
--- /dev/null
+++ b/samza-sql-core/src/main/java/org/apache/samza/system/sql/Offset.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.sql;
+
+/**
+ * A generic interface extending {@link java.lang.Comparable} to be used as {@code Offset} in a stream
+ */
+public interface Offset extends Comparable<Offset> {
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/41c4cd01/samza-sql-core/src/main/java/org/apache/samza/task/sql/OperatorMessageCollector.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/task/sql/OperatorMessageCollector.java b/samza-sql-core/src/main/java/org/apache/samza/task/sql/OperatorMessageCollector.java
deleted file mode 100644
index 1e5310f..0000000
--- a/samza-sql-core/src/main/java/org/apache/samza/task/sql/OperatorMessageCollector.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.task.sql;
-
-import java.util.List;
-
-import org.apache.samza.sql.api.data.EntityName;
-import org.apache.samza.sql.api.data.Relation;
-import org.apache.samza.sql.api.data.Tuple;
-import org.apache.samza.sql.api.operators.Operator;
-import org.apache.samza.sql.api.operators.RelationOperator;
-import org.apache.samza.sql.api.operators.TupleOperator;
-import org.apache.samza.sql.api.router.OperatorRouter;
-import org.apache.samza.system.OutgoingMessageEnvelope;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskCoordinator;
-
-
-/**
- * Example implementation of a <code>SqlMessageCollector</code> that uses <code>OperatorRouter</code>
- *
- */
-public class OperatorMessageCollector implements SqlMessageCollector {
-
-  private final MessageCollector collector;
-  private final TaskCoordinator coordinator;
-  private final OperatorRouter rteCntx;
-
-  public OperatorMessageCollector(MessageCollector collector, TaskCoordinator coordinator, OperatorRouter rteCntx) {
-    this.collector = collector;
-    this.coordinator = coordinator;
-    this.rteCntx = rteCntx;
-  }
-
-  @Override
-  public void send(Relation deltaRelation) throws Exception {
-    for (RelationOperator op : this.rteCntx.getRelationOperators(deltaRelation.getName())) {
-      op.process(deltaRelation, this);
-    }
-  }
-
-  @Override
-  public void send(Tuple tuple) throws Exception {
-    for (TupleOperator op : this.rteCntx.getTupleOperators(tuple.getStreamName())) {
-      op.process(tuple, this);
-    }
-  }
-
-  @Override
-  public void timeout(List<EntityName> outputs) throws Exception {
-    for (EntityName output : outputs) {
-      for (Operator op : this.rteCntx.getNextOperators(output)) {
-        op.window(this, this.coordinator);
-      }
-    }
-  }
-
-  @Override
-  public void send(OutgoingMessageEnvelope envelope) {
-    this.collector.send(envelope);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/41c4cd01/samza-sql-core/src/main/java/org/apache/samza/task/sql/RouterMessageCollector.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/task/sql/RouterMessageCollector.java b/samza-sql-core/src/main/java/org/apache/samza/task/sql/RouterMessageCollector.java
new file mode 100644
index 0000000..18eb0a3
--- /dev/null
+++ b/samza-sql-core/src/main/java/org/apache/samza/task/sql/RouterMessageCollector.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.task.sql;
+
+import org.apache.samza.sql.api.data.Relation;
+import org.apache.samza.sql.api.data.Tuple;
+import org.apache.samza.sql.api.operators.OperatorRouter;
+import org.apache.samza.sql.api.operators.SimpleOperator;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskCoordinator;
+
+
+/**
+ * This class extends {@link org.apache.samza.task.sql.SimpleMessageCollector} and routes outputs to the next operators via {@link org.apache.samza.sql.api.operators.OperatorRouter}
+ *
+ */
+public class RouterMessageCollector extends SimpleMessageCollector {
+
+  private final OperatorRouter rteCntx;
+
+  public RouterMessageCollector(MessageCollector collector, TaskCoordinator coordinator, OperatorRouter rteCntx) {
+    super(collector, coordinator);
+    this.rteCntx = rteCntx;
+  }
+
+  @Override
+  protected void realSend(Relation rel) throws Exception {
+    for (SimpleOperator op : this.rteCntx.getNextOperators(rel.getName())) {
+      op.process(rel, this, coordinator);
+    }
+  }
+
+  @Override
+  protected void realSend(Tuple tuple) throws Exception {
+    for (SimpleOperator op : this.rteCntx.getNextOperators(tuple.getEntityName())) {
+      op.process(tuple, this, coordinator);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/41c4cd01/samza-sql-core/src/main/java/org/apache/samza/task/sql/SimpleMessageCollector.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/task/sql/SimpleMessageCollector.java b/samza-sql-core/src/main/java/org/apache/samza/task/sql/SimpleMessageCollector.java
new file mode 100644
index 0000000..b29838a
--- /dev/null
+++ b/samza-sql-core/src/main/java/org/apache/samza/task/sql/SimpleMessageCollector.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.task.sql;
+
+import org.apache.samza.sql.api.data.Relation;
+import org.apache.samza.sql.api.data.Tuple;
+import org.apache.samza.sql.api.operators.OperatorCallback;
+import org.apache.samza.sql.operators.factory.NoopOperatorCallback;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.storage.kv.KeyValueIterator;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskCoordinator;
+
+
+public class SimpleMessageCollector implements MessageCollector {
+  protected final MessageCollector collector;
+  protected final TaskCoordinator coordinator;
+  protected OperatorCallback callback;
+
+  /**
+   * Ctor that creates the {@code SimpleMessageCollector} from scratch
+   * @param collector The {@link org.apache.samza.task.MessageCollector} in the context
+   * @param coordinator The {@link org.apache.samza.task.TaskCoordinator} in the context
+   * @param callback The {@link org.apache.samza.sql.api.operators.OperatorCallback} in the context
+   */
+  public SimpleMessageCollector(MessageCollector collector, TaskCoordinator coordinator, OperatorCallback callback) {
+    this.collector = collector;
+    this.coordinator = coordinator;
+    if (callback == null) {
+      this.callback = new NoopOperatorCallback();
+    } else {
+      this.callback = callback;
+    }
+  }
+
+  /**
+   * Ctor that creates the {@code SimpleMessageCollector} with a no-op {@link org.apache.samza.sql.api.operators.OperatorCallback}
+   * @param collector The {@link org.apache.samza.task.MessageCollector} in the context
+   * @param coordinator The {@link org.apache.samza.task.TaskCoordinator} in the context
+   */
+  public SimpleMessageCollector(MessageCollector collector, TaskCoordinator coordinator) {
+    // Delegate with a null callback so that it defaults to NoopOperatorCallback; otherwise send() would hit a null callback
+    this(collector, coordinator, null);
+  }
+
+  /**
+   * This method swaps the {@code callback} with the new one; a {@code null} argument resets it to a no-op callback
+   *
+   * <p> This method allows the {@link org.apache.samza.sql.api.operators.OperatorCallback} to be swapped when the collector
+   * is passed down into the next operator's context. Hence, under the new operator's context, the correct {@link org.apache.samza.sql.api.operators.OperatorCallback#afterProcess(Relation, MessageCollector, TaskCoordinator)},
+   * and {@link org.apache.samza.sql.api.operators.OperatorCallback#afterProcess(Tuple, MessageCollector, TaskCoordinator)} can be invoked
+   *
+   * @param callback The new {@link org.apache.samza.sql.api.operators.OperatorCallback} to be set, or {@code null} for none
+   */
+  public void switchOperatorCallback(OperatorCallback callback) {
+    this.callback = (callback == null) ? new NoopOperatorCallback() : callback;
+  }
+
+  /**
+   * Method is declared to be final s.t. we enforce that the callback functions are called first
+   */
+  final public void send(Relation deltaRelation) throws Exception {
+    Relation rel = this.callback.afterProcess(deltaRelation, collector, coordinator);
+    if (rel == null) {
+      return;
+    }
+    this.realSend(rel);
+  }
+
+  /**
+   * Method is declared to be final s.t. we enforce that the callback functions are called first
+   */
+  final public void send(Tuple tuple) throws Exception {
+    Tuple otuple = this.callback.afterProcess(tuple, collector, coordinator);
+    if (otuple == null) {
+      return;
+    }
+    this.realSend(otuple);
+  }
+
+  protected void realSend(Relation rel) throws Exception {
+    for (KeyValueIterator<?, Tuple> iter = rel.all(); iter.hasNext();) {
+      Entry<?, Tuple> entry = iter.next();
+      this.collector.send((OutgoingMessageEnvelope) entry.getValue().getMessage());
+    }
+  }
+
+  protected void realSend(Tuple tuple) throws Exception {
+    this.collector.send((OutgoingMessageEnvelope) tuple.getMessage());
+  }
+
+  @Override
+  public void send(OutgoingMessageEnvelope envelope) {
+    this.collector.send(envelope);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/41c4cd01/samza-sql-core/src/main/java/org/apache/samza/task/sql/SqlMessageCollector.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/task/sql/SqlMessageCollector.java b/samza-sql-core/src/main/java/org/apache/samza/task/sql/SqlMessageCollector.java
deleted file mode 100644
index b98e2d7..0000000
--- a/samza-sql-core/src/main/java/org/apache/samza/task/sql/SqlMessageCollector.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.task.sql;
-
-import java.util.List;
-
-import org.apache.samza.sql.api.data.EntityName;
-import org.apache.samza.sql.api.data.Relation;
-import org.apache.samza.sql.api.data.Tuple;
-import org.apache.samza.task.MessageCollector;
-
-
-/**
- * This class defines the interface class to be used by the operators to send their output via runtime system resources,
- * s.t. the output system streams, the system storage, or <code>OperatorRouter</code>.
- *
- */
-public interface SqlMessageCollector extends MessageCollector {
-
-  /**
-   * This method allows the current operator send its relation output to next
-   *
-   * @param deltaRelation The delta <code>Relation</code> output generated by the current operator
-   * @throws Exception Throws exception if failed
-   */
-  void send(Relation deltaRelation) throws Exception;
-
-  /**
-   * This method allows the current operator send its tuple output to next
-   *
-   * @param tuple The <code>Tuple</code> object generated by the current operator
-   * @throws Exception Throws exception if failed
-   */
-  void send(Tuple tuple) throws Exception;
-
-  /**
-   * This method allows the current operator triggers timeout actions via the <code>SqlMessageCollector</code>.
-   *
-   * <p>This method sets timeout events to the corresponding <code>outputEntities</code> s.t. the next operators
-   * attached to those entities will be notified of the timeout.
-   *
-   * @param outputEntities The list of output entities via which the timeout event needs to be sent to
-   * @throws Exception Throws exception if failed
-   */
-  void timeout(List<EntityName> outputEntities) throws Exception;
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/41c4cd01/samza-sql-core/src/main/java/org/apache/samza/task/sql/StoreMessageCollector.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/task/sql/StoreMessageCollector.java b/samza-sql-core/src/main/java/org/apache/samza/task/sql/StoreMessageCollector.java
deleted file mode 100644
index b4b0e59..0000000
--- a/samza-sql-core/src/main/java/org/apache/samza/task/sql/StoreMessageCollector.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.task.sql;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.samza.sql.api.data.EntityName;
-import org.apache.samza.sql.api.data.Relation;
-import org.apache.samza.sql.api.data.Tuple;
-import org.apache.samza.storage.kv.KeyValueStore;
-import org.apache.samza.system.OutgoingMessageEnvelope;
-
-
-/**
- * Example implementation of <code>SqlMessageCollector</code> that stores outputs from the operators
- *
- */
-public class StoreMessageCollector implements SqlMessageCollector {
-
-  private final KeyValueStore<EntityName, List<Object>> outputStore;
-
-  public StoreMessageCollector(KeyValueStore<EntityName, List<Object>> store) {
-    this.outputStore = store;
-  }
-
-  @Override
-  public void send(Relation deltaRelation) throws Exception {
-    saveOutput(deltaRelation.getName(), deltaRelation);
-  }
-
-  @Override
-  public void send(Tuple tuple) throws Exception {
-    saveOutput(tuple.getStreamName(), tuple);
-  }
-
-  @Override
-  public void timeout(List<EntityName> outputs) throws Exception {
-    // TODO Auto-generated method stub
-  }
-
-  public List<Object> removeOutput(EntityName id) {
-    List<Object> output = outputStore.get(id);
-    outputStore.delete(id);
-    return output;
-  }
-
-  private void saveOutput(EntityName name, Object output) {
-    if (this.outputStore.get(name) == null) {
-      this.outputStore.put(name, new ArrayList<Object>());
-    }
-    List<Object> outputs = this.outputStore.get(name);
-    outputs.add(output);
-  }
-
-  @Override
-  public void send(OutgoingMessageEnvelope envelope) {
-    saveOutput(
-        EntityName.getStreamName(envelope.getSystemStream().getSystem() + ":" + envelope.getSystemStream().getStream()),
-        envelope);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/41c4cd01/samza-sql-core/src/test/java/org/apache/samza/task/sql/RandomOperatorTask.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/test/java/org/apache/samza/task/sql/RandomOperatorTask.java b/samza-sql-core/src/test/java/org/apache/samza/task/sql/RandomOperatorTask.java
deleted file mode 100644
index 4ec7dbb..0000000
--- a/samza-sql-core/src/test/java/org/apache/samza/task/sql/RandomOperatorTask.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.task.sql;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.samza.config.Config;
-import org.apache.samza.sql.api.data.EntityName;
-import org.apache.samza.sql.api.data.Relation;
-import org.apache.samza.sql.api.data.Tuple;
-import org.apache.samza.sql.data.IncomingMessageTuple;
-import org.apache.samza.sql.operators.relation.Join;
-import org.apache.samza.sql.operators.window.BoundedTimeWindow;
-import org.apache.samza.storage.kv.KeyValueIterator;
-import org.apache.samza.storage.kv.KeyValueStore;
-import org.apache.samza.system.IncomingMessageEnvelope;
-import org.apache.samza.system.OutgoingMessageEnvelope;
-import org.apache.samza.system.SystemStream;
-import org.apache.samza.task.InitableTask;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.StreamTask;
-import org.apache.samza.task.TaskContext;
-import org.apache.samza.task.TaskCoordinator;
-import org.apache.samza.task.WindowableTask;
-
-
-/***
- * This example illustrate a SQL join operation that joins two streams together using the following operations:
- * <p>a. the two streams are each processed by a window operator to convert to relations
- * <p>b. a join operator is applied on the two relations to generate join results
- * <p>c. finally, the join results are sent out to the system output
- *
- */
-public class RandomOperatorTask implements StreamTask, InitableTask, WindowableTask {
-  private KeyValueStore<EntityName, List<Object>> opOutputStore;
-  private BoundedTimeWindow wndOp1;
-  private BoundedTimeWindow wndOp2;
-  private Join joinOp;
-
-  private BoundedTimeWindow getWindowOp(EntityName streamName) {
-    if (streamName.equals(EntityName.getStreamName("kafka:stream1"))) {
-      return this.wndOp1;
-    } else if (streamName.equals(EntityName.getStreamName("kafka:stream2"))) {
-      return this.wndOp2;
-    }
-
-    throw new IllegalArgumentException("No window operator found for stream: " + streamName);
-  }
-
-  private void processJoinOutput(List<Object> outputs, MessageCollector collector) {
-    // get each tuple in the join operator's outputs and send it to system stream
-    for (Object joinOutput : outputs) {
-      for (KeyValueIterator<Object, Tuple> iter = ((Relation) joinOutput).all(); iter.hasNext();) {
-        Tuple otuple = iter.next().getValue();
-        collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", "joinOutput1"), otuple.getKey(), otuple
-            .getMessage()));
-      }
-    }
-  }
-
-  @Override
-  public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator)
-      throws Exception {
-    // create the StoreMessageCollector
-    StoreMessageCollector sqlCollector = new StoreMessageCollector(this.opOutputStore);
-
-    // construct the input tuple
-    IncomingMessageTuple ituple = new IncomingMessageTuple(envelope);
-
-    // based on tuple's stream name, get the window op and run process()
-    BoundedTimeWindow wndOp = getWindowOp(ituple.getStreamName());
-    wndOp.process(ituple, sqlCollector);
-    List<Object> wndOutputs = sqlCollector.removeOutput(wndOp.getSpec().getOutputNames().get(0));
-    if (wndOutputs.isEmpty()) {
-      return;
-    }
-
-    // process all output from the window operator
-    for (Object input : wndOutputs) {
-      Relation relation = (Relation) input;
-      this.joinOp.process(relation, sqlCollector);
-    }
-    // get the output from the join operator and send them
-    processJoinOutput(sqlCollector.removeOutput(this.joinOp.getSpec().getOutputNames().get(0)), collector);
-
-  }
-
-  @Override
-  public void window(MessageCollector collector, TaskCoordinator coordinator) throws Exception {
-    // create the StoreMessageCollector
-    StoreMessageCollector sqlCollector = new StoreMessageCollector(this.opOutputStore);
-
-    // trigger timeout event on both window operators
-    this.wndOp1.window(sqlCollector, coordinator);
-    this.wndOp2.window(sqlCollector, coordinator);
-
-    // for all outputs from the window operators, call joinOp.process()
-    for (Object input : sqlCollector.removeOutput(this.wndOp1.getSpec().getOutputNames().get(0))) {
-      Relation relation = (Relation) input;
-      this.joinOp.process(relation, sqlCollector);
-    }
-    for (Object input : sqlCollector.removeOutput(this.wndOp2.getSpec().getOutputNames().get(0))) {
-      Relation relation = (Relation) input;
-      this.joinOp.process(relation, sqlCollector);
-    }
-
-    // get the output from the join operator and send them
-    processJoinOutput(sqlCollector.removeOutput(this.joinOp.getSpec().getOutputNames().get(0)), collector);
-  }
-
-  @SuppressWarnings("unchecked")
-  @Override
-  public void init(Config config, TaskContext context) throws Exception {
-    // 1. create a fixed length 10 sec window operator
-    this.wndOp1 = new BoundedTimeWindow("wndOp1", 10, "kafka:stream1", "relation1");
-    this.wndOp2 = new BoundedTimeWindow("wndOp2", 10, "kafka:stream2", "relation2");
-    // 2. create a join operation
-    List<String> inputRelations = new ArrayList<String>();
-    inputRelations.add("relation1");
-    inputRelations.add("relation2");
-    List<String> joinKeys = new ArrayList<String>();
-    joinKeys.add("key1");
-    joinKeys.add("key2");
-    this.joinOp = new Join("joinOp", inputRelations, "joinOutput", joinKeys);
-    // Finally, initialize all operators
-    this.opOutputStore =
-        (KeyValueStore<EntityName, List<Object>>) context.getStore("samza-sql-operator-output-kvstore");
-    this.wndOp1.init(config, context);
-    this.wndOp2.init(config, context);
-    this.joinOp.init(config, context);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/41c4cd01/samza-sql-core/src/test/java/org/apache/samza/task/sql/RandomWindowOperatorTask.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/test/java/org/apache/samza/task/sql/RandomWindowOperatorTask.java b/samza-sql-core/src/test/java/org/apache/samza/task/sql/RandomWindowOperatorTask.java
new file mode 100644
index 0000000..20dc701
--- /dev/null
+++ b/samza-sql-core/src/test/java/org/apache/samza/task/sql/RandomWindowOperatorTask.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.task.sql;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.sql.api.data.Relation;
+import org.apache.samza.sql.api.data.Tuple;
+import org.apache.samza.sql.api.operators.OperatorCallback;
+import org.apache.samza.sql.data.IncomingMessageTuple;
+import org.apache.samza.sql.operators.window.BoundedTimeWindow;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.task.InitableTask;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.StreamTask;
+import org.apache.samza.task.TaskContext;
+import org.apache.samza.task.TaskCoordinator;
+import org.apache.samza.task.WindowableTask;
+
+
+/***
+ * This example illustrates a use case for the full-state timed window operator
+ *
+ */
+public class RandomWindowOperatorTask implements StreamTask, InitableTask, WindowableTask {
+  private BoundedTimeWindow wndOp;
+
+  private final OperatorCallback wndCallback = new OperatorCallback() {
+
+    @Override
+    public Tuple beforeProcess(Tuple tuple, MessageCollector collector, TaskCoordinator coordinator) {
+      return tuple;
+    }
+
+    @Override
+    public Relation beforeProcess(Relation rel, MessageCollector collector, TaskCoordinator coordinator) {
+      return rel;
+    }
+
+    @Override
+    public Relation afterProcess(Relation rel, MessageCollector collector, TaskCoordinator coordinator) {
+      return rel;
+    }
+
+    @Override
+    public Tuple afterProcess(Tuple tuple, MessageCollector collector, TaskCoordinator coordinator) {
+      return filterWindowOutput(tuple, collector, coordinator);
+    }
+
+    private Tuple filterWindowOutput(Tuple tuple, MessageCollector collector, TaskCoordinator coordinator) {
+      // filter all delete tuples before send
+      if (tuple.isDelete()) {
+        return null;
+      }
+      return tuple;
+    }
+
+  };
+
+  @Override
+  public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator)
+      throws Exception {
+    // wrap the incoming envelope as a tuple and feed it to the window operator
+    wndOp.process(new IncomingMessageTuple(envelope), collector, coordinator);
+
+  }
+
+  @Override
+  public void window(MessageCollector collector, TaskCoordinator coordinator) throws Exception {
+    // refresh the window operator, passing the current System.nanoTime() value
+    wndOp.refresh(System.nanoTime(), collector, coordinator);
+  }
+
+  @Override
+  public void init(Config config, TaskContext context) throws Exception {
+    // 1. create a fixed length 10 sec window operator
+    this.wndOp = new BoundedTimeWindow("wndOp1", 10, "kafka:stream1", "relation1", this.wndCallback);
+    this.wndOp.init(config, context);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/41c4cd01/samza-sql-core/src/test/java/org/apache/samza/task/sql/StreamSqlTask.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/test/java/org/apache/samza/task/sql/StreamSqlTask.java b/samza-sql-core/src/test/java/org/apache/samza/task/sql/StreamSqlTask.java
index 4796fa6..9124e3c 100644
--- a/samza-sql-core/src/test/java/org/apache/samza/task/sql/StreamSqlTask.java
+++ b/samza-sql-core/src/test/java/org/apache/samza/task/sql/StreamSqlTask.java
@@ -20,27 +20,15 @@
 package org.apache.samza.task.sql;
 
 import java.util.ArrayList;
-import java.util.Iterator;
 import java.util.List;
 
 import org.apache.samza.config.Config;
-import org.apache.samza.sql.api.data.EntityName;
-import org.apache.samza.sql.api.operators.Operator;
-import org.apache.samza.sql.api.operators.TupleOperator;
-import org.apache.samza.sql.api.router.OperatorRouter;
 import org.apache.samza.sql.data.IncomingMessageTuple;
-import org.apache.samza.sql.operators.factory.SimpleOperatorFactoryImpl;
+import org.apache.samza.sql.operators.factory.SimpleRouter;
+import org.apache.samza.sql.operators.join.StreamStreamJoin;
 import org.apache.samza.sql.operators.partition.PartitionOp;
-import org.apache.samza.sql.operators.partition.PartitionSpec;
-import org.apache.samza.sql.operators.relation.Join;
-import org.apache.samza.sql.operators.relation.JoinSpec;
-import org.apache.samza.sql.operators.stream.InsertStream;
-import org.apache.samza.sql.operators.stream.InsertStreamSpec;
 import org.apache.samza.sql.operators.window.BoundedTimeWindow;
-import org.apache.samza.sql.operators.window.WindowSpec;
-import org.apache.samza.sql.router.SimpleRouter;
 import org.apache.samza.system.IncomingMessageEnvelope;
-import org.apache.samza.system.SystemStream;
 import org.apache.samza.task.InitableTask;
 import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.StreamTask;
@@ -63,49 +51,31 @@ import org.apache.samza.task.WindowableTask;
  */
 public class StreamSqlTask implements StreamTask, InitableTask, WindowableTask {
 
-  private OperatorRouter rteCntx;
+  private SimpleRouter rteCntx;
 
   @Override
   public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator)
       throws Exception {
-    SqlMessageCollector opCollector = new OperatorMessageCollector(collector, coordinator, this.rteCntx);
-
-    IncomingMessageTuple ituple = new IncomingMessageTuple(envelope);
-    for (Iterator<TupleOperator> iter = this.rteCntx.getTupleOperators(ituple.getStreamName()).iterator(); iter
-        .hasNext();) {
-      iter.next().process(ituple, opCollector);
-    }
-
+    this.rteCntx.process(new IncomingMessageTuple(envelope), collector, coordinator);
   }
 
   @Override
   public void window(MessageCollector collector, TaskCoordinator coordinator) throws Exception {
-    SqlMessageCollector opCollector = new OperatorMessageCollector(collector, coordinator, this.rteCntx);
-
-    for (EntityName entity : this.rteCntx.getSystemInputs()) {
-      for (Iterator<Operator> iter = this.rteCntx.getNextOperators(entity).iterator(); iter.hasNext();) {
-        iter.next().window(opCollector, coordinator);
-      }
-    }
-
+    this.rteCntx.refresh(System.nanoTime(), collector, coordinator);
   }
 
   @Override
   public void init(Config config, TaskContext context) throws Exception {
-    // create specification of all operators first
-    // 1. create 2 window specifications that define 2 windows of fixed length of 10 seconds
-    final WindowSpec spec1 =
-        new WindowSpec("fixedWnd1", EntityName.getStreamName("inputStream1"),
-            EntityName.getRelationName("fixedWndOutput1"), 10);
-    final WindowSpec spec2 =
-        new WindowSpec("fixedWnd2", EntityName.getStreamName("inputStream2"),
-            EntityName.getRelationName("fixedWndOutput2"), 10);
-    // 2. create a join specification that join the output from 2 window operators together
+    // create all operators directly via their constructors
+    // 1. create two window operators
+    BoundedTimeWindow wnd1 = new BoundedTimeWindow("fixedWnd1", 10, "inputStream1", "fixedWndOutput1");
+    BoundedTimeWindow wnd2 = new BoundedTimeWindow("fixedWnd2", 10, "inputStream2", "fixedWndOutput2");
+    // 2. create one join operator
     @SuppressWarnings("serial")
-    List<EntityName> inputRelations = new ArrayList<EntityName>() {
+    List<String> inputRelations = new ArrayList<String>() {
       {
-        add(spec1.getOutputName());
-        add(spec2.getOutputName());
+        add("fixedWndOutput1");
+        add("fixedWndOutput2");
       }
     };
     @SuppressWarnings("serial")
@@ -115,45 +85,20 @@ public class StreamSqlTask implements StreamTask, InitableTask, WindowableTask {
         add("key2");
       }
     };
-    JoinSpec joinSpec = new JoinSpec("joinOp", inputRelations, EntityName.getRelationName("joinOutput"), joinKeys);
-    // 3. create the specification of an istream operator that convert the output from join to a stream
-    InsertStreamSpec istrmSpec =
-        new InsertStreamSpec("istremOp", joinSpec.getOutputName(), EntityName.getStreamName("istrmOutput1"));
-    // 4. create the specification of a partition operator that re-partitions the stream based on <code>joinKey</code>
-    PartitionSpec parSpec =
-        new PartitionSpec("parOp1", istrmSpec.getOutputName().getName(), new SystemStream("kafka", "parOutputStrm1"),
-            "joinKey", 50);
-
-    // create all operators via the operator factory
-    // 1. create two window operators
-    SimpleOperatorFactoryImpl operatorFactory = new SimpleOperatorFactoryImpl();
-    BoundedTimeWindow wnd1 = (BoundedTimeWindow) operatorFactory.getTupleOperator(spec1);
-    BoundedTimeWindow wnd2 = (BoundedTimeWindow) operatorFactory.getTupleOperator(spec2);
-    // 2. create one join operator
-    Join join = (Join) operatorFactory.getRelationOperator(joinSpec);
-    // 3. create one stream operator
-    InsertStream istream = (InsertStream) operatorFactory.getRelationOperator(istrmSpec);
+    StreamStreamJoin join = new StreamStreamJoin("joinOp", inputRelations, "joinOutput", joinKeys);
     // 4. create a re-partition operator
-    PartitionOp par = (PartitionOp) operatorFactory.getTupleOperator(parSpec);
+    PartitionOp par = new PartitionOp("parOp1", "joinOutput", "kafka", "parOutputStrm1", "joinKey", 50);
 
     // Now, connecting the operators via the OperatorRouter
     this.rteCntx = new SimpleRouter();
     // 1. set two system input operators (i.e. two window operators)
-    this.rteCntx.addTupleOperator(spec1.getInputName(), wnd1);
-    this.rteCntx.addTupleOperator(spec2.getInputName(), wnd2);
+    this.rteCntx.addOperator(wnd1);
+    this.rteCntx.addOperator(wnd2);
     // 2. connect join operator to both window operators
-    this.rteCntx.addRelationOperator(spec1.getOutputName(), join);
-    this.rteCntx.addRelationOperator(spec2.getOutputName(), join);
-    // 3. connect stream operator to the join operator
-    this.rteCntx.addRelationOperator(joinSpec.getOutputName(), istream);
-    // 4. connect re-partition operator to the stream operator
-    this.rteCntx.addTupleOperator(istrmSpec.getOutputName(), par);
-    // 5. set the system inputs
-    this.rteCntx.addSystemInput(spec1.getInputName());
-    this.rteCntx.addSystemInput(spec2.getInputName());
+    this.rteCntx.addOperator(join);
+    // 3. connect re-partition operator to the join operator
+    this.rteCntx.addOperator(par);
 
-    for (Iterator<Operator> iter = this.rteCntx.iterator(); iter.hasNext();) {
-      iter.next().init(config, context);
-    }
+    this.rteCntx.init(config, context);
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/41c4cd01/samza-sql-core/src/test/java/org/apache/samza/task/sql/UserCallbacksSqlTask.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/test/java/org/apache/samza/task/sql/UserCallbacksSqlTask.java b/samza-sql-core/src/test/java/org/apache/samza/task/sql/UserCallbacksSqlTask.java
new file mode 100644
index 0000000..96e96c3
--- /dev/null
+++ b/samza-sql-core/src/test/java/org/apache/samza/task/sql/UserCallbacksSqlTask.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.task.sql;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.sql.api.data.Relation;
+import org.apache.samza.sql.api.data.Stream;
+import org.apache.samza.sql.api.data.Tuple;
+import org.apache.samza.sql.api.operators.OperatorCallback;
+import org.apache.samza.sql.data.IncomingMessageTuple;
+import org.apache.samza.sql.operators.factory.SimpleRouter;
+import org.apache.samza.sql.operators.join.StreamStreamJoin;
+import org.apache.samza.sql.operators.partition.PartitionOp;
+import org.apache.samza.sql.operators.window.BoundedTimeWindow;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.task.InitableTask;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.StreamTask;
+import org.apache.samza.task.TaskContext;
+import org.apache.samza.task.TaskCoordinator;
+import org.apache.samza.task.WindowableTask;
+
+
+/***
+ * This example illustrates a SQL join operation that joins two streams together using the following operations:
+ * <ul>
+ * <li>a. the two streams are each processed by a window operator to convert to relations
+ * <li>b. a join operator is applied on the two relations to generate join results
+ * <li>c. an istream operator is applied on join output and convert the relation into a stream
+ * <li>d. a partition operator that re-partitions the output stream from istream and send the stream to system output
+ * </ul>
+ *
+ * This example also uses an implementation of <code>SqlMessageCollector</code> (@see <code>OperatorMessageCollector</code>)
+ * that uses <code>OperatorRouter</code> to automatically execute the whole paths that connects operators together.
+ */
+public class UserCallbacksSqlTask implements StreamTask, InitableTask, WindowableTask {
+
+  private SimpleRouter simpleRtr;
+
+  private final OperatorCallback wndCallback = new OperatorCallback() {
+
+    @Override
+    public Tuple beforeProcess(Tuple tuple, MessageCollector collector, TaskCoordinator coordinator) {
+      return filterWindowInput(tuple, collector, coordinator);
+    }
+
+    @Override
+    public Relation beforeProcess(Relation rel, MessageCollector collector, TaskCoordinator coordinator) {
+      return onRelationInput(rel, collector, coordinator);
+    }
+
+    @Override
+    public Relation afterProcess(Relation rel, MessageCollector collector, TaskCoordinator coordinator) {
+      return rel;
+    }
+
+    @Override
+    public Tuple afterProcess(Tuple tuple, MessageCollector collector, TaskCoordinator coordinator) {
+      return tuple;
+    }
+
+    private Tuple filterWindowInput(Tuple tuple, MessageCollector collector, TaskCoordinator coordinator) {
+      // filter all delete tuples before send
+      if (tuple.isDelete()) {
+        return null;
+      }
+      return tuple;
+    }
+
+    private Relation onRelationInput(Relation rel, MessageCollector collector,
+        TaskCoordinator coordinator) {
+      // check whether the input is a stream
+      if (!(rel instanceof Stream<?>)) {
+        throw new IllegalArgumentException("Wrong input entity");
+      }
+      return rel;
+    }
+  };
+
+  @Override
+  public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator)
+      throws Exception {
+    this.simpleRtr.process(new IncomingMessageTuple(envelope), collector, coordinator);
+  }
+
+  @Override
+  public void window(MessageCollector collector, TaskCoordinator coordinator) throws Exception {
+    this.simpleRtr.refresh(System.nanoTime(), collector, coordinator);
+  }
+
+  @Override
+  public void init(Config config, TaskContext context) throws Exception {
+    // create all operators directly via their constructors
+    // 1. create two window operators
+    BoundedTimeWindow wnd1 =
+        new BoundedTimeWindow("fixedWnd1", 10, "inputStream1", "fixedWndOutput1", this.wndCallback);
+    BoundedTimeWindow wnd2 =
+        new BoundedTimeWindow("fixedWnd2", 10, "inputStream2", "fixedWndOutput2", this.wndCallback);
+    // 2. create one join operator
+    @SuppressWarnings("serial")
+    List<String> inputRelations = new ArrayList<String>() {
+      {
+        add("fixedWndOutput1");
+        add("fixedWndOutput2");
+      }
+    };
+    @SuppressWarnings("serial")
+    List<String> joinKeys = new ArrayList<String>() {
+      {
+        add("key1");
+        add("key2");
+      }
+    };
+    StreamStreamJoin join = new StreamStreamJoin("joinOp", inputRelations, "joinOutput", joinKeys);
+    // 3. create a re-partition operator
+    PartitionOp par = new PartitionOp("parOp1", "joinOutput", "kafka", "parOutputStrm1", "joinKey", 50);
+
+    // Now, connecting the operators via the OperatorRouter
+    this.simpleRtr = new SimpleRouter();
+    // 1. set two system input operators (i.e. two window operators)
+    this.simpleRtr.addOperator(wnd1);
+    this.simpleRtr.addOperator(wnd2);
+    // 2. connect join operator to both window operators
+    this.simpleRtr.addOperator(join);
+    // 3. connect re-partition operator to the join operator
+    this.simpleRtr.addOperator(par);
+
+    this.simpleRtr.init(config, context);
+  }
+}


[2/2] samza git commit: SAMZA-552 update operator APIs

Posted by ni...@apache.org.
SAMZA-552 update operator APIs


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/41c4cd01
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/41c4cd01
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/41c4cd01

Branch: refs/heads/samza-sql
Commit: 41c4cd0124b21a49bf92cc73ac2a5acd1b21712f
Parents: b47e47f
Author: Yi Pan (Data Infrastructure) <yi...@linkedin.com>
Authored: Fri May 15 19:02:34 2015 -0700
Committer: Yi Pan (Data Infrastructure) <yi...@linkedin.com>
Committed: Fri May 15 19:02:34 2015 -0700

----------------------------------------------------------------------
 .../apache/samza/sql/api/data/EntityName.java   |  33 ++--
 .../org/apache/samza/sql/api/data/Relation.java |  15 +-
 .../org/apache/samza/sql/api/data/Stream.java   |  40 +++++
 .../org/apache/samza/sql/api/data/Table.java    |  38 +++++
 .../org/apache/samza/sql/api/data/Tuple.java    |  19 ++-
 .../samza/sql/api/operators/Operator.java       |  58 +++++--
 .../sql/api/operators/OperatorCallback.java     |  70 +++++++++
 .../samza/sql/api/operators/OperatorRouter.java |  54 +++++++
 .../samza/sql/api/operators/OperatorSpec.java   |  58 +++++++
 .../sql/api/operators/RelationOperator.java     |  51 -------
 .../samza/sql/api/operators/SimpleOperator.java |  34 +++++
 .../sql/api/operators/SqlOperatorFactory.java   |  24 +--
 .../samza/sql/api/operators/TupleOperator.java  |  47 ------
 .../sql/api/operators/spec/OperatorSpec.java    |  64 --------
 .../samza/sql/api/router/OperatorRouter.java    | 126 ----------------
 .../samza/sql/data/IncomingMessageTuple.java    |  27 +++-
 .../operators/factory/NoopOperatorCallback.java |  50 ++++++
 .../sql/operators/factory/SimpleOperator.java   |  50 ------
 .../factory/SimpleOperatorFactoryImpl.java      |  34 ++---
 .../operators/factory/SimpleOperatorImpl.java   | 136 +++++++++++++++++
 .../operators/factory/SimpleOperatorSpec.java   |  12 +-
 .../sql/operators/factory/SimpleRouter.java     | 136 +++++++++++++++++
 .../sql/operators/join/StreamStreamJoin.java    | 117 ++++++++++++++
 .../operators/join/StreamStreamJoinSpec.java    |  38 +++++
 .../sql/operators/partition/PartitionOp.java    |  55 +++++--
 .../sql/operators/partition/PartitionSpec.java  |  12 +-
 .../samza/sql/operators/relation/Join.java      | 139 -----------------
 .../samza/sql/operators/relation/JoinSpec.java  |  60 --------
 .../sql/operators/stream/InsertStream.java      |  98 ------------
 .../sql/operators/stream/InsertStreamSpec.java  |  42 ------
 .../sql/operators/window/BoundedTimeWindow.java |  68 ++++++---
 .../samza/sql/operators/window/WindowSpec.java  |   6 +-
 .../apache/samza/sql/router/SimpleRouter.java   | 133 ----------------
 .../sql/window/storage/OrderedStoreKey.java     |  26 ++++
 .../org/apache/samza/system/sql/LongOffset.java |  66 ++++++++
 .../org/apache/samza/system/sql/Offset.java     |  27 ++++
 .../task/sql/OperatorMessageCollector.java      |  80 ----------
 .../samza/task/sql/RouterMessageCollector.java  |  56 +++++++
 .../samza/task/sql/SimpleMessageCollector.java  | 114 ++++++++++++++
 .../samza/task/sql/SqlMessageCollector.java     |  64 --------
 .../samza/task/sql/StoreMessageCollector.java   |  80 ----------
 .../samza/task/sql/RandomOperatorTask.java      | 151 -------------------
 .../task/sql/RandomWindowOperatorTask.java      |  96 ++++++++++++
 .../apache/samza/task/sql/StreamSqlTask.java    |  97 +++---------
 .../samza/task/sql/UserCallbacksSqlTask.java    | 150 ++++++++++++++++++
 45 files changed, 1550 insertions(+), 1401 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/41c4cd01/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/EntityName.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/EntityName.java b/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/EntityName.java
index 127a677..80ba455 100644
--- a/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/EntityName.java
+++ b/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/EntityName.java
@@ -24,15 +24,15 @@ import java.util.Map;
 
 
 /**
- * This class defines the name scheme for the collective data entities in Samza Stream SQL, i.e. relations and streams.
+ * This class defines the name scheme for the collective data entities in Samza Stream SQL, i.e. tables and streams.
  */
 public class EntityName {
   /**
-   * <code>EntityType</code> defines the types of the entity names
+   * {@code EntityType} defines the types of the entity names
    *
    */
   private enum EntityType {
-    RELATION,
+    TABLE,
     STREAM
   };
 
@@ -44,16 +44,15 @@ public class EntityName {
   /**
    * Formatted name of the entity.
    *
-   * <p>This formatted name of the entity should be unique identifier for the corresponding relation/stream in the system.
+   * <p>This formatted name of the entity should be unique identifier for the corresponding table/stream in the system.
    * e.g. for a Kafka system stream named "mystream", the formatted name should be "kafka:mystream".
    */
   private final String name;
 
-  //TODO: we may want to replace the map with Guava cache to allow GC
   /**
-   * Static map of already allocated relation names
+   * Static map of already allocated table names
    */
-  private static Map<String, EntityName> relations = new HashMap<String, EntityName>();
+  private static Map<String, EntityName> tables = new HashMap<String, EntityName>();
 
   /**
    * Static map of already allocated stream names
@@ -86,18 +85,18 @@ public class EntityName {
   }
 
   /**
-   * Check to see whether this entity name is for a relation
+   * Check to see whether this entity name is for a table
    *
-   * @return true if the entity type is <code>EntityType.RELATION</code>; false otherwise
+   * @return true if the entity type is {@code EntityType.TABLE}; false otherwise
    */
-  public boolean isRelation() {
-    return this.type.equals(EntityType.RELATION);
+  public boolean isTable() {
+    return this.type.equals(EntityType.TABLE);
   }
 
   /**
    * Check to see whether this entity name is for a stream
    *
-   * @return true if the entity type is <code>EntityType.STREAM</code>; false otherwise
+   * @return true if the entity type is {@code EntityType.STREAM}; false otherwise
    */
   public boolean isStream() {
     return this.type.equals(EntityType.STREAM);
@@ -113,16 +112,16 @@ public class EntityName {
   }
 
   /**
-   * Static method to get the instance of <code>EntityName</code> with type <code>EntityType.RELATION</code>
+   * Static method to get the instance of {@code EntityName} with type {@code EntityType.TABLE}
    *
    * @param name The formatted entity name of the relation
    * @return A <code>EntityName</code> for a relation
    */
-  public static EntityName getRelationName(String name) {
-    if (relations.get(name) == null) {
-      relations.put(name, new EntityName(EntityType.RELATION, name));
+  public static EntityName getTableName(String name) {
+    if (tables.get(name) == null) {
+      tables.put(name, new EntityName(EntityType.TABLE, name));
     }
-    return relations.get(name);
+    return tables.get(name);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/samza/blob/41c4cd01/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Relation.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Relation.java b/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Relation.java
index 90b8026..72816a3 100644
--- a/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Relation.java
+++ b/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Relation.java
@@ -23,23 +23,16 @@ import org.apache.samza.storage.kv.KeyValueStore;
 
 
 /**
- * This class defines the general interface of <code>Relation</code>, which is defined as a map of <code>Tuple</code>.
+ * This class defines the general interface of {@code Relation}, which is defined as a map of {@link org.apache.samza.sql.api.data.Tuple}.
  *
- * <p>The interface is defined as an extension to <code>KeyValueStore&lt;Object, Tuple&gt;</code>.
+ * <p>The interface is defined as an extension to {@link org.apache.samza.storage.kv.KeyValueStore}.
  *
  */
 
-public interface Relation extends KeyValueStore<Object, Tuple> {
+public interface Relation<K> extends KeyValueStore<K, Tuple> {
 
   /**
-   * Get the primary key field name for this table
-   *
-   * @return The name of the primary key field
-   */
-  String getPrimaryKey();
-
-  /**
-   * Get the name of the relation created by CREATE TABLE
+   * Get the name of the relation
    *
    * @return The relation name
    */

http://git-wip-us.apache.org/repos/asf/samza/blob/41c4cd01/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Stream.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Stream.java b/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Stream.java
new file mode 100644
index 0000000..931705e
--- /dev/null
+++ b/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Stream.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.api.data;
+
+import java.util.List;
+
+
+/**
+ * This interface defines an ordered {@link org.apache.samza.sql.api.data.Relation}, which has an ordered key.
+ *
+ * <p> This is to define a stream created by CREATE STREAM statement
+ *
+ * @param <K> The ordered key for the {@code Stream} class
+ */
+public interface Stream<K extends Comparable<?>> extends Relation<K> {
+  /**
+   * Get the list of field names used as the order keys for this stream
+   *
+   * @return The list of field names used to construct the order key for the stream
+   */
+  List<String> getOrderFields();
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/41c4cd01/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Table.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Table.java b/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Table.java
new file mode 100644
index 0000000..7b4d984
--- /dev/null
+++ b/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Table.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.api.data;
+
+/**
+ * This interface defines a non-ordered {@link org.apache.samza.sql.api.data.Relation}, which has a unique primary key
+ *
+ * <p> This is to define a table created by CREATE TABLE statement
+ *
+ * @param <K> The primary key for the {@code Table} class
+ */
+public interface Table<K> extends Relation<K> {
+
+  /**
+   * Get the primary key field name for this table
+   *
+   * @return The name of the primary key field
+   */
+  String getPrimaryKeyName();
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/41c4cd01/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Tuple.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Tuple.java b/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Tuple.java
index bc8efcf..bea922b 100644
--- a/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Tuple.java
+++ b/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Tuple.java
@@ -19,6 +19,9 @@
 
 package org.apache.samza.sql.api.data;
 
+import org.apache.samza.system.sql.Offset;
+
+
 /**
  * This class defines the generic interface of <code>Tuple</code>, which is a entry from the incoming stream, or one row in a <code>Relation</code>.
  *
@@ -53,6 +56,20 @@ public interface Tuple {
    *
    * @return The stream name which this tuple belongs to
    */
-  EntityName getStreamName();
+  EntityName getEntityName();
+
+  /**
+   * Get the message creation timestamp of the tuple.
+   *
+   * @return The tuple's creation timestamp in nanoseconds.
+   */
+  long getCreateTimeNano();
+
+  /**
+   * Get the offset of the tuple in the stream. This should be used to uniquely identify a tuple in a stream.
+   *
+   * @return The offset of the tuple in the stream.
+   */
+  Offset getOffset();
 
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/41c4cd01/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operator.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operator.java b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operator.java
index 0169f2d..d6f6b57 100644
--- a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operator.java
+++ b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operator.java
@@ -19,25 +19,55 @@
 
 package org.apache.samza.sql.api.operators;
 
-import org.apache.samza.sql.api.operators.spec.OperatorSpec;
-import org.apache.samza.task.InitableTask;
-import org.apache.samza.task.WindowableTask;
+import org.apache.samza.config.Config;
+import org.apache.samza.sql.api.data.Relation;
+import org.apache.samza.sql.api.data.Tuple;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskContext;
+import org.apache.samza.task.TaskCoordinator;
 
 
-/**
- * This class defines the common interface for operator classes, no matter what input data are.
- *
- * <p> It extends the <code>InitableTask</code> and <code>WindowableTask</code> to reuse the interface methods
- * <code>init</code> and <code>window</code> for initialization and timeout operations
- *
- */
-public interface Operator extends InitableTask, WindowableTask {
+public interface Operator {
+  /**
+   * Method to initialize the operator
+   *
+   * @param config The configuration object
+   * @param context The task context
+   * @throws Exception Throws Exception if failed to initialize the operator
+   */
+  void init(Config config, TaskContext context) throws Exception;
+
+  /**
+   * Method to perform a relational logic on the input relation
+   *
+   * <p> The actual implementation of relational logic is performed by the implementation of this method.
+   *
+   * @param deltaRelation The changed rows in the input relation, including the inserts/deletes/updates
+   * @param collector The {@link org.apache.samza.task.MessageCollector} that accepts outputs from the operator
+   * @param coordinator The {@link org.apache.samza.task.TaskCoordinator} in the context
+   * @throws Exception Throws exception if failed
+   */
+  void process(Relation deltaRelation, MessageCollector collector, TaskCoordinator coordinator)
+      throws Exception;
+
+  /**
+   * Method to process on an input tuple.
+   *
+   * @param tuple The input tuple, which has the incoming message from a stream
+   * @param collector The {@link org.apache.samza.task.MessageCollector} that accepts outputs from the operator
+   * @param coordinator The {@link org.apache.samza.task.TaskCoordinator} in the context
+   * @throws Exception Throws exception if failed
+   */
+  void process(Tuple tuple, MessageCollector collector, TaskCoordinator coordinator) throws Exception;
 
   /**
-   * Method to the specification of this <code>Operator</code>
+   * Method to refresh the result when a timer expires
    *
-   * @return The <code>OperatorSpec</code> object that defines the configuration/parameters of the operator
+   * @param timeNano The current system time in nanoseconds
+   * @param collector The {@link org.apache.samza.task.MessageCollector} that accepts outputs from the operator
+   * @param coordinator The {@link org.apache.samza.task.TaskCoordinator} in the context
+   * @throws Exception Throws exception if failed
    */
-  OperatorSpec getSpec();
+  void refresh(long timeNano, MessageCollector collector, TaskCoordinator coordinator) throws Exception;
 
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/41c4cd01/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorCallback.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorCallback.java b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorCallback.java
new file mode 100644
index 0000000..fb2aa89
--- /dev/null
+++ b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorCallback.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.sql.api.operators;
+
+import org.apache.samza.sql.api.data.Relation;
+import org.apache.samza.sql.api.data.Tuple;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskCoordinator;
+
+
+/**
+ * Defines the callback functions that allow customized logic to be invoked before an operator processes its input and before it sends its result
+ */
+public interface OperatorCallback {
+  /**
+   * Method to be invoked before the operator actually processes the input tuple
+   *
+   * @param tuple The incoming tuple
+   * @param collector The {@link org.apache.samza.task.MessageCollector} in context
+   * @param coordinator The {@link org.apache.samza.task.TaskCoordinator} in context
+   * @return The tuple to be processed; return {@code null} if there is nothing to be processed
+   */
+  Tuple beforeProcess(Tuple tuple, MessageCollector collector, TaskCoordinator coordinator);
+
+  /**
+   * Method to be invoked before the operator actually processes the input relation
+   *
+   * @param rel The input relation
+   * @param collector The {@link org.apache.samza.task.MessageCollector} in context
+   * @param coordinator The {@link org.apache.samza.task.TaskCoordinator} in context
+   * @return The relation to be processed; return {@code null} if there is nothing to be processed
+   */
+  Relation beforeProcess(Relation rel, MessageCollector collector, TaskCoordinator coordinator);
+
+  /**
+   * Method to be invoked before the operator's output tuple is sent
+   *
+   * @param tuple The output tuple
+   * @param collector The {@link org.apache.samza.task.MessageCollector} in context
+   * @param coordinator The {@link org.apache.samza.task.TaskCoordinator} in context
+   * @return The tuple to be sent; return {@code null} if there is nothing to be sent
+   */
+  Tuple afterProcess(Tuple tuple, MessageCollector collector, TaskCoordinator coordinator);
+
+  /**
+   * Method to be invoked before the operator's output relation is sent
+   *
+   * @param rel The output relation
+   * @param collector The {@link org.apache.samza.task.MessageCollector} in context
+   * @param coordinator The {@link org.apache.samza.task.TaskCoordinator} in context
+   * @return The relation to be sent; return {@code null} if there is nothing to be sent
+   */
+  Relation afterProcess(Relation rel, MessageCollector collector, TaskCoordinator coordinator);
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/41c4cd01/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorRouter.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorRouter.java b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorRouter.java
new file mode 100644
index 0000000..0759638
--- /dev/null
+++ b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorRouter.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.api.operators;
+
+import java.util.List;
+
+import org.apache.samza.sql.api.data.EntityName;
+
+
+/**
+ * This interface class defines interface methods to connect {@link org.apache.samza.sql.api.operators.SimpleOperator}s together into a composite operator.
+ *
+ * <p>The {@code OperatorRouter} allows the user to attach operators to a {@link org.apache.samza.sql.api.data.Table} or
+ * a {@link org.apache.samza.sql.api.data.Stream} entity, if the corresponding table/stream is included as inputs to the operator.
+ * Each operator then executes its own logic and determines which table/stream to emit the output to. Through the {@code OperatorRouter},
+ * the next operators attached to the corresponding output entities (i.e. table/streams) can then be invoked to continue the
+ * stream process task.
+ */
+public interface OperatorRouter extends Operator {
+
+  /**
+   * This method adds a {@link org.apache.samza.sql.api.operators.SimpleOperator} to the {@code OperatorRouter}.
+   *
+   * @param nextOp The {@link org.apache.samza.sql.api.operators.SimpleOperator} to be added
+   * @throws Exception Throws exception if failed
+   */
+  void addOperator(SimpleOperator nextOp) throws Exception;
+
+  /**
+   * This method gets the list of {@link org.apache.samza.sql.api.operators.SimpleOperator}s attached to an output entity (of any type)
+   *
+   * @param output The identifier of the output entity
+   * @return The list of {@link org.apache.samza.sql.api.operators.SimpleOperator} taking {@code output} as input table/stream
+   */
+  List<SimpleOperator> getNextOperators(EntityName output);
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/41c4cd01/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSpec.java b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSpec.java
new file mode 100644
index 0000000..4d670fd
--- /dev/null
+++ b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSpec.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.api.operators;
+
+import java.util.List;
+
+import org.apache.samza.sql.api.data.EntityName;
+
+
+/**
+ * This interface defines a generic specification for all {@link org.apache.samza.sql.api.operators.SimpleOperator}s.
+ *
+ * <p>The purpose of this class is to encapsulate all the details of configuration/parameters of a specific implementation of an operator.
+ *
+ * <p>The generic methods of an operator specification provide access to the unique ID, the list of entity names (i.e. the names of
+ * {@link org.apache.samza.sql.api.data.Table}s or {@link org.apache.samza.sql.api.data.Stream}s) of the input variables, and the list of entity names of the output variables.
+ *
+ */
+public interface OperatorSpec {
+  /**
+   * Interface method that returns the unique ID of the operator in a task
+   *
+   * @return The unique ID of the {@link org.apache.samza.sql.api.operators.SimpleOperator} object
+   */
+  String getId();
+
+  /**
+   * Access method to the list of entity names of input variables.
+   *
+   * @return A list of entity names of the inputs
+   */
+  List<EntityName> getInputNames();
+
+  /**
+   * Access method to the list of entity names of output variables.
+   *
+   * @return A list of entity names of the outputs
+   *
+   */
+  List<EntityName> getOutputNames();
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/41c4cd01/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/RelationOperator.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/RelationOperator.java b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/RelationOperator.java
deleted file mode 100644
index faa0a32..0000000
--- a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/RelationOperator.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.sql.api.operators;
-
-import org.apache.samza.sql.api.data.Relation;
-import org.apache.samza.task.sql.SqlMessageCollector;
-
-
-/**
- * This class defines the interface <code>RelationOperator</code>.
- *
- * <p>All operators implementing <code>RelationOperator</code> will take a <code>Relation</code> object as input.
- * The SQL operators that need to implement this interface include:
- * <ul>
- * <li>All relation algebra operators, such as: join, select, where, group-by, having, limit, order-by, etc.
- * <li>All relation-to-stream operators, which converts a relation to a stream
- * </ul>
- *
- */
-public interface RelationOperator extends Operator {
-
-  /**
-   * Method to perform a relational algebra on a set of relations, or a relation-to-stream function
-   *
-   * <p> The actual implementation of relational logic is performed by the implementation of this method.
-   * The <code>collector</code> object is used by the operator to send their output to
-   *
-   * @param deltaRelation The changed rows in the input relation, including the inserts/deletes/updates
-   * @param collector The <code>SqlMessageCollector</code> object that accepts outputs from the operator
-   * @throws Exception Throws exception if failed
-   */
-  void process(Relation deltaRelation, SqlMessageCollector collector) throws Exception;
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/41c4cd01/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SimpleOperator.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SimpleOperator.java b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SimpleOperator.java
new file mode 100644
index 0000000..c49a822
--- /dev/null
+++ b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SimpleOperator.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.api.operators;
+
+
+
+/**
+ * The interface for a {@code SimpleOperator} that implements a simple primitive relational logic operation
+ */
+public interface SimpleOperator extends Operator {
+  /**
+   * Method to get the specification of this {@code SimpleOperator}
+   *
+   * @return The {@link org.apache.samza.sql.api.operators.OperatorSpec} object that defines the configuration/parameters of the operator
+   */
+  OperatorSpec getSpec();
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/41c4cd01/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SqlOperatorFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SqlOperatorFactory.java b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SqlOperatorFactory.java
index 67671b9..6f8d93b 100644
--- a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SqlOperatorFactory.java
+++ b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SqlOperatorFactory.java
@@ -19,33 +19,19 @@
 
 package org.apache.samza.sql.api.operators;
 
-import org.apache.samza.sql.api.operators.spec.OperatorSpec;
 
 
 /**
- * This class defines the interface of SQL operator factory, which creates the following operators:
- * <ul>
- * <li><code>RelationOperator</code> that takes <code>Relation</code> as input variables
- * <li><code>TupleOperator</code> that takes <code>Tuple</code> as input variables
- * </ul>
- *
+ * This class defines the interface of the SQL operator factory, which creates {@link org.apache.samza.sql.api.operators.SimpleOperator}s.
  */
 public interface SqlOperatorFactory {
 
   /**
-   * Interface method to create/get the <code>RelationOperator</code> object
-   *
-   * @param spec The specification of the <code>RelationOperator</code> object
-   * @return The relation operator object
-   */
-  RelationOperator getRelationOperator(OperatorSpec spec);
-
-  /**
-   * Interface method to create/get the <code>TupleOperator</code> object
+   * Interface method to create/get the {@link org.apache.samza.sql.api.operators.SimpleOperator} object
    *
-   * @param spec The specification of the <code>TupleOperator</code> object
-   * @return The tuple operator object
+   * @param spec The specification of the {@link org.apache.samza.sql.api.operators.SimpleOperator} object
+   * @return The {@link org.apache.samza.sql.api.operators.SimpleOperator} object
    */
-  TupleOperator getTupleOperator(OperatorSpec spec);
+  SimpleOperator getOperator(OperatorSpec spec);
 
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/41c4cd01/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/TupleOperator.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/TupleOperator.java b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/TupleOperator.java
deleted file mode 100644
index ac4654e..0000000
--- a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/TupleOperator.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.sql.api.operators;
-
-import org.apache.samza.sql.api.data.Tuple;
-import org.apache.samza.task.sql.SqlMessageCollector;
-
-
-/**
- * This class defines the interface class that processes incoming tuples from input stream(s).
- *
- * <p>All operators implementing <code>TupleOperator</code> will take a <code>Tuple</code> object as input.
- * The SQL operators that need to implement this interface include:
- * <ul>
- * <li>All stream-to-relation operators, such as: window operators.
- * <li>All stream-to-stream operators, such as: re-partition, union of two streams
- * </ul>
- *
- */
-public interface TupleOperator extends Operator {
-  /**
-   * Interface method to process on an input tuple.
-   *
-   * @param tuple The input tuple, which has the incoming message from a stream
-   * @param collector The <code>SqlMessageCollector</code> object that accepts outputs from the operator
-   * @throws Exception Throws exception if failed
-   */
-  void process(Tuple tuple, SqlMessageCollector collector) throws Exception;
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/41c4cd01/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/spec/OperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/spec/OperatorSpec.java b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/spec/OperatorSpec.java
deleted file mode 100644
index 96385e2..0000000
--- a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/spec/OperatorSpec.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.sql.api.operators.spec;
-
-import java.util.List;
-
-import org.apache.samza.sql.api.data.EntityName;
-
-
-/**
- * This class defines a generic specification interface class for all operators.
- *
- * <p>The purpose of this class is to encapsulate all the details of configuration/parameters of a specific implementation of an operator.
- *
- * <p>The generic methods for an operator specification is to provide methods to get the unique ID, the list of entity names (i.e. stream name
- * in <code>Tuple</code> or <code>Relation</code> name) of input variables , and the list of entity names of the output variables.
- *
- */
-public interface OperatorSpec {
-  /**
-   * Interface method that returns the unique ID of the operator in a task
-   *
-   * @return The unique ID of the <code>Operator</code> object
-   */
-  String getId();
-
-  /**
-   * Access method to the list of entity names of input variables.
-   *
-   * <p>The input entity names are either stream names if the operator is a <code>TupleOperator</code>;
-   * or <code>Relation</code> names if the operator is a <code>RelationOperator</code>
-   *
-   * @return A list of entity names of the inputs
-   */
-  List<EntityName> getInputNames();
-
-  /**
-   * Access method to the list of entity name of the output variable
-   *
-   * <p>The output entity name is either a stream name if the operator generates tuples as an output stream;
-   * or <code>Relation</code> names if the operator generates a <code>Relation</code> as output.
-   *
-   * @return The entity name of the output
-   *
-   */
-  List<EntityName> getOutputNames();
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/41c4cd01/samza-sql-core/src/main/java/org/apache/samza/sql/api/router/OperatorRouter.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/api/router/OperatorRouter.java b/samza-sql-core/src/main/java/org/apache/samza/sql/api/router/OperatorRouter.java
deleted file mode 100644
index 2455a62..0000000
--- a/samza-sql-core/src/main/java/org/apache/samza/sql/api/router/OperatorRouter.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.sql.api.router;
-
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.samza.sql.api.data.EntityName;
-import org.apache.samza.sql.api.operators.Operator;
-import org.apache.samza.sql.api.operators.RelationOperator;
-import org.apache.samza.sql.api.operators.TupleOperator;
-
-
-/**
- * This interface class defines interface methods to connect operators together.
- *
- * <p>The <code>OperatorRouter</code> allows the user to attach operators to a relation or a stream entity,
- * if the corresponding relation/stream is included as inputs to the operator. Each operator then executes its own logic
- * and determines which relation/stream to emit the output to. Through the <code>OperatorRouter</code>, the next
- * operators attached to the corresponding output entities (i.e. relations/streams) can then be invoked to continue the
- * stream process task.
- *
- * <p>The <code>OperatorRouter</code> also allows the user to set the system input entities (i.e. relations/streams)
- * that are fed into the operators by the system outside the <code>OperatorRouter</code>, not generated by some
- * operators in the <code>OperatorRouter</code>.
- *
- * <p>The methods included in this interface class allow a user to
- * <ul>
- * <li>i)   add operators to an <code>EntityName</code>
- * <li>ii)  get the next operators attached to an <code>EntityName</code>
- * <li>iii) add and get the system input <code>EntityName</code>s
- * <li>iv)  iterate through each and every operator connected via <code>OperatorRouter</code>
- * </ul>
- *
- */
-public interface OperatorRouter {
-
-  /**
-   * This method adds a <code>TupleOperator</code> as one of the input operators.
-   *
-   * @param stream The output stream entity name
-   * @param nextOp The <code>TupleOperator</code> that takes the tuples in the <code>stream</code> as an input.
-   * @throws Exception Throws exception if failed
-   */
-  void addTupleOperator(EntityName stream, TupleOperator nextOp) throws Exception;
-
-  /**
-   * This method adds a <code>RelationOperator</code> as one of the input operators
-
-   * @param relation The input relation entity name
-   * @param nextOp The <code>RelationOperator</code> that takes the <code>relation</code> as an input
-   * @throws Exception Throws exception if failed
-   */
-  void addRelationOperator(EntityName relation, RelationOperator nextOp) throws Exception;
-
-  /**
-   * This method gets the list of <code>RelationOperator</code>s attached to the <code>relation</code>
-   *
-   * @param relation The identifier of the relation entity
-   * @return The list of <code>RelationOperator</code> taking <code>relation</code> as an input variable
-   */
-  List<RelationOperator> getRelationOperators(EntityName relation);
-
-  /**
-   * This method gets the list of <code>TupleOperator</code>s attached to the <code>stream</code>
-   *
-   * @param stream The identifier of the stream entity
-   * @return The list of <code>TupleOperator</code> taking <code>stream</code> as an input variable
-   */
-  List<TupleOperator> getTupleOperators(EntityName stream);
-
-  /**
-   * This method gets the list of <code>Operator</code>s attached to an output entity (of any type)
-   *
-   * @param output The identifier of the output entity
-   * @return The list of <code>Operator</code> taking <code>output</code> as input variables
-   */
-  List<Operator> getNextOperators(EntityName output);
-
-  /**
-   * This method provides an iterator to go through all operators connected via <code>OperatorRouter</code>
-   *
-   * @return An <code>Iterator</code> for all operators connected via <code>OperatorRouter</code>
-   */
-  Iterator<Operator> iterator();
-
-  /**
-   * This method checks to see whether there is any <code>Operator</code> attached to the entity <code>output</code>
-   *
-   * @param output The output entity name
-   * @return True if there is some operator attached to the <code>output</code>; false otherwise
-   */
-  boolean hasNextOperators(EntityName output);
-
-  /**
-   * This method adds an entity as the system input
-   *
-   * @param input The entity name for the system input
-   */
-  void addSystemInput(EntityName input);
-
-  /**
-   * This method returns the list of entities as system inputs
-   *
-   * @return The list of <code>EntityName</code>s as system inputs
-   */
-  List<EntityName> getSystemInputs();
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/41c4cd01/samza-sql-core/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java b/samza-sql-core/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java
index f868e5c..72a59f2 100644
--- a/samza-sql-core/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java
+++ b/samza-sql-core/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java
@@ -22,10 +22,12 @@ import org.apache.samza.sql.api.data.Data;
 import org.apache.samza.sql.api.data.EntityName;
 import org.apache.samza.sql.api.data.Tuple;
 import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.sql.LongOffset;
+import org.apache.samza.system.sql.Offset;
 
 
 /**
- * This class implements a <code>Tuple</code> class that encapsulates <code>IncomingMessageEnvelope</code> from the system
+ * This class implements a {@link org.apache.samza.sql.api.data.Tuple} that encapsulates an {@link org.apache.samza.system.IncomingMessageEnvelope} from the system
  *
  */
 public class IncomingMessageTuple implements Tuple {
@@ -40,7 +42,12 @@ public class IncomingMessageTuple implements Tuple {
   private final EntityName strmEntity;
 
   /**
-   * Ctor to create a <code>IncomingMessageTuple</code> from <code>IncomingMessageEnvelope</code>
+   * The receive time of this incoming message
+   */
+  private final long recvTimeNano;
+
+  /**
+   * Ctor to create an {@code IncomingMessageTuple} from an {@link org.apache.samza.system.IncomingMessageEnvelope}
    *
    * @param imsg The incoming system message
    */
@@ -49,9 +56,9 @@ public class IncomingMessageTuple implements Tuple {
     this.strmEntity =
         EntityName.getStreamName(String.format("%s:%s", imsg.getSystemStreamPartition().getSystem(), imsg
             .getSystemStreamPartition().getStream()));
+    this.recvTimeNano = System.nanoTime();
   }
 
-  // TODO: the return type should be changed to the generic data type
   @Override
   public Data getMessage() {
     return (Data) this.imsg.getMessage();
@@ -68,8 +75,20 @@ public class IncomingMessageTuple implements Tuple {
   }
 
   @Override
-  public EntityName getStreamName() {
+  public EntityName getEntityName() {
     return this.strmEntity;
   }
 
+  @Override
+  public long getCreateTimeNano() {
+    // TODO: this is wrong and is kept only as a placeholder. It should be replaced by the message publish time once the publish timestamp is available in the message metadata
+    return this.recvTimeNano;
+  }
+
+  @Override
+  public Offset getOffset() {
+    // TODO: need to add offset factory to generate different types of offset. This is just a placeholder,
+    // assuming incoming message carries long value as offset (i.e. Kafka case)
+    return new LongOffset(this.imsg.getOffset());
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/41c4cd01/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/NoopOperatorCallback.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/NoopOperatorCallback.java b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/NoopOperatorCallback.java
new file mode 100644
index 0000000..c3d2266
--- /dev/null
+++ b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/NoopOperatorCallback.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.sql.operators.factory;
+
+import org.apache.samza.sql.api.data.Relation;
+import org.apache.samza.sql.api.data.Tuple;
+import org.apache.samza.sql.api.operators.OperatorCallback;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskCoordinator;
+
+
+public final class NoopOperatorCallback implements OperatorCallback {
+
+  @Override
+  public Tuple beforeProcess(Tuple tuple, MessageCollector collector, TaskCoordinator coordinator) {
+    return tuple;
+  }
+
+  @Override
+  public Relation beforeProcess(Relation rel, MessageCollector collector, TaskCoordinator coordinator) {
+    return rel;
+  }
+
+  @Override
+  public Tuple afterProcess(Tuple tuple, MessageCollector collector, TaskCoordinator coordinator) {
+    return tuple;
+  }
+
+  @Override
+  public Relation afterProcess(Relation rel, MessageCollector collector, TaskCoordinator coordinator) {
+    return rel;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/41c4cd01/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperator.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperator.java b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperator.java
deleted file mode 100644
index c634159..0000000
--- a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperator.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.sql.operators.factory;
-
-import org.apache.samza.sql.api.operators.Operator;
-import org.apache.samza.sql.api.operators.spec.OperatorSpec;
-
-
-/**
- * An abstract class that encapsulate the basic information and methods that all operator classes should implement.
- *
- */
-public abstract class SimpleOperator implements Operator {
-  /**
-   * The specification of this operator
-   */
-  private final OperatorSpec spec;
-
-  /**
-   * Ctor of <code>SimpleOperator</code> class
-   *
-   * @param spec The specification of this operator
-   */
-  public SimpleOperator(OperatorSpec spec) {
-    this.spec = spec;
-  }
-
-  @Override
-  public OperatorSpec getSpec() {
-    return this.spec;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/41c4cd01/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorFactoryImpl.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorFactoryImpl.java b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorFactoryImpl.java
index 916b166..cbc84d0 100644
--- a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorFactoryImpl.java
+++ b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorFactoryImpl.java
@@ -19,16 +19,13 @@
 
 package org.apache.samza.sql.operators.factory;
 
-import org.apache.samza.sql.api.operators.RelationOperator;
+import org.apache.samza.sql.api.operators.OperatorSpec;
+import org.apache.samza.sql.api.operators.SimpleOperator;
 import org.apache.samza.sql.api.operators.SqlOperatorFactory;
-import org.apache.samza.sql.api.operators.TupleOperator;
-import org.apache.samza.sql.api.operators.spec.OperatorSpec;
+import org.apache.samza.sql.operators.join.StreamStreamJoin;
+import org.apache.samza.sql.operators.join.StreamStreamJoinSpec;
 import org.apache.samza.sql.operators.partition.PartitionOp;
 import org.apache.samza.sql.operators.partition.PartitionSpec;
-import org.apache.samza.sql.operators.relation.Join;
-import org.apache.samza.sql.operators.relation.JoinSpec;
-import org.apache.samza.sql.operators.stream.InsertStream;
-import org.apache.samza.sql.operators.stream.InsertStreamSpec;
 import org.apache.samza.sql.operators.window.BoundedTimeWindow;
 import org.apache.samza.sql.operators.window.WindowSpec;
 
@@ -41,23 +38,14 @@ import org.apache.samza.sql.operators.window.WindowSpec;
 public class SimpleOperatorFactoryImpl implements SqlOperatorFactory {
 
   @Override
-  public RelationOperator getRelationOperator(OperatorSpec spec) {
-    if (spec instanceof JoinSpec) {
-      return new Join((JoinSpec) spec);
-    } else if (spec instanceof InsertStreamSpec) {
-      return new InsertStream((InsertStreamSpec) spec);
-    }
-    throw new UnsupportedOperationException("Unsupported operator specified: " + spec.getClass().getCanonicalName());
-  }
-
-  @Override
-  public TupleOperator getTupleOperator(OperatorSpec spec) {
-    if (spec instanceof WindowSpec) {
-      return new BoundedTimeWindow((WindowSpec) spec);
-    } else if (spec instanceof PartitionSpec) {
+  public SimpleOperator getOperator(OperatorSpec spec) {
+    if (spec instanceof PartitionSpec) {
       return new PartitionOp((PartitionSpec) spec);
+    } else if (spec instanceof StreamStreamJoinSpec) {
+      return new StreamStreamJoin((StreamStreamJoinSpec) spec);
+    } else if (spec instanceof WindowSpec) {
+      return new BoundedTimeWindow((WindowSpec) spec);
     }
-    throw new UnsupportedOperationException("Unsupported operator specified" + spec.getClass().getCanonicalName());
+    throw new UnsupportedOperationException("Unsupported operator specified: " + spec.getClass().getCanonicalName());
   }
-
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/41c4cd01/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorImpl.java b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorImpl.java
new file mode 100644
index 0000000..e66451f
--- /dev/null
+++ b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorImpl.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.operators.factory;
+
+import org.apache.samza.sql.api.data.Relation;
+import org.apache.samza.sql.api.data.Tuple;
+import org.apache.samza.sql.api.operators.OperatorCallback;
+import org.apache.samza.sql.api.operators.OperatorSpec;
+import org.apache.samza.sql.api.operators.SimpleOperator;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskCoordinator;
+import org.apache.samza.task.sql.SimpleMessageCollector;
+
+
+/**
+ * An abstract class that encapsulates the basic information and methods that all operator classes should implement.
+ * It implements the interface {@link org.apache.samza.sql.api.operators.SimpleOperator}
+ *
+ */
+public abstract class SimpleOperatorImpl implements SimpleOperator {
+  /**
+   * The specification of this operator
+   */
+  private final OperatorSpec spec;
+
+  /**
+   * The callback function
+   */
+  private final OperatorCallback callback;
+
+  /**
+   * Ctor of {@code SimpleOperatorImpl} class
+   *
+   * @param spec The specification of this operator
+   */
+  public SimpleOperatorImpl(OperatorSpec spec) {
+    this(spec, new NoopOperatorCallback());
+  }
+
+  public SimpleOperatorImpl(OperatorSpec spec, OperatorCallback callback) {
+    this.spec = spec;
+    this.callback = callback;
+  }
+
+  @Override
+  public OperatorSpec getSpec() {
+    return this.spec;
+  }
+
+  /**
+   * This method is made final s.t. the sequence of invocations between {@link org.apache.samza.sql.api.operators.OperatorCallback#beforeProcess(Relation, MessageCollector, TaskCoordinator)}
+   * and real processing of the input relation is fixed.
+   */
+  @Override
+  final public void process(Relation deltaRelation, MessageCollector collector, TaskCoordinator coordinator)
+      throws Exception {
+    Relation rel = this.callback.beforeProcess(deltaRelation, collector, coordinator);
+    if (rel == null) {
+      return;
+    }
+    this.realProcess(rel, getCollector(collector, coordinator), coordinator);
+  }
+
+  /**
+   * This method is made final s.t. the sequence of invocations between {@link org.apache.samza.sql.api.operators.OperatorCallback#beforeProcess(Tuple, MessageCollector, TaskCoordinator)}
+   * and real processing of the input tuple is fixed.
+   */
+  @Override
+  final public void process(Tuple tuple, MessageCollector collector, TaskCoordinator coordinator) throws Exception {
+    Tuple ituple = this.callback.beforeProcess(tuple, collector, coordinator);
+    if (ituple == null) {
+      return;
+    }
+    this.realProcess(ituple, getCollector(collector, coordinator), coordinator);
+  }
+
+  /**
+   * This method is made final s.t. we enforce the invocation of {@code SimpleOperatorImpl#getCollector(MessageCollector, TaskCoordinator)} before doing anything further
+   */
+  @Override
+  final public void refresh(long timeNano, MessageCollector collector, TaskCoordinator coordinator) throws Exception {
+    this.realRefresh(timeNano, getCollector(collector, coordinator), coordinator);
+  }
+
+  private SimpleMessageCollector getCollector(MessageCollector collector, TaskCoordinator coordinator) {
+    if (!(collector instanceof SimpleMessageCollector)) {
+      return new SimpleMessageCollector(collector, coordinator, this.callback);
+    } else {
+      ((SimpleMessageCollector) collector).switchOperatorCallback(this.callback);
+      return (SimpleMessageCollector) collector;
+    }
+  }
+
+  /**
+   * Method to be overridden by each specific implementation class of operator to handle timeout event
+   *
+   * @param timeNano The time in nanosecond when the timeout event occurred
+   * @param collector The {@link org.apache.samza.task.sql.SimpleMessageCollector} in the context
+   * @param coordinator The {@link org.apache.samza.task.TaskCoordinator} in the context
+   * @throws Exception Throws exception if failed to refresh the results
+   */
+  protected abstract void realRefresh(long timeNano, SimpleMessageCollector collector, TaskCoordinator coordinator)
+      throws Exception;
+
+  /**
+   * Method to be overridden by each specific implementation class of operator to perform relational logic operation on an input {@link org.apache.samza.sql.api.data.Relation}
+   *
+   * @param rel The input relation
+   * @param collector The {@link org.apache.samza.task.sql.SimpleMessageCollector} in the context
+   * @param coordinator The {@link org.apache.samza.task.TaskCoordinator} in the context
+   * @throws Exception
+   */
+  protected abstract void realProcess(Relation rel, SimpleMessageCollector collector, TaskCoordinator coordinator)
+      throws Exception;
+
+  protected abstract void realProcess(Tuple ituple, SimpleMessageCollector collector, TaskCoordinator coordinator)
+      throws Exception;
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/41c4cd01/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorSpec.java b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorSpec.java
index 93d4ebb..56753b6 100644
--- a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorSpec.java
+++ b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorSpec.java
@@ -22,12 +22,12 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.samza.sql.api.data.EntityName;
-import org.apache.samza.sql.api.operators.spec.OperatorSpec;
+import org.apache.samza.sql.api.operators.OperatorSpec;
 
 
 /**
  * An abstract class that encapsulate the basic information and methods that all specification of operators should implement.
- *
+ * It implements {@link org.apache.samza.sql.api.operators.OperatorSpec}
  */
 public abstract class SimpleOperatorSpec implements OperatorSpec {
   /**
@@ -46,9 +46,9 @@ public abstract class SimpleOperatorSpec implements OperatorSpec {
   private final List<EntityName> outputs = new ArrayList<EntityName>();
 
   /**
-   * Ctor of the <code>SimpleOperatorSpec</code> for simple <code>Operator</code>s w/ one input and one output
+   * Ctor of the {@code SimpleOperatorSpec} for simple {@link org.apache.samza.sql.api.operators.SimpleOperator}s w/ one input and one output
    *
-   * @param id Unique identifier of the <code>Operator</code> object
+   * @param id Unique identifier of the {@link org.apache.samza.sql.api.operators.SimpleOperator} object
    * @param input The only input entity
    * @param output The only output entity
    */
@@ -59,9 +59,9 @@ public abstract class SimpleOperatorSpec implements OperatorSpec {
   }
 
   /**
-   * Ctor of <code>SimpleOperatorSpec</code> with general format: m inputs and n outputs
+   * Ctor of {@code SimpleOperatorSpec} with general format: m inputs and n outputs
    *
-   * @param id Unique identifier of the <code>Operator</code> object
+   * @param id Unique identifier of the {@link org.apache.samza.sql.api.operators.SimpleOperator} object
    * @param inputs The list of input entities
    * @param output The list of output entities
    */

http://git-wip-us.apache.org/repos/asf/samza/blob/41c4cd01/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleRouter.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleRouter.java b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleRouter.java
new file mode 100644
index 0000000..e570897
--- /dev/null
+++ b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleRouter.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.operators.factory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.sql.api.data.EntityName;
+import org.apache.samza.sql.api.data.Relation;
+import org.apache.samza.sql.api.data.Tuple;
+import org.apache.samza.sql.api.operators.Operator;
+import org.apache.samza.sql.api.operators.OperatorRouter;
+import org.apache.samza.sql.api.operators.SimpleOperator;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskContext;
+import org.apache.samza.task.TaskCoordinator;
+import org.apache.samza.task.sql.RouterMessageCollector;
+
+
+/**
+ * Example implementation of {@link org.apache.samza.sql.api.operators.OperatorRouter}
+ *
+ */
+public final class SimpleRouter implements OperatorRouter {
+  /**
+   * List of operators added to the {@link org.apache.samza.sql.api.operators.OperatorRouter}
+   */
+  private List<SimpleOperator> operators = new ArrayList<SimpleOperator>();
+
+  @SuppressWarnings("rawtypes")
+  /**
+   * Map of {@link org.apache.samza.sql.api.data.EntityName} to the list of operators associated with it
+   */
+  private Map<EntityName, List> nextOps = new HashMap<EntityName, List>();
+
+  /**
+   * Set of {@link org.apache.samza.sql.api.data.EntityName} as inputs to this {@code SimpleRouter}
+   */
+  private Set<EntityName> inputEntities = new HashSet<EntityName>();
+
+  /**
+   * Set of entities that are not input entities to this {@code SimpleRouter}
+   */
+  private Set<EntityName> outputEntities = new HashSet<EntityName>();
+
+  @SuppressWarnings("unchecked")
+  private void addOperator(EntityName input, SimpleOperator nextOp) {
+    if (nextOps.get(input) == null) {
+      nextOps.put(input, new ArrayList<Operator>());
+    }
+    nextOps.get(input).add(nextOp);
+    operators.add(nextOp);
+    // get the operator spec
+    for (EntityName output : nextOp.getSpec().getOutputNames()) {
+      if (inputEntities.contains(output)) {
+        inputEntities.remove(output);
+      }
+      outputEntities.add(output);
+    }
+    if (!outputEntities.contains(input)) {
+      inputEntities.add(input);
+    }
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public List<SimpleOperator> getNextOperators(EntityName entity) {
+    return nextOps.get(entity);
+  }
+
+  @Override
+  public void addOperator(SimpleOperator nextOp) {
+    List<EntityName> inputs = nextOp.getSpec().getInputNames();
+    for (EntityName input : inputs) {
+      addOperator(input, nextOp);
+    }
+  }
+
+  @Override
+  public void init(Config config, TaskContext context) throws Exception {
+    for (SimpleOperator op : this.operators) {
+      op.init(config, context);
+    }
+  }
+
+  @Override
+  public void process(Tuple ituple, MessageCollector collector, TaskCoordinator coordinator) throws Exception {
+    MessageCollector opCollector = new RouterMessageCollector(collector, coordinator, this);
+    for (Iterator<SimpleOperator> iter = this.getNextOperators(ituple.getEntityName()).iterator(); iter.hasNext();) {
+      iter.next().process(ituple, opCollector, coordinator);
+    }
+  }
+
+  @SuppressWarnings("rawtypes")
+  @Override
+  public void process(Relation deltaRelation, MessageCollector collector, TaskCoordinator coordinator) throws Exception {
+    MessageCollector opCollector = new RouterMessageCollector(collector, coordinator, this);
+    for (Iterator<SimpleOperator> iter = this.getNextOperators(deltaRelation.getName()).iterator(); iter.hasNext();) {
+      iter.next().process(deltaRelation, opCollector, coordinator);
+    }
+  }
+
+  @Override
+  public void refresh(long nanoSec, MessageCollector collector, TaskCoordinator coordinator) throws Exception {
+    MessageCollector opCollector = new RouterMessageCollector(collector, coordinator, this);
+    for (EntityName entity : inputEntities) {
+      for (Iterator<SimpleOperator> iter = this.getNextOperators(entity).iterator(); iter.hasNext();) {
+        iter.next().refresh(nanoSec, opCollector, coordinator);
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/41c4cd01/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoin.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoin.java b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoin.java
new file mode 100644
index 0000000..2854aeb
--- /dev/null
+++ b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoin.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.operators.join;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.sql.api.data.EntityName;
+import org.apache.samza.sql.api.data.Relation;
+import org.apache.samza.sql.api.data.Stream;
+import org.apache.samza.sql.api.data.Tuple;
+import org.apache.samza.sql.api.operators.OperatorCallback;
+import org.apache.samza.sql.operators.factory.SimpleOperatorImpl;
+import org.apache.samza.sql.operators.window.BoundedTimeWindow;
+import org.apache.samza.sql.window.storage.OrderedStoreKey;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.storage.kv.KeyValueIterator;
+import org.apache.samza.task.TaskContext;
+import org.apache.samza.task.TaskCoordinator;
+import org.apache.samza.task.sql.SimpleMessageCollector;
+
+
+/**
+ * This class implements a simple stream-to-stream join
+ */
+public class StreamStreamJoin extends SimpleOperatorImpl {
+  private final StreamStreamJoinSpec spec;
+
+  private Map<EntityName, BoundedTimeWindow> inputWindows = new HashMap<EntityName, BoundedTimeWindow>();
+
+  public StreamStreamJoin(StreamStreamJoinSpec spec) {
+    super(spec);
+    this.spec = spec;
+  }
+
+  //TODO: stub constructor to allow compilation to pass. Need to construct real StreamStreamJoinSpec.
+  public StreamStreamJoin(String opId, List<String> inputRelations, String output, List<String> joinKeys) {
+    this(null);
+  }
+
+  //TODO: stub constructor to allow compilation to pass. Need to construct real StreamStreamJoinSpec.
+  public StreamStreamJoin(String opId, List<String> inputRelations, String output, List<String> joinKeys,
+      OperatorCallback callback) {
+    super(null, callback);
+    this.spec = null;
+  }
+
+  @Override
+  public void init(Config config, TaskContext context) throws Exception {
+    // TODO Auto-generated method stub
+    // initialize the inputWindows map
+
+  }
+
+  private void join(Tuple tuple, Map<EntityName, Stream> joinSets) {
+    // TODO Auto-generated method stub
+    // Do M-way joins if necessary, it should be ordered based on the orders of the input relations in inputs
+    // NOTE: inner joins may be optimized by re-ordering the input relations so that inputs w/ fewer join sets are joined first. We will consider it later.
+
+  }
+
+  private Map<EntityName, Stream> findJoinSets(Tuple tuple) {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+  private KeyValueIterator<OrderedStoreKey, Tuple> getJoinSet(Tuple tuple, EntityName strmName) {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+  private List<Entry<String, Object>> getEqualFields(Tuple tuple, EntityName strmName) {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+  @Override
+  protected void realProcess(Relation deltaRelation, SimpleMessageCollector collector, TaskCoordinator coordinator)
+      throws Exception {
+    // TODO Auto-generated method stub
+
+  }
+
+  @Override
+  protected void realProcess(Tuple tuple, SimpleMessageCollector collector, TaskCoordinator coordinator)
+      throws Exception {
+    // TODO Auto-generated method stub
+    Map<EntityName, Stream> joinSets = findJoinSets(tuple);
+    join(tuple, joinSets);
+  }
+
+  @Override
+  public void realRefresh(long timeNano, SimpleMessageCollector collector, TaskCoordinator coordinator)
+      throws Exception {
+    // TODO Auto-generated method stub
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/41c4cd01/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoinSpec.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoinSpec.java b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoinSpec.java
new file mode 100644
index 0000000..cc0aca0
--- /dev/null
+++ b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoinSpec.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.operators.join;
+
+import java.util.List;
+
+import org.apache.samza.sql.api.data.EntityName;
+import org.apache.samza.sql.operators.factory.SimpleOperatorSpec;
+
+
+/**
+ * This class defines the specification of a {@link org.apache.samza.sql.operators.join.StreamStreamJoin} operator
+ */
+public class StreamStreamJoinSpec extends SimpleOperatorSpec {
+
+  public StreamStreamJoinSpec(String id, List<EntityName> inputs, EntityName output, List<String> joinKeys) {
+    super(id, inputs, output);
+    // TODO Auto-generated constructor stub
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/41c4cd01/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java
index 986d688..b93d789 100644
--- a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java
+++ b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java
@@ -20,31 +20,33 @@
 package org.apache.samza.sql.operators.partition;
 
 import org.apache.samza.config.Config;
+import org.apache.samza.sql.api.data.Relation;
 import org.apache.samza.sql.api.data.Tuple;
-import org.apache.samza.sql.api.operators.TupleOperator;
-import org.apache.samza.sql.operators.factory.SimpleOperator;
+import org.apache.samza.sql.api.operators.OperatorCallback;
+import org.apache.samza.sql.operators.factory.SimpleOperatorImpl;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.storage.kv.KeyValueIterator;
 import org.apache.samza.system.OutgoingMessageEnvelope;
 import org.apache.samza.system.SystemStream;
-import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.TaskContext;
 import org.apache.samza.task.TaskCoordinator;
-import org.apache.samza.task.sql.SqlMessageCollector;
+import org.apache.samza.task.sql.SimpleMessageCollector;
 
 
 /**
  * This is an example build-in operator that performs a simple stream re-partition operation.
  *
  */
-public final class PartitionOp extends SimpleOperator implements TupleOperator {
+public class PartitionOp extends SimpleOperatorImpl {
 
   /**
-   * The specification of this <code>PartitionOp</code>
+   * The specification of this {@code PartitionOp}
    *
    */
   private final PartitionSpec spec;
 
   /**
-   * Ctor that takes the <code>PartitionSpec</code> object as input.
+   * Ctor that takes the {@link org.apache.samza.sql.operators.partition.PartitionSpec} object as input.
    *
    * @param spec The <code>PartitionSpec</code> object
    */
@@ -64,7 +66,23 @@ public final class PartitionOp extends SimpleOperator implements TupleOperator {
    * @param parNum The number of partitions used for the output stream
    */
   public PartitionOp(String id, String input, String system, String output, String parKey, int parNum) {
-    super(new PartitionSpec(id, input, new SystemStream(system, output), parKey, parNum));
+    this(new PartitionSpec(id, input, new SystemStream(system, output), parKey, parNum));
+  }
+
+  /**
+   * A simplified constructor that allows users to randomly create <code>PartitionOp</code>
+   *
+   * @param id The identifier of this operator
+   * @param input The input stream name of this operator
+   * @param system The output system name of this operator
+   * @param output The output stream name of this operator
+   * @param parKey The partition key used for the output stream
+   * @param parNum The number of partitions used for the output stream
+   * @param callback The callback functions for operator
+   */
+  public PartitionOp(String id, String input, String system, String output, String parKey, int parNum,
+      OperatorCallback callback) {
+    super(new PartitionSpec(id, input, new SystemStream(system, output), parKey, parNum), callback);
     this.spec = (PartitionSpec) super.getSpec();
   }
 
@@ -75,15 +93,28 @@ public final class PartitionOp extends SimpleOperator implements TupleOperator {
   }
 
   @Override
-  public void window(MessageCollector collector, TaskCoordinator coordinator) throws Exception {
+  protected void realRefresh(long timeNano, SimpleMessageCollector collector, TaskCoordinator coordinator)
+      throws Exception {
     // TODO Auto-generated method stub
     // NOOP or flush
   }
 
   @Override
-  public void process(Tuple tuple, SqlMessageCollector collector) throws Exception {
-    collector.send(new OutgoingMessageEnvelope(PartitionOp.this.spec.getSystemStream(), tuple.getKey().value(),
-        tuple.getMessage().getFieldData(PartitionOp.this.spec.getParKey()).value(), tuple.getMessage().value()));
+  protected void realProcess(Tuple tuple, SimpleMessageCollector collector, TaskCoordinator coordinator)
+      throws Exception {
+    collector.send(new OutgoingMessageEnvelope(PartitionOp.this.spec.getSystemStream(), tuple.getMessage()
+        .getFieldData(PartitionOp.this.spec.getParKey()).value(), tuple.getKey().value(), tuple.getMessage().value()));
+  }
+
+  @Override
+  protected void realProcess(Relation deltaRelation, SimpleMessageCollector collector, TaskCoordinator coordinator)
+      throws Exception {
+    for(KeyValueIterator<?, Tuple> iter = deltaRelation.all(); iter.hasNext(); ) {
+      Entry<?, Tuple> entry = iter.next();
+      collector.send(new OutgoingMessageEnvelope(PartitionOp.this.spec.getSystemStream(), entry.getValue().getMessage()
+          .getFieldData(PartitionOp.this.spec.getParKey()).value(), entry.getValue().getKey().value(), entry.getValue()
+          .getMessage().value()));
+    }
   }
 
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/41c4cd01/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionSpec.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionSpec.java b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionSpec.java
index 29d1784..c47eed9 100644
--- a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionSpec.java
+++ b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionSpec.java
@@ -20,13 +20,13 @@
 package org.apache.samza.sql.operators.partition;
 
 import org.apache.samza.sql.api.data.EntityName;
-import org.apache.samza.sql.api.operators.spec.OperatorSpec;
+import org.apache.samza.sql.api.operators.OperatorSpec;
 import org.apache.samza.sql.operators.factory.SimpleOperatorSpec;
 import org.apache.samza.system.SystemStream;
 
 
 /**
- * This class defines the specification class of <code>PartitionOp</code> operator
+ * This class defines the specification class of {@link org.apache.samza.sql.operators.partition.PartitionOp}
  *
  */
 public class PartitionSpec extends SimpleOperatorSpec implements OperatorSpec {
@@ -47,11 +47,11 @@ public class PartitionSpec extends SimpleOperatorSpec implements OperatorSpec {
   private final SystemStream sysStream;
 
   /**
-   * Ctor to create the <code>PartitionSpec</code>
+   * Ctor to create the {@code PartitionSpec}
    *
-   * @param id The ID of the <code>PartitionOp</code>
+   * @param id The ID of the {@link org.apache.samza.sql.operators.partition.PartitionOp}
    * @param input The input stream name
-   * @param output The output <code>SystemStream</code> object
+   * @param output The output {@link org.apache.samza.system.SystemStream} object
    * @param parKey The name of the partition key
    * @param parNum The number of partitions
    */
@@ -81,7 +81,7 @@ public class PartitionSpec extends SimpleOperatorSpec implements OperatorSpec {
   }
 
   /**
-   * Method to get the output <code>SystemStream</code>
+   * Method to get the output {@link org.apache.samza.system.SystemStream}
    *
    * @return The output system stream object
    */

http://git-wip-us.apache.org/repos/asf/samza/blob/41c4cd01/samza-sql-core/src/main/java/org/apache/samza/sql/operators/relation/Join.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/relation/Join.java b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/relation/Join.java
deleted file mode 100644
index a8a6eaf..0000000
--- a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/relation/Join.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.sql.operators.relation;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.samza.config.Config;
-import org.apache.samza.sql.api.data.EntityName;
-import org.apache.samza.sql.api.data.Relation;
-import org.apache.samza.sql.api.operators.RelationOperator;
-import org.apache.samza.sql.operators.factory.SimpleOperator;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskContext;
-import org.apache.samza.task.TaskCoordinator;
-import org.apache.samza.task.sql.SqlMessageCollector;
-
-
-/**
- * This class defines an example build-in operator for a join operator between two relations.
- *
- */
-public class Join extends SimpleOperator implements RelationOperator {
-
-  private final JoinSpec spec;
-
-  /**
-   * The input relations
-   *
-   */
-  private List<Relation> inputs = null;
-
-  /**
-   * The output relation
-   */
-  private Relation output = null;
-
-  /**
-   * Ctor that creates <code>Join</code> operator based on the specification.
-   *
-   * @param spec The <code>JoinSpec</code> object that specifies the join operator
-   */
-  public Join(JoinSpec spec) {
-    super(spec);
-    this.spec = spec;
-  }
-
-  /**
-   * An alternative ctor that allows users to create a join operator randomly.
-   *
-   * @param id The identifier of the join operator
-   * @param joinIns The list of input relation names of the join
-   * @param joinOut The output relation name of the join
-   * @param joinKeys The list of keys used in the join. Each entry in the <code>joinKeys</code> is the key name used in one of the input relations.
-   *     The order of the <code>joinKeys</code> MUST be the same as their corresponding relation names in <code>joinIns</code>
-   */
-  @SuppressWarnings("serial")
-  public Join(final String id, final List<String> joinIns, final String joinOut, final List<String> joinKeys) {
-    super(new JoinSpec(id, new ArrayList<EntityName>() {
-      {
-        for (String name : joinIns) {
-          add(EntityName.getRelationName(name));
-        }
-      }
-    }, EntityName.getRelationName(joinOut), joinKeys));
-    this.spec = (JoinSpec) this.getSpec();
-  }
-
-  private boolean hasPendingChanges() {
-    return getPendingChanges() != null;
-  }
-
-  private Relation getPendingChanges() {
-    // TODO Auto-generated method stub
-    // return any pending changes that have not been processed yet
-    return null;
-  }
-
-  private Relation getOutputChanges() {
-    // TODO Auto-generated method stub
-    return null;
-  }
-
-  private boolean hasOutputChanges() {
-    // TODO Auto-generated method stub
-    return getOutputChanges() != null;
-  }
-
-  private void join(Relation deltaRelation) {
-    // TODO Auto-generated method stub
-    // implement the join logic
-    // 1. calculate the delta changes in <code>output</code>
-    // 2. check output condition to see whether the current input should trigger an output
-    // 3. set the output changes and pending changes
-  }
-
-  @Override
-  public void init(Config config, TaskContext context) throws Exception {
-    for (EntityName relation : this.spec.getInputNames()) {
-      inputs.add((Relation) context.getStore(relation.toString()));
-    }
-    this.output = (Relation) context.getStore(this.spec.getOutputName().toString());
-  }
-
-  @Override
-  public void window(MessageCollector collector, TaskCoordinator coordinator) throws Exception {
-    SqlMessageCollector sqlCollector = (SqlMessageCollector) collector;
-    if (hasPendingChanges()) {
-      sqlCollector.send(getPendingChanges());
-    }
-    sqlCollector.timeout(this.spec.getOutputNames());
-  }
-
-  @Override
-  public void process(Relation deltaRelation, SqlMessageCollector collector) throws Exception {
-    // calculate join based on the input <code>deltaRelation</code>
-    join(deltaRelation);
-    if (hasOutputChanges()) {
-      collector.send(getOutputChanges());
-    }
-  }
-}