You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2015/06/02 07:52:50 UTC

[2/2] samza git commit: Revert accidental check-in of "TopologyBuilder RB 34500"

Revert accidental check-in of "TopologyBuilder RB 34500"

This reverts commit 45b854772cf36cc69e8d8cda7a51bce1be5fe576.


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/ebecbf2d
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/ebecbf2d
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/ebecbf2d

Branch: refs/heads/samza-sql
Commit: ebecbf2de48ec183ce836941d29d798b6297db61
Parents: 45b8547
Author: Yi Pan (Data Infrastructure) <yi...@linkedin.com>
Authored: Mon Jun 1 22:51:33 2015 -0700
Committer: Yi Pan (Data Infrastructure) <yi...@linkedin.com>
Committed: Mon Jun 1 22:51:33 2015 -0700

----------------------------------------------------------------------
 .../apache/samza/sql/api/data/EntityName.java   |  41 +--
 .../org/apache/samza/sql/api/data/Table.java    |   7 +-
 .../samza/sql/api/operators/Operator.java       |   4 -
 .../sql/api/operators/OperatorCallback.java     |   1 +
 .../samza/sql/api/operators/OperatorRouter.java |   8 -
 .../samza/sql/api/operators/OperatorSink.java   |  30 --
 .../samza/sql/api/operators/OperatorSource.java |  30 --
 .../samza/sql/api/operators/SimpleOperator.java |   3 +-
 .../samza/sql/data/IncomingMessageTuple.java    |   1 +
 .../sql/operators/NoopOperatorCallback.java     |  53 ----
 .../samza/sql/operators/OperatorTopology.java   |  53 ----
 .../samza/sql/operators/SimpleOperatorImpl.java | 147 ----------
 .../samza/sql/operators/SimpleOperatorSpec.java | 106 -------
 .../samza/sql/operators/SimpleRouter.java       | 141 ---------
 .../operators/factory/NoopOperatorCallback.java |  50 ++++
 .../operators/factory/SimpleOperatorImpl.java   | 136 +++++++++
 .../operators/factory/SimpleOperatorSpec.java   | 106 +++++++
 .../sql/operators/factory/SimpleRouter.java     | 136 +++++++++
 .../sql/operators/factory/TopologyBuilder.java  | 284 -------------------
 .../sql/operators/join/StreamStreamJoin.java    |   3 +-
 .../operators/join/StreamStreamJoinSpec.java    |  15 +-
 .../sql/operators/partition/PartitionOp.java    |   3 +-
 .../sql/operators/partition/PartitionSpec.java  |   2 +-
 .../sql/operators/window/BoundedTimeWindow.java |   4 +-
 .../samza/sql/operators/window/WindowSpec.java  |   7 +-
 .../samza/task/sql/SimpleMessageCollector.java  |  37 +--
 .../task/sql/RandomWindowOperatorTask.java      |  11 +-
 .../apache/samza/task/sql/StreamSqlTask.java    |  26 +-
 .../samza/task/sql/UserCallbacksSqlTask.java    |  66 +++--
 29 files changed, 520 insertions(+), 991 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/ebecbf2d/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/EntityName.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/EntityName.java b/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/EntityName.java
index df1b11b..80ba455 100644
--- a/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/EntityName.java
+++ b/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/EntityName.java
@@ -49,8 +49,6 @@ public class EntityName {
    */
   private final String name;
 
-  private final boolean isSystemEntity;
-
   /**
    * Static map of already allocated table names
    */
@@ -61,19 +59,15 @@ public class EntityName {
    */
   private static Map<String, EntityName> streams = new HashMap<String, EntityName>();
 
-  private static final String ANONYMOUS = "anonymous";
-
   /**
    * Private ctor to create entity names
    *
    * @param type Type of the entity name
    * @param name Formatted name of the entity
-   * @param isSystemEntity whether the entity is a system input/output
    */
-  private EntityName(EntityType type, String name, boolean isSystemEntity) {
+  private EntityName(EntityType type, String name) {
     this.type = type;
     this.name = name;
-    this.isSystemEntity = isSystemEntity;
   }
 
   @Override
@@ -108,10 +102,6 @@ public class EntityName {
     return this.type.equals(EntityType.STREAM);
   }
 
-  public boolean isSystemEntity() {
-    return this.isSystemEntity;
-  }
-
   /**
    * Get the formatted entity name
    *
@@ -121,24 +111,15 @@ public class EntityName {
     return this.name;
   }
 
-  public static EntityName getTableName(String name) {
-    return getTableName(name, false);
-  }
-
-  public static EntityName getStreamName(String name) {
-    return getStreamName(name, false);
-  }
-
   /**
    * Static method to get the instance of {@code EntityName} with type {@code EntityType.TABLE}
    *
    * @param name The formatted entity name of the relation
-   * @param isSystem The boolean flag indicating whether this is a system input/output
    * @return A <code>EntityName</code> for a relation
    */
-  public static EntityName getTableName(String name, boolean isSystem) {
+  public static EntityName getTableName(String name) {
     if (tables.get(name) == null) {
-      tables.put(name, new EntityName(EntityType.TABLE, name, isSystem));
+      tables.put(name, new EntityName(EntityType.TABLE, name));
     }
     return tables.get(name);
   }
@@ -147,25 +128,13 @@ public class EntityName {
    * Static method to get the instance of <code>EntityName</code> with type <code>EntityType.STREAM</code>
    *
    * @param name The formatted entity name of the stream
-   * @param isSystem The boolean flag indicating whether this is a system input/output
    * @return A <code>EntityName</code> for a stream
    */
-  public static EntityName getStreamName(String name, boolean isSystem) {
+  public static EntityName getStreamName(String name) {
     if (streams.get(name) == null) {
-      streams.put(name, new EntityName(EntityType.STREAM, name, isSystem));
+      streams.put(name, new EntityName(EntityType.STREAM, name));
     }
     return streams.get(name);
   }
 
-  public static EntityName getAnonymousStream() {
-    return getStreamName(ANONYMOUS);
-  }
-
-  public static EntityName getAnonymousTable() {
-    return getTableName(ANONYMOUS);
-  }
-
-  public boolean isAnonymous() {
-    return this.name.equals(ANONYMOUS);
-  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/ebecbf2d/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Table.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Table.java b/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Table.java
index b4dce07..7b4d984 100644
--- a/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Table.java
+++ b/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Table.java
@@ -19,9 +19,6 @@
 
 package org.apache.samza.sql.api.data;
 
-import java.util.List;
-
-
 /**
  * This interface defines a non-ordered {@link org.apache.samza.sql.api.data.Relation}, which has a unique primary key
  *
@@ -34,8 +31,8 @@ public interface Table<K> extends Relation<K> {
   /**
    * Get the primary key field name for this table
    *
-   * @return The names of the primary key fields
+   * @return The name of the primary key field
    */
-  List<String> getPrimaryKeyNames();
+  String getPrimaryKeyName();
 
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/ebecbf2d/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operator.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operator.java b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operator.java
index 9c6eaa5..d6f6b57 100644
--- a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operator.java
+++ b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operator.java
@@ -27,11 +27,7 @@ import org.apache.samza.task.TaskContext;
 import org.apache.samza.task.TaskCoordinator;
 
 
-/**
- * This class defines the common interface for operator classes.
- */
 public interface Operator {
-
   /**
    * Method to initialize the operator
    *

http://git-wip-us.apache.org/repos/asf/samza/blob/ebecbf2d/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorCallback.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorCallback.java b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorCallback.java
index 5a77d95..fb2aa89 100644
--- a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorCallback.java
+++ b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorCallback.java
@@ -23,6 +23,7 @@ import org.apache.samza.sql.api.data.Tuple;
 import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.TaskCoordinator;
 
+
 /**
  * Defines the callback functions to allow customized functions to be invoked before process and before sending the result
  */

http://git-wip-us.apache.org/repos/asf/samza/blob/ebecbf2d/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorRouter.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorRouter.java b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorRouter.java
index 432e6b3..0759638 100644
--- a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorRouter.java
+++ b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorRouter.java
@@ -19,7 +19,6 @@
 
 package org.apache.samza.sql.api.operators;
 
-import java.util.Iterator;
 import java.util.List;
 
 import org.apache.samza.sql.api.data.EntityName;
@@ -52,11 +51,4 @@ public interface OperatorRouter extends Operator {
    */
   List<SimpleOperator> getNextOperators(EntityName output);
 
-  /**
-   * This method provides an iterator to go through all operators connected via {@code OperatorRouter}
-   *
-   * @return An {@link java.util.Iterator} for all operators connected via {@code OperatorRouter}
-   */
-  Iterator<SimpleOperator> iterator();
-
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/ebecbf2d/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSink.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSink.java b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSink.java
deleted file mode 100644
index e2c748c..0000000
--- a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSink.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.sql.api.operators;
-
-import java.util.Iterator;
-
-import org.apache.samza.sql.api.data.EntityName;
-
-
-public interface OperatorSink {
-  Iterator<SimpleOperator> opIterator();
-
-  EntityName getName();
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/ebecbf2d/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSource.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSource.java b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSource.java
deleted file mode 100644
index 860c1aa..0000000
--- a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSource.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.sql.api.operators;
-
-import java.util.Iterator;
-
-import org.apache.samza.sql.api.data.EntityName;
-
-
-public interface OperatorSource {
-  Iterator<SimpleOperator> opIterator();
-
-  EntityName getName();
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/ebecbf2d/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SimpleOperator.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SimpleOperator.java b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SimpleOperator.java
index 60ace9c..c49a822 100644
--- a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SimpleOperator.java
+++ b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SimpleOperator.java
@@ -19,6 +19,8 @@
 
 package org.apache.samza.sql.api.operators;
 
+
+
 /**
  * The interface for a {@code SimpleOperator} that implements a simple primitive relational logic operation
  */
@@ -29,5 +31,4 @@ public interface SimpleOperator extends Operator {
    * @return The {@link org.apache.samza.sql.api.operators.OperatorSpec} object that defines the configuration/parameters of the operator
    */
   OperatorSpec getSpec();
-
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/ebecbf2d/samza-sql-core/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java b/samza-sql-core/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java
index af040f0..72a59f2 100644
--- a/samza-sql-core/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java
+++ b/samza-sql-core/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java
@@ -81,6 +81,7 @@ public class IncomingMessageTuple implements Tuple {
 
   @Override
   public long getCreateTimeNano() {
+    // TODO: this is wrong and just to keep as an placeholder. It should be replaced by the message publish time when the publish timestamp is available in the message metadata
     return this.recvTimeNano;
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/ebecbf2d/samza-sql-core/src/main/java/org/apache/samza/sql/operators/NoopOperatorCallback.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/NoopOperatorCallback.java b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/NoopOperatorCallback.java
deleted file mode 100644
index e951737..0000000
--- a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/NoopOperatorCallback.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.sql.operators;
-
-import org.apache.samza.sql.api.data.Relation;
-import org.apache.samza.sql.api.data.Tuple;
-import org.apache.samza.sql.api.operators.OperatorCallback;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskCoordinator;
-
-
-/**
- * This is a default NOOP operator callback object that does nothing before and after the process method
- */
-public final class NoopOperatorCallback implements OperatorCallback {
-
-  @Override
-  public Tuple beforeProcess(Tuple tuple, MessageCollector collector, TaskCoordinator coordinator) {
-    return tuple;
-  }
-
-  @Override
-  public Relation beforeProcess(Relation rel, MessageCollector collector, TaskCoordinator coordinator) {
-    return rel;
-  }
-
-  @Override
-  public Tuple afterProcess(Tuple tuple, MessageCollector collector, TaskCoordinator coordinator) {
-    return tuple;
-  }
-
-  @Override
-  public Relation afterProcess(Relation rel, MessageCollector collector, TaskCoordinator coordinator) {
-    return rel;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/ebecbf2d/samza-sql-core/src/main/java/org/apache/samza/sql/operators/OperatorTopology.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/OperatorTopology.java b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/OperatorTopology.java
deleted file mode 100644
index 8b70092..0000000
--- a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/OperatorTopology.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.sql.operators;
-
-import java.util.Iterator;
-
-import org.apache.samza.sql.api.data.EntityName;
-import org.apache.samza.sql.api.operators.OperatorSink;
-import org.apache.samza.sql.api.operators.OperatorSource;
-import org.apache.samza.sql.api.operators.SimpleOperator;
-
-
-/**
- * This class implements a partially completed {@link org.apache.samza.sql.operators.factory.TopologyBuilder} that signifies a partially completed
- * topology that the current operator has unbounded input stream that can be attached to other operators' output
- */
-public class OperatorTopology implements OperatorSource, OperatorSink {
-
-  private final EntityName name;
-  private final SimpleRouter router;
-
-  public OperatorTopology(EntityName name, SimpleRouter router) {
-    this.name = name;
-    this.router = router;
-  }
-
-  @Override
-  public Iterator<SimpleOperator> opIterator() {
-    return this.router.iterator();
-  }
-
-  @Override
-  public EntityName getName() {
-    return this.name;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/ebecbf2d/samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleOperatorImpl.java b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleOperatorImpl.java
deleted file mode 100644
index 423880b..0000000
--- a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleOperatorImpl.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.sql.operators;
-
-import org.apache.samza.sql.api.data.Relation;
-import org.apache.samza.sql.api.data.Tuple;
-import org.apache.samza.sql.api.operators.OperatorCallback;
-import org.apache.samza.sql.api.operators.OperatorSpec;
-import org.apache.samza.sql.api.operators.SimpleOperator;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskCoordinator;
-import org.apache.samza.task.sql.SimpleMessageCollector;
-
-
-/**
- * An abstract class that encapsulate the basic information and methods that all operator classes should implement.
- * It implements the interface {@link org.apache.samza.sql.api.operators.SimpleOperator}
- */
-public abstract class SimpleOperatorImpl implements SimpleOperator {
-  /**
-   * The specification of this operator
-   */
-  private final OperatorSpec spec;
-
-  /**
-<<<<<<< HEAD
-   * The callback object
-=======
-   * The callback function
->>>>>>> SAMZA-552: use OperatorCallback to allow implementation of callbacks w/o inheriting and creating many sub-classes from operators
-   */
-  private final OperatorCallback callback;
-
-  /**
-   * Ctor of {@code SimpleOperatorImpl} class
-   *
-   * @param spec The specification of this operator
-   */
-  public SimpleOperatorImpl(OperatorSpec spec) {
-    this(spec, new NoopOperatorCallback());
-  }
-
-  public SimpleOperatorImpl(OperatorSpec spec, OperatorCallback callback) {
-    this.spec = spec;
-    this.callback = callback;
-  }
-
-  @Override
-  public OperatorSpec getSpec() {
-    return this.spec;
-  }
-
-  /**
-   * This method is made final s.t. the sequence of invocations between {@link org.apache.samza.sql.api.operators.OperatorCallback#beforeProcess(Relation, MessageCollector, TaskCoordinator)}
-   * and real processing of the input relation is fixed.
-   */
-  @Override
-  final public void process(Relation deltaRelation, MessageCollector collector, TaskCoordinator coordinator)
-      throws Exception {
-    Relation rel = this.callback.beforeProcess(deltaRelation, collector, coordinator);
-    if (rel == null) {
-      return;
-    }
-    this.realProcess(rel, getCollector(collector, coordinator), coordinator);
-  }
-
-  /**
-   * This method is made final s.t. the sequence of invocations between {@link org.apache.samza.sql.api.operators.OperatorCallback#beforeProcess(Tuple, MessageCollector, TaskCoordinator)}
-   * and real processing of the input tuple is fixed.
-   */
-  @Override
-  final public void process(Tuple tuple, MessageCollector collector, TaskCoordinator coordinator) throws Exception {
-    Tuple ituple = this.callback.beforeProcess(tuple, collector, coordinator);
-    if (ituple == null) {
-      return;
-    }
-    this.realProcess(ituple, getCollector(collector, coordinator), coordinator);
-  }
-
-  /**
-   * This method is made final s.t. we enforce the invocation of {@code SimpleOperatorImpl#getCollector(MessageCollector, TaskCoordinator)} before doing anything futher
-   */
-  @Override
-  final public void refresh(long timeNano, MessageCollector collector, TaskCoordinator coordinator) throws Exception {
-    this.realRefresh(timeNano, getCollector(collector, coordinator), coordinator);
-  }
-
-  private SimpleMessageCollector getCollector(MessageCollector collector, TaskCoordinator coordinator) {
-    if (!(collector instanceof SimpleMessageCollector)) {
-      return new SimpleMessageCollector(collector, coordinator, this.callback);
-    } else {
-      ((SimpleMessageCollector) collector).switchCallback(this.callback);
-      return (SimpleMessageCollector) collector;
-    }
-  }
-
-  /**
-   * Method to be overriden by each specific implementation class of operator to handle timeout event
-   *
-   * @param timeNano The time in nanosecond when the timeout event occurred
-   * @param collector The {@link org.apache.samza.task.sql.SimpleMessageCollector} in the context
-   * @param coordinator The {@link org.apache.samza.task.TaskCoordinator} in the context
-   * @throws Exception Throws exception if failed to refresh the results
-   */
-  protected abstract void realRefresh(long timeNano, SimpleMessageCollector collector, TaskCoordinator coordinator)
-      throws Exception;
-
-  /**
-   * Method to be overriden by each specific implementation class of operator to perform relational logic operation on an input {@link org.apache.samza.sql.api.data.Relation}
-   *
-   * @param rel The input relation
-   * @param collector The {@link org.apache.samza.task.sql.SimpleMessageCollector} in the context
-   * @param coordinator The {@link org.apache.samza.task.TaskCoordinator} in the context
-   * @throws Exception Throws exception if failed to process
-   */
-  protected abstract void realProcess(Relation rel, SimpleMessageCollector collector, TaskCoordinator coordinator)
-      throws Exception;
-
-  /**
-   * Method to be overriden by each specific implementation class of operator to perform relational logic operation on an input {@link org.apache.samza.sql.api.data.Tuple}
-   *
-   * @param ituple The input tuple
-   * @param collector The {@link org.apache.samza.task.sql.SimpleMessageCollector} in the context
-   * @param coordinator The {@link org.apache.samza.task.TaskCoordinator} in the context
-   * @throws Exception Throws exception if failed to process
-   */
-  protected abstract void realProcess(Tuple ituple, SimpleMessageCollector collector, TaskCoordinator coordinator)
-      throws Exception;
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/ebecbf2d/samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleOperatorSpec.java b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleOperatorSpec.java
deleted file mode 100644
index 691e543..0000000
--- a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleOperatorSpec.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.sql.operators;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.samza.sql.api.data.EntityName;
-import org.apache.samza.sql.api.operators.OperatorSpec;
-
-
-/**
- * An abstract class that encapsulate the basic information and methods that all specification of operators should implement.
- * It implements {@link org.apache.samza.sql.api.operators.OperatorSpec}
- */
-public abstract class SimpleOperatorSpec implements OperatorSpec {
-  /**
-   * The identifier of the corresponding operator
-   */
-  private final String id;
-
-  /**
-   * The list of input entity names of the corresponding operator
-   */
-  private final List<EntityName> inputs = new ArrayList<EntityName>();
-
-  /**
-   * The list of output entity names of the corresponding operator
-   */
-  private final List<EntityName> outputs = new ArrayList<EntityName>();
-
-  /**
-   * Ctor of the {@code SimpleOperatorSpec} for simple {@link org.apache.samza.sql.api.operators.SimpleOperator}s w/ one input and one output
-   *
-   * @param id Unique identifier of the {@link org.apache.samza.sql.api.operators.SimpleOperator} object
-   * @param input The only input entity
-   * @param output The only output entity
-   */
-  public SimpleOperatorSpec(String id, EntityName input, EntityName output) {
-    this.id = id;
-    this.inputs.add(input);
-    this.outputs.add(output);
-  }
-
-  /**
-   * Ctor of {@code SimpleOperatorSpec} with general format: m inputs and n outputs
-   *
-   * @param id Unique identifier of the {@link org.apache.samza.sql.api.operators.SimpleOperator} object
-   * @param inputs The list of input entities
-   * @param output The list of output entities
-   */
-  public SimpleOperatorSpec(String id, List<EntityName> inputs, EntityName output) {
-    this.id = id;
-    this.inputs.addAll(inputs);
-    this.outputs.add(output);
-  }
-
-  @Override
-  public String getId() {
-    return this.id;
-  }
-
-  @Override
-  public List<EntityName> getInputNames() {
-    return this.inputs;
-  }
-
-  @Override
-  public List<EntityName> getOutputNames() {
-    return this.outputs;
-  }
-
-  /**
-   * Method to get the first output entity
-   *
-   * @return The first output entity name
-   */
-  public EntityName getOutputName() {
-    return this.outputs.get(0);
-  }
-
-  /**
-   * Method to get the first input entity
-   *
-   * @return The first input entity name
-   */
-  public EntityName getInputName() {
-    return this.inputs.get(0);
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/ebecbf2d/samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleRouter.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleRouter.java b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleRouter.java
deleted file mode 100644
index 2d9a1db..0000000
--- a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleRouter.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.sql.operators;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.samza.config.Config;
-import org.apache.samza.sql.api.data.EntityName;
-import org.apache.samza.sql.api.data.Relation;
-import org.apache.samza.sql.api.data.Tuple;
-import org.apache.samza.sql.api.operators.Operator;
-import org.apache.samza.sql.api.operators.OperatorRouter;
-import org.apache.samza.sql.api.operators.SimpleOperator;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskContext;
-import org.apache.samza.task.TaskCoordinator;
-import org.apache.samza.task.sql.RouterMessageCollector;
-
-
-/**
- * Example implementation of {@link org.apache.samza.sql.api.operators.OperatorRouter}
- *
- */
-public final class SimpleRouter implements OperatorRouter {
-  /**
-   * List of operators added to the {@link org.apache.samza.sql.api.operators.OperatorRouter}
-   */
-  private List<SimpleOperator> operators = new ArrayList<SimpleOperator>();
-
-  @SuppressWarnings("rawtypes")
-  /**
-   * Map of {@link org.apache.samza.sql.api.data.EntityName} to the list of operators associated with it
-   */
-  private Map<EntityName, List> nextOps = new HashMap<EntityName, List>();
-
-  /**
-   * Set of {@link org.apache.samza.sql.api.data.EntityName} as inputs to this {@code SimpleRouter}
-   */
-  private Set<EntityName> inputEntities = new HashSet<EntityName>();
-
-  /**
-   * Set of entities that are not input entities to this {@code SimpleRouter}
-   */
-  private Set<EntityName> outputEntities = new HashSet<EntityName>();
-
-  @SuppressWarnings("unchecked")
-  private void addOperator(EntityName input, SimpleOperator nextOp) {
-    if (nextOps.get(input) == null) {
-      nextOps.put(input, new ArrayList<Operator>());
-    }
-    nextOps.get(input).add(nextOp);
-    operators.add(nextOp);
-    // get the operator spec
-    for (EntityName output : nextOp.getSpec().getOutputNames()) {
-      if (inputEntities.contains(output)) {
-        inputEntities.remove(output);
-      }
-      outputEntities.add(output);
-    }
-    if (!outputEntities.contains(input)) {
-      inputEntities.add(input);
-    }
-  }
-
-  @Override
-  @SuppressWarnings("unchecked")
-  public List<SimpleOperator> getNextOperators(EntityName entity) {
-    return nextOps.get(entity);
-  }
-
-  @Override
-  public void addOperator(SimpleOperator nextOp) {
-    List<EntityName> inputs = nextOp.getSpec().getInputNames();
-    for (EntityName input : inputs) {
-      addOperator(input, nextOp);
-    }
-  }
-
-  @Override
-  public void init(Config config, TaskContext context) throws Exception {
-    for (SimpleOperator op : this.operators) {
-      op.init(config, context);
-    }
-  }
-
-  @Override
-  public void process(Tuple ituple, MessageCollector collector, TaskCoordinator coordinator) throws Exception {
-    MessageCollector opCollector = new RouterMessageCollector(collector, coordinator, this);
-    for (Iterator<SimpleOperator> iter = this.getNextOperators(ituple.getEntityName()).iterator(); iter.hasNext();) {
-      iter.next().process(ituple, opCollector, coordinator);
-    }
-  }
-
-  @SuppressWarnings("rawtypes")
-  @Override
-  public void process(Relation deltaRelation, MessageCollector collector, TaskCoordinator coordinator) throws Exception {
-    MessageCollector opCollector = new RouterMessageCollector(collector, coordinator, this);
-    for (Iterator<SimpleOperator> iter = this.getNextOperators(deltaRelation.getName()).iterator(); iter.hasNext();) {
-      iter.next().process(deltaRelation, opCollector, coordinator);
-    }
-  }
-
-  @Override
-  public void refresh(long nanoSec, MessageCollector collector, TaskCoordinator coordinator) throws Exception {
-    MessageCollector opCollector = new RouterMessageCollector(collector, coordinator, this);
-    for (EntityName entity : inputEntities) {
-      for (Iterator<SimpleOperator> iter = this.getNextOperators(entity).iterator(); iter.hasNext();) {
-        iter.next().refresh(nanoSec, opCollector, coordinator);
-      }
-    }
-  }
-
-  @Override
-  public Iterator<SimpleOperator> iterator() {
-    return this.operators.iterator();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/ebecbf2d/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/NoopOperatorCallback.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/NoopOperatorCallback.java b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/NoopOperatorCallback.java
new file mode 100644
index 0000000..c3d2266
--- /dev/null
+++ b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/NoopOperatorCallback.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.sql.operators.factory;
+
+import org.apache.samza.sql.api.data.Relation;
+import org.apache.samza.sql.api.data.Tuple;
+import org.apache.samza.sql.api.operators.OperatorCallback;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskCoordinator;
+
+
+public final class NoopOperatorCallback implements OperatorCallback {
+
+  @Override
+  public Tuple beforeProcess(Tuple tuple, MessageCollector collector, TaskCoordinator coordinator) {
+    return tuple;
+  }
+
+  @Override
+  public Relation beforeProcess(Relation rel, MessageCollector collector, TaskCoordinator coordinator) {
+    return rel;
+  }
+
+  @Override
+  public Tuple afterProcess(Tuple tuple, MessageCollector collector, TaskCoordinator coordinator) {
+    return tuple;
+  }
+
+  @Override
+  public Relation afterProcess(Relation rel, MessageCollector collector, TaskCoordinator coordinator) {
+    return rel;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/ebecbf2d/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorImpl.java b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorImpl.java
new file mode 100644
index 0000000..e66451f
--- /dev/null
+++ b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorImpl.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.operators.factory;
+
+import org.apache.samza.sql.api.data.Relation;
+import org.apache.samza.sql.api.data.Tuple;
+import org.apache.samza.sql.api.operators.OperatorCallback;
+import org.apache.samza.sql.api.operators.OperatorSpec;
+import org.apache.samza.sql.api.operators.SimpleOperator;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskCoordinator;
+import org.apache.samza.task.sql.SimpleMessageCollector;
+
+
+/**
+ * An abstract class that encapsulate the basic information and methods that all operator classes should implement.
+ * It implements the interface {@link org.apache.samza.sql.api.operators.SimpleOperator}
+ *
+ */
+public abstract class SimpleOperatorImpl implements SimpleOperator {
+  /**
+   * The specification of this operator
+   */
+  private final OperatorSpec spec;
+
+  /**
+   * The callback function
+   */
+  private final OperatorCallback callback;
+
+  /**
+   * Ctor of {@code SimpleOperatorImpl} class
+   *
+   * @param spec The specification of this operator
+   */
+  public SimpleOperatorImpl(OperatorSpec spec) {
+    this(spec, new NoopOperatorCallback());
+  }
+
+  public SimpleOperatorImpl(OperatorSpec spec, OperatorCallback callback) {
+    this.spec = spec;
+    this.callback = callback;
+  }
+
+  @Override
+  public OperatorSpec getSpec() {
+    return this.spec;
+  }
+
+  /**
+   * This method is made final s.t. the sequence of invocations between {@link org.apache.samza.sql.api.operators.OperatorCallback#beforeProcess(Relation, MessageCollector, TaskCoordinator)}
+   * and real processing of the input relation is fixed.
+   */
+  @Override
+  final public void process(Relation deltaRelation, MessageCollector collector, TaskCoordinator coordinator)
+      throws Exception {
+    Relation rel = this.callback.beforeProcess(deltaRelation, collector, coordinator);
+    if (rel == null) {
+      return;
+    }
+    this.realProcess(rel, getCollector(collector, coordinator), coordinator);
+  }
+
+  /**
+   * This method is made final s.t. the sequence of invocations between {@link org.apache.samza.sql.api.operators.OperatorCallback#beforeProcess(Tuple, MessageCollector, TaskCoordinator)}
+   * and real processing of the input tuple is fixed.
+   */
+  @Override
+  final public void process(Tuple tuple, MessageCollector collector, TaskCoordinator coordinator) throws Exception {
+    Tuple ituple = this.callback.beforeProcess(tuple, collector, coordinator);
+    if (ituple == null) {
+      return;
+    }
+    this.realProcess(ituple, getCollector(collector, coordinator), coordinator);
+  }
+
+  /**
+   * This method is made final s.t. we enforce the invocation of {@code SimpleOperatorImpl#getCollector(MessageCollector, TaskCoordinator)} before doing anything futher
+   */
+  @Override
+  final public void refresh(long timeNano, MessageCollector collector, TaskCoordinator coordinator) throws Exception {
+    this.realRefresh(timeNano, getCollector(collector, coordinator), coordinator);
+  }
+
+  private SimpleMessageCollector getCollector(MessageCollector collector, TaskCoordinator coordinator) {
+    if (!(collector instanceof SimpleMessageCollector)) {
+      return new SimpleMessageCollector(collector, coordinator, this.callback);
+    } else {
+      ((SimpleMessageCollector) collector).switchOperatorCallback(this.callback);
+      return (SimpleMessageCollector) collector;
+    }
+  }
+
+  /**
+   * Method to be overriden by each specific implementation class of operator to handle timeout event
+   *
+   * @param timeNano The time in nanosecond when the timeout event occurred
+   * @param collector The {@link org.apache.samza.task.sql.SimpleMessageCollector} in the context
+   * @param coordinator The {@link org.apache.samza.task.TaskCoordinator} in the context
+   * @throws Exception Throws exception if failed to refresh the results
+   */
+  protected abstract void realRefresh(long timeNano, SimpleMessageCollector collector, TaskCoordinator coordinator)
+      throws Exception;
+
+  /**
+   * Method to be overriden by each specific implementation class of operator to perform relational logic operation on an input {@link org.apache.samza.sql.api.data.Relation}
+   *
+   * @param rel The input relation
+   * @param collector The {@link org.apache.samza.task.sql.SimpleMessageCollector} in the context
+   * @param coordinator The {@link org.apache.samza.task.TaskCoordinator} in the context
+   * @throws Exception
+   */
+  protected abstract void realProcess(Relation rel, SimpleMessageCollector collector, TaskCoordinator coordinator)
+      throws Exception;
+
+  protected abstract void realProcess(Tuple ituple, SimpleMessageCollector collector, TaskCoordinator coordinator)
+      throws Exception;
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/ebecbf2d/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorSpec.java b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorSpec.java
new file mode 100644
index 0000000..56753b6
--- /dev/null
+++ b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorSpec.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.sql.operators.factory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.samza.sql.api.data.EntityName;
+import org.apache.samza.sql.api.operators.OperatorSpec;
+
+
+/**
+ * An abstract class that encapsulate the basic information and methods that all specification of operators should implement.
+ * It implements {@link org.apache.samza.sql.api.operators.OperatorSpec}
+ */
+public abstract class SimpleOperatorSpec implements OperatorSpec {
+  /**
+   * The identifier of the corresponding operator
+   */
+  private final String id;
+
+  /**
+   * The list of input entity names of the corresponding operator
+   */
+  private final List<EntityName> inputs = new ArrayList<EntityName>();
+
+  /**
+   * The list of output entity names of the corresponding operator
+   */
+  private final List<EntityName> outputs = new ArrayList<EntityName>();
+
+  /**
+   * Ctor of the {@code SimpleOperatorSpec} for simple {@link org.apache.samza.sql.api.operators.SimpleOperator}s w/ one input and one output
+   *
+   * @param id Unique identifier of the {@link org.apache.samza.sql.api.operators.SimpleOperator} object
+   * @param input The only input entity
+   * @param output The only output entity
+   */
+  public SimpleOperatorSpec(String id, EntityName input, EntityName output) {
+    this.id = id;
+    this.inputs.add(input);
+    this.outputs.add(output);
+  }
+
+  /**
+   * Ctor of {@code SimpleOperatorSpec} with general format: m inputs and n outputs
+   *
+   * @param id Unique identifier of the {@link org.apache.samza.sql.api.operators.SimpleOperator} object
+   * @param inputs The list of input entities
+   * @param output The list of output entities
+   */
+  public SimpleOperatorSpec(String id, List<EntityName> inputs, EntityName output) {
+    this.id = id;
+    this.inputs.addAll(inputs);
+    this.outputs.add(output);
+  }
+
+  @Override
+  public String getId() {
+    return this.id;
+  }
+
+  @Override
+  public List<EntityName> getInputNames() {
+    return this.inputs;
+  }
+
+  @Override
+  public List<EntityName> getOutputNames() {
+    return this.outputs;
+  }
+
+  /**
+   * Method to get the first output entity
+   *
+   * @return The first output entity name
+   */
+  public EntityName getOutputName() {
+    return this.outputs.get(0);
+  }
+
+  /**
+   * Method to get the first input entity
+   *
+   * @return The first input entity name
+   */
+  public EntityName getInputName() {
+    return this.inputs.get(0);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/ebecbf2d/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleRouter.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleRouter.java b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleRouter.java
new file mode 100644
index 0000000..e570897
--- /dev/null
+++ b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleRouter.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.operators.factory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.sql.api.data.EntityName;
+import org.apache.samza.sql.api.data.Relation;
+import org.apache.samza.sql.api.data.Tuple;
+import org.apache.samza.sql.api.operators.Operator;
+import org.apache.samza.sql.api.operators.OperatorRouter;
+import org.apache.samza.sql.api.operators.SimpleOperator;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskContext;
+import org.apache.samza.task.TaskCoordinator;
+import org.apache.samza.task.sql.RouterMessageCollector;
+
+
+/**
+ * Example implementation of {@link org.apache.samza.sql.api.operators.OperatorRouter}
+ *
+ */
+public final class SimpleRouter implements OperatorRouter {
+  /**
+   * List of operators added to the {@link org.apache.samza.sql.api.operators.OperatorRouter}
+   */
+  private List<SimpleOperator> operators = new ArrayList<SimpleOperator>();
+
+  @SuppressWarnings("rawtypes")
+  /**
+   * Map of {@link org.apache.samza.sql.api.data.EntityName} to the list of operators associated with it
+   */
+  private Map<EntityName, List> nextOps = new HashMap<EntityName, List>();
+
+  /**
+   * Set of {@link org.apache.samza.sql.api.data.EntityName} as inputs to this {@code SimpleRouter}
+   */
+  private Set<EntityName> inputEntities = new HashSet<EntityName>();
+
+  /**
+   * Set of entities that are not input entities to this {@code SimpleRouter}
+   */
+  private Set<EntityName> outputEntities = new HashSet<EntityName>();
+
+  @SuppressWarnings("unchecked")
+  private void addOperator(EntityName input, SimpleOperator nextOp) {
+    if (nextOps.get(input) == null) {
+      nextOps.put(input, new ArrayList<Operator>());
+    }
+    nextOps.get(input).add(nextOp);
+    operators.add(nextOp);
+    // get the operator spec
+    for (EntityName output : nextOp.getSpec().getOutputNames()) {
+      if (inputEntities.contains(output)) {
+        inputEntities.remove(output);
+      }
+      outputEntities.add(output);
+    }
+    if (!outputEntities.contains(input)) {
+      inputEntities.add(input);
+    }
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public List<SimpleOperator> getNextOperators(EntityName entity) {
+    return nextOps.get(entity);
+  }
+
+  @Override
+  public void addOperator(SimpleOperator nextOp) {
+    List<EntityName> inputs = nextOp.getSpec().getInputNames();
+    for (EntityName input : inputs) {
+      addOperator(input, nextOp);
+    }
+  }
+
+  @Override
+  public void init(Config config, TaskContext context) throws Exception {
+    for (SimpleOperator op : this.operators) {
+      op.init(config, context);
+    }
+  }
+
+  @Override
+  public void process(Tuple ituple, MessageCollector collector, TaskCoordinator coordinator) throws Exception {
+    MessageCollector opCollector = new RouterMessageCollector(collector, coordinator, this);
+    for (Iterator<SimpleOperator> iter = this.getNextOperators(ituple.getEntityName()).iterator(); iter.hasNext();) {
+      iter.next().process(ituple, opCollector, coordinator);
+    }
+  }
+
+  @SuppressWarnings("rawtypes")
+  @Override
+  public void process(Relation deltaRelation, MessageCollector collector, TaskCoordinator coordinator) throws Exception {
+    MessageCollector opCollector = new RouterMessageCollector(collector, coordinator, this);
+    for (Iterator<SimpleOperator> iter = this.getNextOperators(deltaRelation.getName()).iterator(); iter.hasNext();) {
+      iter.next().process(deltaRelation, opCollector, coordinator);
+    }
+  }
+
+  @Override
+  public void refresh(long nanoSec, MessageCollector collector, TaskCoordinator coordinator) throws Exception {
+    MessageCollector opCollector = new RouterMessageCollector(collector, coordinator, this);
+    for (EntityName entity : inputEntities) {
+      for (Iterator<SimpleOperator> iter = this.getNextOperators(entity).iterator(); iter.hasNext();) {
+        iter.next().refresh(nanoSec, opCollector, coordinator);
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/ebecbf2d/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/TopologyBuilder.java b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/TopologyBuilder.java
deleted file mode 100644
index 62b19fc..0000000
--- a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/TopologyBuilder.java
+++ /dev/null
@@ -1,284 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.sql.operators.factory;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.samza.sql.api.data.EntityName;
-import org.apache.samza.sql.api.operators.OperatorRouter;
-import org.apache.samza.sql.api.operators.OperatorSink;
-import org.apache.samza.sql.api.operators.OperatorSource;
-import org.apache.samza.sql.api.operators.OperatorSpec;
-import org.apache.samza.sql.api.operators.SimpleOperator;
-import org.apache.samza.sql.api.operators.SqlOperatorFactory;
-import org.apache.samza.sql.operators.OperatorTopology;
-import org.apache.samza.sql.operators.SimpleRouter;
-
-
-/**
- * This class implements a builder to allow user to create the operators and connect them in a topology altogether.
- */
-public class TopologyBuilder {
-
-  /**
-   * Internal {@link org.apache.samza.sql.api.operators.OperatorRouter} object to retain the topology being created
-   */
-  private SimpleRouter router;
-
-  /**
-   * The {@link org.apache.samza.sql.api.operators.SqlOperatorFactory} object used to create operators connected in the topology
-   */
-  private final SqlOperatorFactory factory;
-
-  /**
-   * The map of unbound inputs, the value is set(input_operators)
-   */
-  private Map<EntityName, Set<OperatorSpec>> unboundInputs = new HashMap<EntityName, Set<OperatorSpec>>();
-
-  /**
-   * The map of unbound outputs, the value is the operator generating the output
-   */
-  private Map<EntityName, OperatorSpec> unboundOutputs = new HashMap<EntityName, OperatorSpec>();
-
-  /**
-   * The set of entities that are intermediate entities between operators
-   */
-  private Set<EntityName> interStreams = new HashSet<EntityName>();
-
-  /**
-   * The current operator that may have unbound input or output
-   */
-  private SimpleOperator currentOp = null;
-
-  /**
-   * Private constructor of {@code TopologyBuilder}
-   *
-   * @param factory The {@link org.apache.samza.sql.api.operators.SqlOperatorFactory} to create operators
-   */
-  private TopologyBuilder(SqlOperatorFactory factory) {
-    this.router = new SimpleRouter();
-    this.factory = factory;
-  }
-
-  /**
-   * Static method to create this {@code TopologyBuilder} w/ a customized {@link org.apache.samza.sql.api.operators.SqlOperatorFactory}
-   *
-   * @param factory The {@link org.apache.samza.sql.api.operators.SqlOperatorFactory} to create operators
-   * @return The {@code TopologyBuilder} object
-   */
-  public static TopologyBuilder create(SqlOperatorFactory factory) {
-    return new TopologyBuilder(factory);
-  }
-
-  /**
-   * Static method to create this {@code TopologyBuilder}
-   *
-   * @return The {@code TopologyBuilder} object
-   */
-  public static TopologyBuilder create() {
-    return new TopologyBuilder(new SimpleOperatorFactoryImpl());
-  }
-
-  /**
-   * Public method to create the next operator and attach it to the output of the current operator
-   *
-   * @param spec The {@link org.apache.samza.sql.api.operators.OperatorSpec} for the next operator
-   * @return The updated {@code TopologyBuilder} object
-   */
-  public TopologyBuilder operator(OperatorSpec spec) {
-    // check whether it is valid to connect a new operator to the current operator's output
-    SimpleOperator nextOp = this.factory.getOperator(spec);
-    return this.operator(nextOp);
-  }
-
-  /**
-   * Public method to create the next operator and attach it to the output of the current operator
-   *
-   * @param op The {@link org.apache.samza.sql.api.operators.SimpleOperator}
-   * @return The updated {@code TopologyBuilder} object
-   */
-  public TopologyBuilder operator(SimpleOperator op) {
-    // check whether it is valid to connect a new operator to the current operator's output
-    canAddOperator(op);
-    this.addOperator(op);
-    // advance the current operator position
-    this.currentOp = op;
-    return this;
-  }
-
-  /**
-   * Public method to create a stream object that will be the source to other operators
-   *
-   * @return The {@link org.apache.samza.sql.api.operators.OperatorSource} that can be the source to other operators
-   */
-  public OperatorSource stream() {
-    canCreateSource();
-    return new OperatorTopology(this.unboundOutputs.keySet().iterator().next(), this.router);
-  }
-
-  /**
-   * Public method to create a sink object that can take input stream from other operators
-   *
-   * @return The {@link org.apache.samza.sql.api.operators.OperatorSink} that can be the downstream of other operators
-   */
-  public OperatorSink sink() {
-    canCreateSink();
-    return new OperatorTopology(this.unboundInputs.keySet().iterator().next(), this.router);
-  }
-
-  /**
-   * Public method to bind the input of the current operator w/ the {@link org.apache.samza.sql.api.operators.OperatorSource} object
-   *
-   * @param srcStream The {@link org.apache.samza.sql.api.operators.OperatorSource} that the current operator is going to be bound to
-   * @return The updated {@code TopologyBuilder} object
-   */
-  public TopologyBuilder bind(OperatorSource srcStream) {
-    EntityName streamName = srcStream.getName();
-    if (this.unboundInputs.containsKey(streamName)) {
-      this.unboundInputs.remove(streamName);
-      this.interStreams.add(streamName);
-    } else {
-      // no input operator is waiting for the output from the srcStream
-      throw new IllegalArgumentException("No operator input can be bound to the input stream " + streamName);
-    }
-    // add all operators in srcStream to this topology
-    for (Iterator<SimpleOperator> iter = srcStream.opIterator(); iter.hasNext();) {
-      this.addOperator(iter.next());
-    }
-    return this;
-  }
-
-  /**
-   * Public method to attach a {@link org.apache.samza.sql.api.operators.OperatorSink} object to the output of the current operator
-   *
-   * @param nextSink The {@link org.apache.samza.sql.api.operators.OperatorSink} to be attached to the current operator's output
-   * @return The updated {@code TopologyBuilder} object
-   */
-  public TopologyBuilder attach(OperatorSink nextSink) {
-    EntityName streamName = nextSink.getName();
-    if (this.unboundOutputs.containsKey(streamName)) {
-      this.unboundOutputs.remove(streamName);
-      this.interStreams.add(streamName);
-    } else {
-      // no unbound output to attach to
-      throw new IllegalArgumentException("No operator output found to attach the sink " + streamName);
-    }
-    // add all operators in nextSink to the router
-    for (Iterator<SimpleOperator> iter = nextSink.opIterator(); iter.hasNext();) {
-      this.addOperator(iter.next());
-    }
-    return this;
-  }
-
-  /**
-   * Public method to finalize the topology that should have all input and output bound to system input and output
-   *
-   * @return The finalized {@link org.apache.samza.sql.api.operators.OperatorRouter} object
-   */
-  public OperatorRouter build() {
-    canClose();
-    return router;
-  }
-
-  private TopologyBuilder addOperator(SimpleOperator nextOp) {
-    // if input is not in the unboundOutputs and interStreams, input is unbound
-    for (EntityName in : nextOp.getSpec().getInputNames()) {
-      if (this.unboundOutputs.containsKey(in)) {
-        this.unboundOutputs.remove(in);
-        this.interStreams.add(in);
-      }
-      if (!this.interStreams.contains(in) && !in.isSystemEntity()) {
-        if (!this.unboundInputs.containsKey(in)) {
-          this.unboundInputs.put(in, new HashSet<OperatorSpec>());
-        }
-        this.unboundInputs.get(in).add(nextOp.getSpec());
-      }
-    }
-    // if output is not in the unboundInputs and interStreams, output is unbound
-    for (EntityName out : nextOp.getSpec().getOutputNames()) {
-      if (this.unboundInputs.containsKey(out)) {
-        this.unboundInputs.remove(out);
-        this.interStreams.add(out);
-      }
-      if (!this.interStreams.contains(out) && !out.isSystemEntity()) {
-        this.unboundOutputs.put(out, nextOp.getSpec());
-      }
-    }
-    try {
-      this.router.addOperator(nextOp);
-    } catch (Exception e) {
-      throw new RuntimeException("Failed to add operator " + nextOp.getSpec().getId() + " to the topology.", e);
-    }
-    return this;
-  }
-
-  private void canCreateSource() {
-    if (this.unboundInputs.size() > 0) {
-      throw new IllegalStateException("Can't create stream when there are unbounded input streams in the topology");
-    }
-    if (this.unboundOutputs.size() != 1) {
-      throw new IllegalStateException(
-          "Can't create stream when the number of unbounded outputs is not 1 in the topology");
-    }
-  }
-
-  private void canCreateSink() {
-    if (this.unboundOutputs.size() > 0) {
-      throw new IllegalStateException("Can't create sink when there are unbounded output streams in the topology");
-    }
-    if (this.unboundInputs.size() != 1) {
-      throw new IllegalStateException(
-          "Can't create sink when the number of unbounded input streams is not 1 in the topology");
-    }
-  }
-
-  private void canAddOperator(SimpleOperator op) {
-    if (this.currentOp == null) {
-      return;
-    }
-    for (EntityName name : this.currentOp.getSpec().getInputNames()) {
-      if (this.unboundInputs.containsKey(name)) {
-        throw new IllegalArgumentException("There are unbound input " + name + " to the current operator "
-            + this.currentOp.getSpec().getId() + ". Create a sink or call bind instead");
-      }
-    }
-    List<EntityName> nextInputs = op.getSpec().getInputNames();
-    for (EntityName name : this.currentOp.getSpec().getOutputNames()) {
-      if (!nextInputs.contains(name) && this.unboundOutputs.containsKey(name)) {
-        // the current operator's output is not in the next operator's input list
-        throw new IllegalArgumentException("There are unbound output " + name + " from the current operator "
-            + this.currentOp.getSpec().getId()
-            + " that are not included in the next operator's inputs. Create a stream or call attach instead");
-      }
-    }
-  }
-
-  private void canClose() {
-    if (!this.unboundInputs.isEmpty() || !this.unboundOutputs.isEmpty()) {
-      throw new IllegalStateException(
-          "There are input/output streams in the topology that are not bounded. Can't build the topology yet.");
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/ebecbf2d/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoin.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoin.java b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoin.java
index 7f5b990..2854aeb 100644
--- a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoin.java
+++ b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoin.java
@@ -29,7 +29,7 @@ import org.apache.samza.sql.api.data.Relation;
 import org.apache.samza.sql.api.data.Stream;
 import org.apache.samza.sql.api.data.Tuple;
 import org.apache.samza.sql.api.operators.OperatorCallback;
-import org.apache.samza.sql.operators.SimpleOperatorImpl;
+import org.apache.samza.sql.operators.factory.SimpleOperatorImpl;
 import org.apache.samza.sql.operators.window.BoundedTimeWindow;
 import org.apache.samza.sql.window.storage.OrderedStoreKey;
 import org.apache.samza.storage.kv.Entry;
@@ -38,6 +38,7 @@ import org.apache.samza.task.TaskContext;
 import org.apache.samza.task.TaskCoordinator;
 import org.apache.samza.task.sql.SimpleMessageCollector;
 
+
 /**
  * This class implements a simple stream-to-stream join
  */

http://git-wip-us.apache.org/repos/asf/samza/blob/ebecbf2d/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoinSpec.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoinSpec.java b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoinSpec.java
index eecff7e..cc0aca0 100644
--- a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoinSpec.java
+++ b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoinSpec.java
@@ -19,11 +19,10 @@
 
 package org.apache.samza.sql.operators.join;
 
-import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.samza.sql.api.data.EntityName;
-import org.apache.samza.sql.operators.SimpleOperatorSpec;
+import org.apache.samza.sql.operators.factory.SimpleOperatorSpec;
 
 
 /**
@@ -36,16 +35,4 @@ public class StreamStreamJoinSpec extends SimpleOperatorSpec {
     // TODO Auto-generated constructor stub
   }
 
-  @SuppressWarnings("serial")
-  public StreamStreamJoinSpec(String id, List<String> inputRelations, String output, List<String> joinKeys) {
-    super(id, new ArrayList<EntityName>() {
-      {
-        for (String input : inputRelations) {
-          add(EntityName.getStreamName(input));
-        }
-      }
-    }, EntityName.getStreamName(output));
-    // TODO Auto-generated constructor stub
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/ebecbf2d/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java
index 0cba39a..b93d789 100644
--- a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java
+++ b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java
@@ -23,7 +23,7 @@ import org.apache.samza.config.Config;
 import org.apache.samza.sql.api.data.Relation;
 import org.apache.samza.sql.api.data.Tuple;
 import org.apache.samza.sql.api.operators.OperatorCallback;
-import org.apache.samza.sql.operators.SimpleOperatorImpl;
+import org.apache.samza.sql.operators.factory.SimpleOperatorImpl;
 import org.apache.samza.storage.kv.Entry;
 import org.apache.samza.storage.kv.KeyValueIterator;
 import org.apache.samza.system.OutgoingMessageEnvelope;
@@ -32,6 +32,7 @@ import org.apache.samza.task.TaskContext;
 import org.apache.samza.task.TaskCoordinator;
 import org.apache.samza.task.sql.SimpleMessageCollector;
 
+
 /**
  * This is an example build-in operator that performs a simple stream re-partition operation.
  *

http://git-wip-us.apache.org/repos/asf/samza/blob/ebecbf2d/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionSpec.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionSpec.java b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionSpec.java
index e494bff..c47eed9 100644
--- a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionSpec.java
+++ b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionSpec.java
@@ -21,7 +21,7 @@ package org.apache.samza.sql.operators.partition;
 
 import org.apache.samza.sql.api.data.EntityName;
 import org.apache.samza.sql.api.operators.OperatorSpec;
-import org.apache.samza.sql.operators.SimpleOperatorSpec;
+import org.apache.samza.sql.operators.factory.SimpleOperatorSpec;
 import org.apache.samza.system.SystemStream;
 
 

http://git-wip-us.apache.org/repos/asf/samza/blob/ebecbf2d/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/BoundedTimeWindow.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/BoundedTimeWindow.java b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/BoundedTimeWindow.java
index a9a83b5..d81cc93 100644
--- a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/BoundedTimeWindow.java
+++ b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/BoundedTimeWindow.java
@@ -27,12 +27,13 @@ import org.apache.samza.sql.api.data.EntityName;
 import org.apache.samza.sql.api.data.Relation;
 import org.apache.samza.sql.api.data.Tuple;
 import org.apache.samza.sql.api.operators.OperatorCallback;
-import org.apache.samza.sql.operators.SimpleOperatorImpl;
+import org.apache.samza.sql.operators.factory.SimpleOperatorImpl;
 import org.apache.samza.storage.kv.KeyValueIterator;
 import org.apache.samza.task.TaskContext;
 import org.apache.samza.task.TaskCoordinator;
 import org.apache.samza.task.sql.SimpleMessageCollector;
 
+
 /**
  * This class defines an example build-in operator for a fixed size window operator that converts a stream to a relation
  *
@@ -85,7 +86,6 @@ public class BoundedTimeWindow extends SimpleOperatorImpl {
    * @param lengthSec The window size in seconds
    * @param input The input stream name
    * @param output The output relation name
-   * @param callback The user callback object
    */
   public BoundedTimeWindow(String wndId, int lengthSec, String input, String output, OperatorCallback callback) {
     super(new WindowSpec(wndId, EntityName.getStreamName(input), EntityName.getStreamName(output), lengthSec), callback);

http://git-wip-us.apache.org/repos/asf/samza/blob/ebecbf2d/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/WindowSpec.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/WindowSpec.java b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/WindowSpec.java
index 6c4eba8..eec32ea 100644
--- a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/WindowSpec.java
+++ b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/WindowSpec.java
@@ -21,7 +21,7 @@ package org.apache.samza.sql.operators.window;
 
 import org.apache.samza.sql.api.data.EntityName;
 import org.apache.samza.sql.api.operators.OperatorSpec;
-import org.apache.samza.sql.operators.SimpleOperatorSpec;
+import org.apache.samza.sql.operators.factory.SimpleOperatorSpec;
 
 
 /**
@@ -47,11 +47,6 @@ public class WindowSpec extends SimpleOperatorSpec implements OperatorSpec {
     this.wndSizeSec = lengthSec;
   }
 
-  public WindowSpec(String id, int wndSize, String input) {
-    super(id, EntityName.getStreamName(input), null);
-    this.wndSizeSec = wndSize;
-  }
-
   /**
    * Method to get the window state relation name
    *

http://git-wip-us.apache.org/repos/asf/samza/blob/ebecbf2d/samza-sql-core/src/main/java/org/apache/samza/task/sql/SimpleMessageCollector.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/task/sql/SimpleMessageCollector.java b/samza-sql-core/src/main/java/org/apache/samza/task/sql/SimpleMessageCollector.java
index 6950f67..b29838a 100644
--- a/samza-sql-core/src/main/java/org/apache/samza/task/sql/SimpleMessageCollector.java
+++ b/samza-sql-core/src/main/java/org/apache/samza/task/sql/SimpleMessageCollector.java
@@ -22,7 +22,7 @@ package org.apache.samza.task.sql;
 import org.apache.samza.sql.api.data.Relation;
 import org.apache.samza.sql.api.data.Tuple;
 import org.apache.samza.sql.api.operators.OperatorCallback;
-import org.apache.samza.sql.operators.NoopOperatorCallback;
+import org.apache.samza.sql.operators.factory.NoopOperatorCallback;
 import org.apache.samza.storage.kv.Entry;
 import org.apache.samza.storage.kv.KeyValueIterator;
 import org.apache.samza.system.OutgoingMessageEnvelope;
@@ -57,38 +57,25 @@ public class SimpleMessageCollector implements MessageCollector {
    * @param coordinator The {@link org.apache.samza.task.TaskCoordinator} in the context
    */
   public SimpleMessageCollector(MessageCollector collector, TaskCoordinator coordinator) {
-    this(collector, coordinator, new NoopOperatorCallback());
+    this.collector = collector;
+    this.coordinator = coordinator;
   }
 
   /**
    * This method swaps the {@code callback} with the new one
    *
-   * <p> This method allows the {@link org.apache.samza.sql.api.operators.OperatorCallback} to be swapped when the collector
-   * is passed down into the next operator's context. Hence, under the new operator's context, the correct callback functions can be invoked
+   * <p> This method allows the {@link org.apache.samza.sql.api.operators.SimpleOperator} to be swapped when the collector
+   * is passed down into the next operator's context. Hence, under the new operator's context, the correct {@link org.apache.samza.sql.api.operators.OperatorCallback#afterProcess(Relation, MessageCollector, TaskCoordinator)},
+   * and {@link org.apache.samza.sql.api.operators.OperatorCallback#afterProcess(Tuple, MessageCollector, TaskCoordinator)} can be invoked
    *
    * @param callback The new {@link org.apache.samza.sql.api.operators.OperatorCallback} to be set
    */
-  public void switchCallback(OperatorCallback callback) {
-    if (callback == null) {
-      this.callback = new NoopOperatorCallback();
-    } else {
-      this.callback = callback;
-    }
-  }
-
-  /**
-   * Method is declared to be final s.t. we enforce that the callback functions are called first
-   */
-  @Override
-  final public void send(OutgoingMessageEnvelope envelope) {
-    this.collector.send(envelope);
+  public void switchOperatorCallback(OperatorCallback callback) {
+    this.callback = callback;
   }
 
   /**
    * Method is declared to be final s.t. we enforce that the callback functions are called first
-   *
-   * @param deltaRelation The relation to be sent out
-   * @throws Exception Throws exception if failed to send
    */
   final public void send(Relation deltaRelation) throws Exception {
     Relation rel = this.callback.afterProcess(deltaRelation, collector, coordinator);
@@ -100,9 +87,6 @@ public class SimpleMessageCollector implements MessageCollector {
 
   /**
    * Method is declared to be final s.t. we enforce that the callback functions are called first
-   *
-   * @param tuple The tuple to be sent out
-   * @throws Exception Throws exception if failed to send
    */
   final public void send(Tuple tuple) throws Exception {
     Tuple otuple = this.callback.afterProcess(tuple, collector, coordinator);
@@ -122,4 +106,9 @@ public class SimpleMessageCollector implements MessageCollector {
   protected void realSend(Tuple tuple) throws Exception {
     this.collector.send((OutgoingMessageEnvelope) tuple.getMessage());
   }
+
+  @Override
+  public void send(OutgoingMessageEnvelope envelope) {
+    this.collector.send(envelope);
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/ebecbf2d/samza-sql-core/src/test/java/org/apache/samza/task/sql/RandomWindowOperatorTask.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/test/java/org/apache/samza/task/sql/RandomWindowOperatorTask.java b/samza-sql-core/src/test/java/org/apache/samza/task/sql/RandomWindowOperatorTask.java
index 7370af6..20dc701 100644
--- a/samza-sql-core/src/test/java/org/apache/samza/task/sql/RandomWindowOperatorTask.java
+++ b/samza-sql-core/src/test/java/org/apache/samza/task/sql/RandomWindowOperatorTask.java
@@ -22,7 +22,6 @@ package org.apache.samza.task.sql;
 import org.apache.samza.config.Config;
 import org.apache.samza.sql.api.data.Relation;
 import org.apache.samza.sql.api.data.Tuple;
-import org.apache.samza.sql.api.operators.Operator;
 import org.apache.samza.sql.api.operators.OperatorCallback;
 import org.apache.samza.sql.data.IncomingMessageTuple;
 import org.apache.samza.sql.operators.window.BoundedTimeWindow;
@@ -40,7 +39,7 @@ import org.apache.samza.task.WindowableTask;
  *
  */
 public class RandomWindowOperatorTask implements StreamTask, InitableTask, WindowableTask {
-  private Operator operator;
+  private BoundedTimeWindow wndOp;
 
   private final OperatorCallback wndCallback = new OperatorCallback() {
 
@@ -78,20 +77,20 @@ public class RandomWindowOperatorTask implements StreamTask, InitableTask, Windo
   public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator)
       throws Exception {
     // based on tuple's stream name, get the window op and run process()
-    operator.process(new IncomingMessageTuple(envelope), collector, coordinator);
+    wndOp.process(new IncomingMessageTuple(envelope), collector, coordinator);
 
   }
 
   @Override
   public void window(MessageCollector collector, TaskCoordinator coordinator) throws Exception {
     // based on tuple's stream name, get the window op and run process()
-    operator.refresh(System.nanoTime(), collector, coordinator);
+    wndOp.refresh(System.nanoTime(), collector, coordinator);
   }
 
   @Override
   public void init(Config config, TaskContext context) throws Exception {
     // 1. create a fixed length 10 sec window operator
-    this.operator = new BoundedTimeWindow("wndOp1", 10, "kafka:stream1", "wndOutput", this.wndCallback);
-    this.operator.init(config, context);
+    this.wndOp = new BoundedTimeWindow("wndOp1", 10, "kafka:stream1", "relation1", this.wndCallback);
+    this.wndOp.init(config, context);
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/ebecbf2d/samza-sql-core/src/test/java/org/apache/samza/task/sql/StreamSqlTask.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/test/java/org/apache/samza/task/sql/StreamSqlTask.java b/samza-sql-core/src/test/java/org/apache/samza/task/sql/StreamSqlTask.java
index d65892c..9124e3c 100644
--- a/samza-sql-core/src/test/java/org/apache/samza/task/sql/StreamSqlTask.java
+++ b/samza-sql-core/src/test/java/org/apache/samza/task/sql/StreamSqlTask.java
@@ -24,7 +24,7 @@ import java.util.List;
 
 import org.apache.samza.config.Config;
 import org.apache.samza.sql.data.IncomingMessageTuple;
-import org.apache.samza.sql.operators.SimpleRouter;
+import org.apache.samza.sql.operators.factory.SimpleRouter;
 import org.apache.samza.sql.operators.join.StreamStreamJoin;
 import org.apache.samza.sql.operators.partition.PartitionOp;
 import org.apache.samza.sql.operators.window.BoundedTimeWindow;
@@ -51,25 +51,25 @@ import org.apache.samza.task.WindowableTask;
  */
 public class StreamSqlTask implements StreamTask, InitableTask, WindowableTask {
 
-  private SimpleRouter router;
+  private SimpleRouter rteCntx;
 
   @Override
   public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator)
       throws Exception {
-    this.router.process(new IncomingMessageTuple(envelope), collector, coordinator);
+    this.rteCntx.process(new IncomingMessageTuple(envelope), collector, coordinator);
   }
 
   @Override
   public void window(MessageCollector collector, TaskCoordinator coordinator) throws Exception {
-    this.router.refresh(System.nanoTime(), collector, coordinator);
+    this.rteCntx.refresh(System.nanoTime(), collector, coordinator);
   }
 
   @Override
   public void init(Config config, TaskContext context) throws Exception {
     // create all operators via the operator factory
     // 1. create two window operators
-    BoundedTimeWindow wnd1 = new BoundedTimeWindow("fixedWnd1", 10, "kafka:inputStream1", "fixedWndOutput1");
-    BoundedTimeWindow wnd2 = new BoundedTimeWindow("fixedWnd2", 10, "kafka:inputStream2", "fixedWndOutput2");
+    BoundedTimeWindow wnd1 = new BoundedTimeWindow("fixedWnd1", 10, "inputStream1", "fixedWndOutput1");
+    BoundedTimeWindow wnd2 = new BoundedTimeWindow("fixedWnd2", 10, "inputStream2", "fixedWndOutput2");
     // 2. create one join operator
     @SuppressWarnings("serial")
     List<String> inputRelations = new ArrayList<String>() {
@@ -86,19 +86,19 @@ public class StreamSqlTask implements StreamTask, InitableTask, WindowableTask {
       }
     };
     StreamStreamJoin join = new StreamStreamJoin("joinOp", inputRelations, "joinOutput", joinKeys);
-    // 3. create a re-partition operator
+    // 4. create a re-partition operator
     PartitionOp par = new PartitionOp("parOp1", "joinOutput", "kafka", "parOutputStrm1", "joinKey", 50);
 
     // Now, connecting the operators via the OperatorRouter
-    this.router = new SimpleRouter();
+    this.rteCntx = new SimpleRouter();
     // 1. set two system input operators (i.e. two window operators)
-    this.router.addOperator(wnd1);
-    this.router.addOperator(wnd2);
+    this.rteCntx.addOperator(wnd1);
+    this.rteCntx.addOperator(wnd2);
     // 2. connect join operator to both window operators
-    this.router.addOperator(join);
+    this.rteCntx.addOperator(join);
     // 3. connect re-partition operator to the stream operator
-    this.router.addOperator(par);
+    this.rteCntx.addOperator(par);
 
-    this.router.init(config, context);
+    this.rteCntx.init(config, context);
   }
 }