You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2016/10/04 23:23:02 UTC

[2/5] samza git commit: SAMZA-914: Creating the fluent programming APIs w/ operators

http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Schema.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Schema.java b/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Schema.java
deleted file mode 100644
index 1e8f192..0000000
--- a/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Schema.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.sql.api.data;
-
-import java.util.Map;
-
-
-public interface Schema {
-
-  enum Type {
-    INTEGER,
-    LONG,
-    FLOAT,
-    DOUBLE,
-    BOOLEAN,
-    STRING,
-    BYTES,
-    STRUCT,
-    ARRAY,
-    MAP
-  };
-
-  Type getType();
-
-  Schema getElementType();
-
-  Schema getValueType();
-
-  Map<String, Schema> getFields();
-
-  Schema getFieldType(String fldName);
-
-  Data read(Object object);
-
-  Data transform(Data inputData);
-
-  boolean equals(Schema other);
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Stream.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Stream.java b/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Stream.java
deleted file mode 100644
index 931705e..0000000
--- a/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Stream.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.sql.api.data;
-
-import java.util.List;
-
-
-/**
- * This interface defines an ordered {@link org.apache.samza.sql.api.data.Relation}, which has an ordered key.
- *
- * <p> This is to define a stream created by CREATE STREAM statement
- *
- * @param <K> The ordered key for the {@code Stream} class
- */
-public interface Stream<K extends Comparable<?>> extends Relation<K> {
-  /**
-   * Get the list of field names used as the order keys for this stream
-   *
-   * @return The list of field names used to construct the order key for the stream
-   */
-  List<String> getOrderFields();
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Table.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Table.java b/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Table.java
deleted file mode 100644
index 7b4d984..0000000
--- a/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Table.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.sql.api.data;
-
-/**
- * This interface defines a non-ordered {@link org.apache.samza.sql.api.data.Relation}, which has a unique primary key
- *
- * <p> This is to define a table created by CREATE TABLE statement
- *
- * @param <K> The primary key for the {@code Table} class
- */
-public interface Table<K> extends Relation<K> {
-
-  /**
-   * Get the primary key field name for this table
-   *
-   * @return The name of the primary key field
-   */
-  String getPrimaryKeyName();
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Tuple.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Tuple.java b/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Tuple.java
deleted file mode 100644
index bea922b..0000000
--- a/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Tuple.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.sql.api.data;
-
-import org.apache.samza.system.sql.Offset;
-
-
-/**
- * This class defines the generic interface of <code>Tuple</code>, which is a entry from the incoming stream, or one row in a <code>Relation</code>.
- *
- * <p>The <code>Tuple</code> models the basic operatible unit in streaming SQL processes in Samza.
- *
- */
-public interface Tuple {
-
-  /**
-   * Access method to get the corresponding message body in the tuple
-   *
-   * @return Message object in the tuple
-   */
-  Data getMessage();
-
-  /**
-   * Method to indicate whether the tuple is a delete tuple or an insert tuple
-   *
-   * @return A boolean value indicates whether the current tuple is a delete or insert message
-   */
-  boolean isDelete();
-
-  /**
-   * Access method to the key of the tuple
-   *
-   * @return The <code>key</code> of the tuple
-   */
-  Data getKey();
-
-  /**
-   * Get the stream name of the tuple. Note this stream name should be unique in the system.
-   *
-   * @return The stream name which this tuple belongs to
-   */
-  EntityName getEntityName();
-
-  /**
-   * Get the message creation timestamp of the tuple.
-   *
-   * @return The tuple's creation timestamp in nano seconds.
-   */
-  long getCreateTimeNano();
-
-  /**
-   * Get the offset of the tuple in the stream. This should be used to uniquely identify a tuple in a stream.
-   *
-   * @return The offset of the tuple in the stream.
-   */
-  Offset getOffset();
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operator.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operator.java b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operator.java
deleted file mode 100644
index d6f6b57..0000000
--- a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operator.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.sql.api.operators;
-
-import org.apache.samza.config.Config;
-import org.apache.samza.sql.api.data.Relation;
-import org.apache.samza.sql.api.data.Tuple;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskContext;
-import org.apache.samza.task.TaskCoordinator;
-
-
-public interface Operator {
-  /**
-   * Method to initialize the operator
-   *
-   * @param config The configuration object
-   * @param context The task context
-   * @throws Exception Throws Exception if failed to initialize the operator
-   */
-  void init(Config config, TaskContext context) throws Exception;
-
-  /**
-   * Method to perform a relational logic on the input relation
-   *
-   * <p> The actual implementation of relational logic is performed by the implementation of this method.
-   *
-   * @param deltaRelation The changed rows in the input relation, including the inserts/deletes/updates
-   * @param collector The {@link org.apache.samza.task.MessageCollector} that accepts outputs from the operator
-   * @param coordinator The {@link org.apache.samza.task.TaskCoordinator} in the context
-   * @throws Exception Throws exception if failed
-   */
-  void process(Relation deltaRelation, MessageCollector collector, TaskCoordinator coordinator)
-      throws Exception;
-
-  /**
-   * Method to process on an input tuple.
-   *
-   * @param tuple The input tuple, which has the incoming message from a stream
-   * @param collector The {@link org.apache.samza.task.MessageCollector} that accepts outputs from the operator
-   * @param coordinator The {@link org.apache.samza.task.TaskCoordinator} in the context
-   * @throws Exception Throws exception if failed
-   */
-  void process(Tuple tuple, MessageCollector collector, TaskCoordinator coordinator) throws Exception;
-
-  /**
-   * Method to refresh the result when a timer expires
-   *
-   * @param timeNano The current system time in nano second
-   * @param collector The {@link org.apache.samza.task.MessageCollector} that accepts outputs from the operator
-   * @param coordinator The {@link org.apache.samza.task.TaskCoordinator} in the context
-   * @throws Exception Throws exception if failed
-   */
-  void refresh(long timeNano, MessageCollector collector, TaskCoordinator coordinator) throws Exception;
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorCallback.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorCallback.java b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorCallback.java
deleted file mode 100644
index fb2aa89..0000000
--- a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorCallback.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.sql.api.operators;
-
-import org.apache.samza.sql.api.data.Relation;
-import org.apache.samza.sql.api.data.Tuple;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskCoordinator;
-
-
-/**
- * Defines the callback functions to allow customized functions to be invoked before process and before sending the result
- */
-public interface OperatorCallback {
-  /**
-   * Method to be invoked before the operator actually process the input tuple
-   *
-   * @param tuple The incoming tuple
-   * @param collector The {@link org.apache.samza.task.MessageCollector} in context
-   * @param coordinator The {@link org.apache.samza.task.TaskCoordinator} in context
-   * @return The tuple to be processed; return {@code null} if there is nothing to be processed
-   */
-  Tuple beforeProcess(Tuple tuple, MessageCollector collector, TaskCoordinator coordinator);
-
-  /**
-   * Method to be invoked before the operator actually process the input relation
-   *
-   * @param rel The input relation
-   * @param collector The {@link org.apache.samza.task.MessageCollector} in context
-   * @param coordinator The {@link org.apache.samza.task.TaskCoordinator} in context
-   * @return The relation to be processed; return {@code null} if there is nothing to be processed
-   */
-  Relation beforeProcess(Relation rel, MessageCollector collector, TaskCoordinator coordinator);
-
-  /**
-   * Method to be invoked before the operator's output tuple is sent
-   *
-   * @param tuple The output tuple
-   * @param collector The {@link org.apache.samza.task.MessageCollector} in context
-   * @param coordinator The {@link org.apache.samza.task.TaskCoordinator} in context
-   * @return The tuple to be sent; return {@code null} if there is nothing to be sent
-   */
-  Tuple afterProcess(Tuple tuple, MessageCollector collector, TaskCoordinator coordinator);
-
-  /**
-   * Method to be invoked before the operator's output relation is sent
-   *
-   * @param rel The output relation
-   * @param collector The {@link org.apache.samza.task.MessageCollector} in context
-   * @param coordinator The {@link org.apache.samza.task.TaskCoordinator} in context
-   * @return The relation to be sent; return {@code null} if there is nothing to be sent
-   */
-  Relation afterProcess(Relation rel, MessageCollector collector, TaskCoordinator coordinator);
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorRouter.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorRouter.java b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorRouter.java
deleted file mode 100644
index 0759638..0000000
--- a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorRouter.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.sql.api.operators;
-
-import java.util.List;
-
-import org.apache.samza.sql.api.data.EntityName;
-
-
-/**
- * This interface class defines interface methods to connect {@link org.apache.samza.sql.api.operators.SimpleOperator}s together into a composite operator.
- *
- * <p>The {@code OperatorRouter} allows the user to attach operators to a {@link org.apache.samza.sql.api.data.Table} or
- * a {@link org.apache.samza.sql.api.data.Stream} entity, if the corresponding table/stream is included as inputs to the operator.
- * Each operator then executes its own logic and determines which table/stream to emit the output to. Through the {@code OperatorRouter},
- * the next operators attached to the corresponding output entities (i.e. table/streams) can then be invoked to continue the
- * stream process task.
- */
-public interface OperatorRouter extends Operator {
-
-  /**
-   * This method adds a {@link org.apache.samza.sql.api.operators.SimpleOperator} to the {@code OperatorRouter}.
-   *
-   * @param nextOp The {@link org.apache.samza.sql.api.operators.SimpleOperator} to be added
-   * @throws Exception Throws exception if failed
-   */
-  void addOperator(SimpleOperator nextOp) throws Exception;
-
-  /**
-   * This method gets the list of {@link org.apache.samza.sql.api.operators.SimpleOperator}s attached to an output entity (of any type)
-   *
-   * @param output The identifier of the output entity
-   * @return The list of {@link org.apache.samza.sql.api.operators.SimpleOperator} taking {@code output} as input table/stream
-   */
-  List<SimpleOperator> getNextOperators(EntityName output);
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSpec.java b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSpec.java
deleted file mode 100644
index 4d670fd..0000000
--- a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSpec.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.sql.api.operators;
-
-import java.util.List;
-
-import org.apache.samza.sql.api.data.EntityName;
-
-
-/**
- * This class defines a generic specification interface class for all {@link org.apache.samza.sql.api.operators.SimpleOperator}s.
- *
- * <p>The purpose of this class is to encapsulate all the details of configuration/parameters of a specific implementation of an operator.
- *
- * <p>The generic methods for an operator specification is to provide methods to get the unique ID, the list of entity names (i.e. stream name
- * in {@link org.apache.samza.sql.api.data.Table} or {@link org.apache.samza.sql.api.data.Stream} name) of input variables , and the list of entity names of the output variables.
- *
- */
-public interface OperatorSpec {
-  /**
-   * Interface method that returns the unique ID of the operator in a task
-   *
-   * @return The unique ID of the {@link org.apache.samza.sql.api.operators.SimpleOperator} object
-   */
-  String getId();
-
-  /**
-   * Access method to the list of entity names of input variables.
-   *
-   * @return A list of entity names of the inputs
-   */
-  List<EntityName> getInputNames();
-
-  /**
-   * Access method to the list of entity name of the output variable
-   *
-   * @return The entity name of the output
-   *
-   */
-  List<EntityName> getOutputNames();
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SimpleOperator.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SimpleOperator.java b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SimpleOperator.java
deleted file mode 100644
index c49a822..0000000
--- a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SimpleOperator.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.sql.api.operators;
-
-
-
-/**
- * The interface for a {@code SimpleOperator} that implements a simple primitive relational logic operation
- */
-public interface SimpleOperator extends Operator {
-  /**
-   * Method to get the specification of this {@code SimpleOperator}
-   *
-   * @return The {@link org.apache.samza.sql.api.operators.OperatorSpec} object that defines the configuration/parameters of the operator
-   */
-  OperatorSpec getSpec();
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SqlOperatorFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SqlOperatorFactory.java b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SqlOperatorFactory.java
deleted file mode 100644
index 6f8d93b..0000000
--- a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SqlOperatorFactory.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.sql.api.operators;
-
-
-
-/**
- * This class defines the interface of SQL operator factory, which creates the {@link org.apache.samza.sql.api.operators.SimpleOperator}s:
- */
-public interface SqlOperatorFactory {
-
-  /**
-   * Interface method to create/get the {@link org.apache.samza.sql.api.operators.SimpleOperator} object
-   *
-   * @param spec The specification of the {@link org.apache.samza.sql.api.operators.SimpleOperator} object
-   * @return The {@link org.apache.samza.sql.api.operators.SimpleOperator} object
-   */
-  SimpleOperator getOperator(OperatorSpec spec);
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-sql-core/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java b/samza-sql-core/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java
deleted file mode 100644
index 72a59f2..0000000
--- a/samza-sql-core/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.sql.data;
-
-import org.apache.samza.sql.api.data.Data;
-import org.apache.samza.sql.api.data.EntityName;
-import org.apache.samza.sql.api.data.Tuple;
-import org.apache.samza.system.IncomingMessageEnvelope;
-import org.apache.samza.system.sql.LongOffset;
-import org.apache.samza.system.sql.Offset;
-
-
-/**
- * This class implements a {@link org.apache.samza.sql.api.data.Tuple} that encapsulates an {@link org.apache.samza.system.IncomingMessageEnvelope} from the system
- *
- */
-public class IncomingMessageTuple implements Tuple {
-  /**
-   * Incoming message envelope
-   */
-  private final IncomingMessageEnvelope imsg;
-
-  /**
-   * The entity name for the incoming system stream
-   */
-  private final EntityName strmEntity;
-
-  /**
-   * The receive time of this incoming message
-   */
-  private final long recvTimeNano;
-
-  /**
-   * Ctor to create a {@code IncomingMessageTuple} from {@link org.apache.samza.system.IncomingMessageEnvelope}
-   *
-   * @param imsg The incoming system message
-   */
-  public IncomingMessageTuple(IncomingMessageEnvelope imsg) {
-    this.imsg = imsg;
-    this.strmEntity =
-        EntityName.getStreamName(String.format("%s:%s", imsg.getSystemStreamPartition().getSystem(), imsg
-            .getSystemStreamPartition().getStream()));
-    this.recvTimeNano = System.nanoTime();
-  }
-
-  @Override
-  public Data getMessage() {
-    return (Data) this.imsg.getMessage();
-  }
-
-  @Override
-  public boolean isDelete() {
-    return false;
-  }
-
-  @Override
-  public Data getKey() {
-    return (Data) this.imsg.getKey();
-  }
-
-  @Override
-  public EntityName getEntityName() {
-    return this.strmEntity;
-  }
-
-  @Override
-  public long getCreateTimeNano() {
-    // TODO: this is wrong and just to keep as an placeholder. It should be replaced by the message publish time when the publish timestamp is available in the message metadata
-    return this.recvTimeNano;
-  }
-
-  @Override
-  public Offset getOffset() {
-    // TODO: need to add offset factory to generate different types of offset. This is just a placeholder,
-    // assuming incoming message carries long value as offset (i.e. Kafka case)
-    return new LongOffset(this.imsg.getOffset());
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-sql-core/src/main/java/org/apache/samza/sql/data/avro/AvroData.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/data/avro/AvroData.java b/samza-sql-core/src/main/java/org/apache/samza/sql/data/avro/AvroData.java
deleted file mode 100644
index d040be9..0000000
--- a/samza-sql-core/src/main/java/org/apache/samza/sql/data/avro/AvroData.java
+++ /dev/null
@@ -1,262 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.sql.data.avro;
-
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.avro.generic.GenericArray;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.samza.sql.api.data.Data;
-import org.apache.samza.sql.api.data.Schema;
-
-
-public class AvroData implements Data {
-  protected final Object datum;
-  protected final AvroSchema schema;
-
-  private AvroData(AvroSchema schema, Object datum) {
-    this.datum = datum;
-    this.schema = schema;
-  }
-
-  @Override
-  public Schema schema() {
-    return this.schema;
-  }
-
-  @Override
-  public Object value() {
-    return this.datum;
-  }
-
-  @Override
-  public int intValue() {
-    throw new UnsupportedOperationException("Can't get value for an unknown data type.");
-  }
-
-  @Override
-  public long longValue() {
-    throw new UnsupportedOperationException("Can't get value for an unknown data type.");
-  }
-
-  @Override
-  public float floatValue() {
-    throw new UnsupportedOperationException("Can't get value for an unknown data type.");
-  }
-
-  @Override
-  public double doubleValue() {
-    throw new UnsupportedOperationException("Can't get value for an unknown data type.");
-  }
-
-  @Override
-  public boolean booleanValue() {
-    throw new UnsupportedOperationException("Can't get value for an unknown data type.");
-  }
-
-  @Override
-  public String strValue() {
-    throw new UnsupportedOperationException("Can't get value for an unknown data type.");
-  }
-
-  @Override
-  public byte[] bytesValue() {
-    throw new UnsupportedOperationException("Can't get value for an unknown data type.");
-  }
-
-  @Override
-  public List<Object> arrayValue() {
-    throw new UnsupportedOperationException("Can't get value for an unknown data type.");
-  }
-
-  @Override
-  public Map<Object, Object> mapValue() {
-    throw new UnsupportedOperationException("Can't get value for an unknown data type.");
-  }
-
-  @Override
-  public Data getElement(int index) {
-    throw new UnsupportedOperationException("Can't get value for an unknown data type.");
-  }
-
-  @Override
-  public Data getFieldData(String fldName) {
-    throw new UnsupportedOperationException("Can't get value for an unknown data type.");
-  }
-
-  public static AvroData getArray(AvroSchema schema, Object datum) {
-    if (schema.getType() != Schema.Type.ARRAY) {
-      throw new IllegalArgumentException("Can't create an array object with non-array schema:" + schema.getType());
-    }
-    return new AvroData(schema, datum) {
-      @SuppressWarnings("unchecked")
-      private final GenericArray<Object> array = (GenericArray<Object>) this.datum;
-
-      @Override
-      public List<Object> arrayValue() {
-        return this.array;
-      }
-
-      @Override
-      public Data getElement(int index) {
-        return this.schema.getElementType().read(array.get(index));
-      }
-
-    };
-  }
-
-  public static AvroData getMap(AvroSchema schema, Object datum) {
-    if (schema.getType() != Schema.Type.MAP) {
-      throw new IllegalArgumentException("Can't create a map object with non-map schema:" + schema.getType());
-    }
-    return new AvroData(schema, datum) {
-      @SuppressWarnings("unchecked")
-      private final Map<Object, Object> map = (Map<Object, Object>) datum;
-
-      @Override
-      public Map<Object, Object> mapValue() {
-        return this.map;
-      }
-
-      @Override
-      public Data getFieldData(String fldName) {
-        // TODO Auto-generated method stub
-        return this.schema.getValueType().read(map.get(fldName));
-      }
-
-    };
-  }
-
-  public static AvroData getStruct(AvroSchema schema, Object datum) {
-    if (schema.getType() != Schema.Type.STRUCT) {
-      throw new IllegalArgumentException("Can't create a struct object with non-struct schema:" + schema.getType());
-    }
-    return new AvroData(schema, datum) {
-      private final GenericRecord record = (GenericRecord) datum;
-
-      @Override
-      public Data getFieldData(String fldName) {
-        // TODO Auto-generated method stub
-        return this.schema.getFieldType(fldName).read(record.get(fldName));
-      }
-
-    };
-  }
-
-  public static AvroData getInt(AvroSchema schema, Object datum) {
-    if (schema.getType() != Schema.Type.INTEGER || !(datum instanceof Integer)) {
-      throw new IllegalArgumentException("data object and schema mismatch. schema:" + schema.getType() + ", data: "
-          + datum.getClass().getName());
-    }
-    return new AvroData(schema, datum) {
-      @Override
-      public int intValue() {
-        return ((Integer) datum).intValue();
-      }
-
-    };
-  }
-
-  public static AvroData getLong(AvroSchema schema, Object datum) {
-    if (schema.getType() != Schema.Type.LONG || !(datum instanceof Long)) {
-      throw new IllegalArgumentException("data object and schema mismatch. schema:" + schema.getType() + ", data: "
-          + datum.getClass().getName());
-    }
-    return new AvroData(schema, datum) {
-      @Override
-      public long longValue() {
-        return ((Long) datum).longValue();
-      }
-
-    };
-  }
-
-  public static AvroData getFloat(AvroSchema schema, Object datum) {
-    if (schema.getType() != Schema.Type.FLOAT || !(datum instanceof Float)) {
-      throw new IllegalArgumentException("data object and schema mismatch. schema:" + schema.getType() + ", data: "
-          + datum.getClass().getName());
-    }
-    return new AvroData(schema, datum) {
-      @Override
-      public float floatValue() {
-        return ((Float) datum).floatValue();
-      }
-
-    };
-  }
-
-  public static AvroData getDouble(AvroSchema schema, Object datum) {
-    if (schema.getType() != Schema.Type.DOUBLE || !(datum instanceof Double)) {
-      throw new IllegalArgumentException("data object and schema mismatch. schema:" + schema.getType() + ", data: "
-          + datum.getClass().getName());
-    }
-    return new AvroData(schema, datum) {
-      @Override
-      public double doubleValue() {
-        return ((Double) datum).doubleValue();
-      }
-
-    };
-  }
-
-  public static AvroData getBoolean(AvroSchema schema, Object datum) {
-    if (schema.getType() != Schema.Type.BOOLEAN || !(datum instanceof Boolean)) {
-      throw new IllegalArgumentException("data object and schema mismatch. schema:" + schema.getType() + ", data: "
-          + datum.getClass().getName());
-    }
-    return new AvroData(schema, datum) {
-      @Override
-      public boolean booleanValue() {
-        return ((Boolean) datum).booleanValue();
-      }
-
-    };
-  }
-
-  public static AvroData getString(AvroSchema schema, Object datum) {
-    if (schema.getType() != Schema.Type.STRING || !(datum instanceof CharSequence)) {
-      throw new IllegalArgumentException("data object and schema mismatch. schema:" + schema.getType() + ", data: "
-          + datum.getClass().getName());
-    }
-    return new AvroData(schema, datum) {
-      @Override
-      public String strValue() {
-        return ((CharSequence) datum).toString();
-      }
-
-    };
-  }
-
-  public static AvroData getBytes(AvroSchema schema, Object datum) {
-    if (schema.getType() != Schema.Type.BYTES || !(datum instanceof ByteBuffer)) {
-      throw new IllegalArgumentException("data object and schema mismatch. schema:" + schema.getType() + ", data: "
-          + datum.getClass().getName());
-    }
-    return new AvroData(schema, datum) {
-      @Override
-      public byte[] bytesValue() {
-        return ((ByteBuffer) datum).array();
-      }
-
-    };
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-sql-core/src/main/java/org/apache/samza/sql/data/avro/AvroSchema.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/data/avro/AvroSchema.java b/samza-sql-core/src/main/java/org/apache/samza/sql/data/avro/AvroSchema.java
deleted file mode 100644
index 577cf74..0000000
--- a/samza-sql-core/src/main/java/org/apache/samza/sql/data/avro/AvroSchema.java
+++ /dev/null
@@ -1,296 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.sql.data.avro;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.avro.Schema.Field;
-import org.apache.samza.sql.api.data.Data;
-import org.apache.samza.sql.api.data.Schema;
-
-
-public class AvroSchema implements Schema {
-
-  protected final org.apache.avro.Schema avroSchema;
-  protected final Schema.Type type;
-
-  private final static Map<org.apache.avro.Schema.Type, AvroSchema> primSchemas =
-      new HashMap<org.apache.avro.Schema.Type, AvroSchema>();
-
-  static {
-    primSchemas.put(org.apache.avro.Schema.Type.INT,
-        new AvroSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.INT)) {
-      @Override
-      public Data read(Object datum) {
-        return AvroData.getInt(this, datum);
-      }
-    });
-    primSchemas.put(org.apache.avro.Schema.Type.LONG,
-        new AvroSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.LONG)) {
-      @Override
-      public Data read(Object datum) {
-        return AvroData.getLong(this, datum);
-      }
-    });
-    primSchemas.put(org.apache.avro.Schema.Type.FLOAT,
-        new AvroSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.FLOAT)) {
-      @Override
-      public Data read(Object datum) {
-        return AvroData.getFloat(this, datum);
-      }
-    });
-    primSchemas.put(org.apache.avro.Schema.Type.DOUBLE,
-        new AvroSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.DOUBLE)) {
-      @Override
-      public Data read(Object datum) {
-        return AvroData.getDouble(this, datum);
-      }
-    });
-    primSchemas.put(org.apache.avro.Schema.Type.BOOLEAN,
-        new AvroSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.BOOLEAN)) {
-      @Override
-      public Data read(Object datum) {
-        return AvroData.getBoolean(this, datum);
-      }
-    });
-    primSchemas.put(org.apache.avro.Schema.Type.STRING,
-        new AvroSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.STRING)) {
-      @Override
-      public Data read(Object datum) {
-        return AvroData.getString(this, datum);
-      }
-    });
-    primSchemas.put(org.apache.avro.Schema.Type.BYTES,
-        new AvroSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.BYTES)) {
-      @Override
-      public Data read(Object datum) {
-        return AvroData.getBytes(this, datum);
-      }
-    });
-  };
-
-  public static AvroSchema getSchema(final org.apache.avro.Schema schema) {
-    Schema.Type type = mapType(schema.getType());
-    if (type != Schema.Type.ARRAY && type != Schema.Type.MAP && type != Schema.Type.STRUCT) {
-      return primSchemas.get(schema.getType());
-    }
-    // otherwise, construct the new schema
-    // TODO: It would be possible to assign each complex schema an ID and cache it w/o repeated create in-memory schema objects
-    switch (type) {
-      case ARRAY:
-        return new AvroSchema(schema) {
-          @Override
-          public Data transform(Data input) {
-            // This would get all the elements until the length of the current schema's array length
-            if (input.schema().getType() != Schema.Type.ARRAY) {
-              throw new IllegalArgumentException("Schema mismatch. Can't transfer data. input schema: "
-                  + input.schema().getType());
-            }
-            if (!input.schema().getElementType().equals(this.getElementType())) {
-              throw new IllegalArgumentException("Element schema mismatch. Can't transfer data. input schema: "
-                  + input.schema().getElementType().getType());
-            }
-            // input type matches array type
-            return AvroData.getArray(this, input.value());
-          }
-        };
-      case MAP:
-        return new AvroSchema(schema) {
-          @Override
-          public Data transform(Data input) {
-            // This would get all the elements until the length of the current schema's array length
-            if (input.schema().getType() != Schema.Type.MAP) {
-              throw new IllegalArgumentException("Schema mismatch. Can't transfer data. input schema: "
-                  + input.schema().getType());
-            }
-            if (!input.schema().getValueType().equals(this.getValueType())) {
-              throw new IllegalArgumentException("Element schema mismatch. Can't transfer data. input schema: "
-                  + input.schema().getValueType().getType());
-            }
-            // input type matches map type
-            return AvroData.getMap(this, input.value());
-          }
-        };
-      case STRUCT:
-        return new AvroSchema(schema) {
-          @SuppressWarnings("serial")
-          private final Map<String, Schema> fldSchemas = new HashMap<String, Schema>() {
-            {
-              for (Field field : schema.getFields()) {
-                put(field.name(), getSchema(field.schema()));
-              }
-            }
-          };
-
-          @Override
-          public Map<String, Schema> getFields() {
-            return this.fldSchemas;
-          }
-
-          @Override
-          public Schema getFieldType(String fldName) {
-            return this.fldSchemas.get(fldName);
-          }
-
-          @Override
-          public Data transform(Data input) {
-            // This would get all the elements until the length of the current schema's array length
-            if (input.schema().getType() != Schema.Type.STRUCT) {
-              throw new IllegalArgumentException("Schema mismatch. Can't transfer data. input schema: "
-                  + input.schema().getType());
-            }
-            // Note: this particular transform function only implements "projection to a sub-set" concept.
-            // More complex function is needed if some other concepts such as "merge from two sets of data", "allow null if does not exist" are needed
-            for (String fldName : this.fldSchemas.keySet()) {
-              // check each field schema matches input
-              Schema fldSchema = this.fldSchemas.get(fldName);
-              Schema inputFld = input.schema().getFieldType(fldName);
-              if (!fldSchema.equals(inputFld)) {
-                throw new IllegalArgumentException("Field schema mismatch. Can't transfer data for field " + fldName
-                    + ". input field schema:" + inputFld.getType() + ", this field schema: " + fldSchema.getType());
-              }
-            }
-            // input type matches struct type
-            return AvroData.getStruct(this, input.value());
-          }
-
-        };
-      default:
-        throw new IllegalArgumentException("Un-recognized complext data type:" + type);
-    }
-  }
-
-  private AvroSchema(org.apache.avro.Schema schema) {
-    this.avroSchema = schema;
-    this.type = mapType(schema.getType());
-  }
-
-  private static Type mapType(org.apache.avro.Schema.Type type) {
-    switch (type) {
-      case ARRAY:
-        return Schema.Type.ARRAY;
-      case RECORD:
-        return Schema.Type.STRUCT;
-      case MAP:
-        return Schema.Type.MAP;
-      case INT:
-        return Schema.Type.INTEGER;
-      case LONG:
-        return Schema.Type.LONG;
-      case BOOLEAN:
-        return Schema.Type.BOOLEAN;
-      case FLOAT:
-        return Schema.Type.FLOAT;
-      case DOUBLE:
-        return Schema.Type.DOUBLE;
-      case STRING:
-        return Schema.Type.STRING;
-      case BYTES:
-        return Schema.Type.BYTES;
-      default:
-        throw new IllegalArgumentException("Avro schema: " + type + " is not supported");
-    }
-  }
-
-  @Override
-  public Type getType() {
-    return this.type;
-  }
-
-  @Override
-  public Schema getElementType() {
-    if (this.type != Schema.Type.ARRAY) {
-      throw new UnsupportedOperationException("Can't getElmentType with non-array schema: " + this.type);
-    }
-    return getSchema(this.avroSchema.getElementType());
-  }
-
-  @Override
-  public Schema getValueType() {
-    if (this.type != Schema.Type.MAP) {
-      throw new UnsupportedOperationException("Can't getValueType with non-map schema: " + this.type);
-    }
-    return getSchema(this.avroSchema.getValueType());
-  }
-
-  @Override
-  public Map<String, Schema> getFields() {
-    throw new UnsupportedOperationException("Can't get field types with unknown schema type:" + this.type);
-  }
-
-  @Override
-  public Schema getFieldType(String fldName) {
-    throw new UnsupportedOperationException("Can't getFieldType with non-map/non-struct schema: " + this.type);
-  }
-
-  @Override
-  public Data read(Object object) {
-    if (this.avroSchema.getType() == org.apache.avro.Schema.Type.ARRAY) {
-      return AvroData.getArray(this, object);
-    } else if (this.avroSchema.getType() == org.apache.avro.Schema.Type.MAP) {
-      return AvroData.getMap(this, object);
-    } else if (this.avroSchema.getType() == org.apache.avro.Schema.Type.RECORD) {
-      return AvroData.getStruct(this, object);
-    }
-    throw new UnsupportedOperationException("Reading unknown complext type:" + this.type + " is not supported");
-  }
-
-  @Override
-  public Data transform(Data inputData) {
-    if (inputData.schema().getType() == Schema.Type.ARRAY || inputData.schema().getType() == Schema.Type.MAP
-        || inputData.schema().getType() == Schema.Type.STRUCT) {
-      throw new IllegalArgumentException("Complex schema should have overriden the default transform() function.");
-    }
-    if (inputData.schema().getType() != this.type) {
-      throw new IllegalArgumentException("Can't transform a mismatched primitive type. this type:" + this.type
-          + ", input type:" + inputData.schema().getType());
-    }
-    return inputData;
-  }
-
-  @Override
-  public boolean equals(Schema other) {
-    // TODO Auto-generated method stub
-    if (this.type != other.getType()) {
-      return false;
-    }
-    switch (this.type) {
-      case ARRAY:
-        // check if element types are the same
-        return this.getElementType().equals(other.getElementType());
-      case MAP:
-        // check if value types are the same
-        return this.getValueType().equals(other.getValueType());
-      case STRUCT:
-        // check if the fields schemas in this equals the other
-        // NOTE: this equals check is in consistent with the "projection to subset" concept implemented in transform()
-        for (String fieldName : this.getFields().keySet()) {
-          if (!this.getFieldType(fieldName).equals(other.getFieldType(fieldName))) {
-            return false;
-          }
-        }
-        return true;
-      default:
-        return true;
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-sql-core/src/main/java/org/apache/samza/sql/data/serializers/SqlAvroSerde.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/data/serializers/SqlAvroSerde.java b/samza-sql-core/src/main/java/org/apache/samza/sql/data/serializers/SqlAvroSerde.java
deleted file mode 100644
index f3f7f7d..0000000
--- a/samza-sql-core/src/main/java/org/apache/samza/sql/data/serializers/SqlAvroSerde.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.sql.data.serializers;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.generic.GenericDatumWriter;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.io.DatumReader;
-import org.apache.avro.io.DecoderFactory;
-import org.apache.avro.io.Encoder;
-import org.apache.avro.io.EncoderFactory;
-import org.apache.samza.SamzaException;
-import org.apache.samza.serializers.Serde;
-import org.apache.samza.sql.data.avro.AvroData;
-import org.apache.samza.sql.data.avro.AvroSchema;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-
-public class SqlAvroSerde implements Serde<AvroData> {
-  private static Logger log = LoggerFactory.getLogger(SqlAvroSerde.class);
-
-  private final Schema avroSchema;
-  private final GenericDatumReader<GenericRecord> reader;
-  private final GenericDatumWriter<Object> writer;
-
-  public SqlAvroSerde(Schema avroSchema) {
-    this.avroSchema = avroSchema;
-    this.reader = new GenericDatumReader<GenericRecord>(avroSchema);
-    this.writer = new GenericDatumWriter<Object>(avroSchema);
-  }
-
-  @Override
-  public AvroData fromBytes(byte[] bytes) {
-    GenericRecord data;
-
-    try {
-      data = reader.read(null, DecoderFactory.get().binaryDecoder(bytes, null));
-      return getAvroData(data, avroSchema);
-    } catch (IOException e) {
-      String errMsg = "Cannot decode message.";
-      log.error(errMsg, e);
-      throw new SamzaException(errMsg, e);
-    }
-  }
-
-  @Override
-  public byte[] toBytes(AvroData object) {
-    ByteArrayOutputStream out = new ByteArrayOutputStream();
-    Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
-
-    try {
-      writer.write(object.value(), encoder);
-      encoder.flush();
-      return out.toByteArray();
-    } catch (IOException e) {
-      String errMsg = "Cannot perform Avro binary encode.";
-      log.error(errMsg, e);
-      throw new SamzaException(errMsg, e);
-    }
-  }
-
-  private AvroData getAvroData(GenericRecord data, Schema type){
-    AvroSchema schema = AvroSchema.getSchema(type);
-    switch (type.getType()){
-      case RECORD:
-        return AvroData.getStruct(schema, data);
-      case ARRAY:
-        return AvroData.getArray(schema, data);
-      case MAP:
-        return AvroData.getMap(schema, data);
-      case INT:
-        return AvroData.getInt(schema, data);
-      case LONG:
-        return AvroData.getLong(schema, data);
-      case BOOLEAN:
-        return AvroData.getBoolean(schema, data);
-      case FLOAT:
-        return AvroData.getFloat(schema, data);
-      case DOUBLE:
-        return AvroData.getDouble(schema, data);
-      case STRING:
-        return AvroData.getString(schema, data);
-      case BYTES:
-        return AvroData.getBytes(schema, data);
-      default:
-        throw new IllegalArgumentException("Avro schema: " + type + " is not supported");
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-sql-core/src/main/java/org/apache/samza/sql/data/serializers/SqlAvroSerdeFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/data/serializers/SqlAvroSerdeFactory.java b/samza-sql-core/src/main/java/org/apache/samza/sql/data/serializers/SqlAvroSerdeFactory.java
deleted file mode 100644
index aad18f4..0000000
--- a/samza-sql-core/src/main/java/org/apache/samza/sql/data/serializers/SqlAvroSerdeFactory.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.sql.data.serializers;
-
-import org.apache.avro.Schema;
-import org.apache.samza.SamzaException;
-import org.apache.samza.config.Config;
-import org.apache.samza.serializers.Serde;
-import org.apache.samza.serializers.SerdeFactory;
-import org.apache.samza.sql.data.avro.AvroData;
-
-public class SqlAvroSerdeFactory implements SerdeFactory<AvroData> {
-  public static final String PROP_AVRO_SCHEMA = "serializers.%s.schema";
-
-  @Override
-  public Serde<AvroData> getSerde(String name, Config config) {
-    String avroSchemaStr = config.get(String.format(PROP_AVRO_SCHEMA, name));
-    if (avroSchemaStr == null || avroSchemaStr.isEmpty()) {
-      throw new SamzaException("Cannot find avro schema for SerdeFactory '" + name + "'.");
-    }
-
-    return new SqlAvroSerde(new Schema.Parser().parse(avroSchemaStr));
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-sql-core/src/main/java/org/apache/samza/sql/data/serializers/SqlStringSerde.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/data/serializers/SqlStringSerde.java b/samza-sql-core/src/main/java/org/apache/samza/sql/data/serializers/SqlStringSerde.java
deleted file mode 100644
index 1f0c3b2..0000000
--- a/samza-sql-core/src/main/java/org/apache/samza/sql/data/serializers/SqlStringSerde.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.sql.data.serializers;
-
-import org.apache.samza.serializers.Serde;
-import org.apache.samza.serializers.StringSerde;
-import org.apache.samza.sql.data.string.StringData;
-
-import java.io.UnsupportedEncodingException;
-
-public class SqlStringSerde implements Serde<StringData> {
-
-    private final Serde<String> serde;
-
-    public SqlStringSerde(String encoding) {
-        this.serde = new StringSerde(encoding);
-    }
-
-    @Override
-    public StringData fromBytes(byte[] bytes) {
-          return new StringData(serde.fromBytes(bytes));
-    }
-
-    @Override
-    public byte[] toBytes(StringData object) {
-        return serde.toBytes(object.strValue());
-    }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-sql-core/src/main/java/org/apache/samza/sql/data/serializers/SqlStringSerdeFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/data/serializers/SqlStringSerdeFactory.java b/samza-sql-core/src/main/java/org/apache/samza/sql/data/serializers/SqlStringSerdeFactory.java
deleted file mode 100644
index 2564479..0000000
--- a/samza-sql-core/src/main/java/org/apache/samza/sql/data/serializers/SqlStringSerdeFactory.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.sql.data.serializers;
-
-
-import org.apache.samza.config.Config;
-import org.apache.samza.serializers.Serde;
-import org.apache.samza.serializers.SerdeFactory;
-import org.apache.samza.sql.data.string.StringData;
-
-public class SqlStringSerdeFactory implements SerdeFactory<StringData> {
-    @Override
-    public Serde<StringData> getSerde(String name, Config config) {
-        return new SqlStringSerde(config.get("encoding", "UTF-8"));
-    }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-sql-core/src/main/java/org/apache/samza/sql/data/string/StringData.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/data/string/StringData.java b/samza-sql-core/src/main/java/org/apache/samza/sql/data/string/StringData.java
deleted file mode 100644
index b81d9fa..0000000
--- a/samza-sql-core/src/main/java/org/apache/samza/sql/data/string/StringData.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.sql.data.string;
-
-import org.apache.samza.sql.api.data.Data;
-import org.apache.samza.sql.api.data.Schema;
-
-import java.util.List;
-import java.util.Map;
-
-public class StringData implements Data {
-    private final Object datum;
-    private final Schema schema;
-
-    public StringData(Object datum) {
-        this.datum = datum;
-        this.schema = new StringSchema();
-    }
-
-    @Override
-    public Schema schema() {
-        return this.schema;
-    }
-
-    @Override
-    public Object value() {
-        return this.datum;
-    }
-
-    @Override
-    public int intValue() {
-        throw new UnsupportedOperationException("Can't get int value for a string type data");
-    }
-
-    @Override
-    public long longValue() {
-        throw new UnsupportedOperationException("Can't get long value for a string type data");
-    }
-
-    @Override
-    public float floatValue() {
-        throw new UnsupportedOperationException("Can't get float value for a string type data");
-    }
-
-    @Override
-    public double doubleValue() {
-        throw new UnsupportedOperationException("Can't get double value for a string type data");
-    }
-
-    @Override
-    public boolean booleanValue() {
-        throw new UnsupportedOperationException("Can't get boolean value for a string type data");
-    }
-
-    @Override
-    public String strValue() {
-        return String.valueOf(datum);
-    }
-
-    @Override
-    public byte[] bytesValue() {
-        throw new UnsupportedOperationException("Can't get bytesValue for a string type data");
-    }
-
-    @Override
-    public List<Object> arrayValue() {
-        throw new UnsupportedOperationException("Can't get arrayValue for a string type data");
-    }
-
-    @Override
-    public Map<Object, Object> mapValue() {
-        throw new UnsupportedOperationException("Can't get mapValue for a string type data");
-    }
-
-    @Override
-    public Data getElement(int index) {
-        throw new UnsupportedOperationException("Can't getElement(index) on a string type data");
-    }
-
-    @Override
-    public Data getFieldData(String fldName) {
-        throw new UnsupportedOperationException("Can't getFieldData(fieldName) for a string type data");
-    }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-sql-core/src/main/java/org/apache/samza/sql/data/string/StringSchema.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/data/string/StringSchema.java b/samza-sql-core/src/main/java/org/apache/samza/sql/data/string/StringSchema.java
deleted file mode 100644
index 348fc0c..0000000
--- a/samza-sql-core/src/main/java/org/apache/samza/sql/data/string/StringSchema.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.sql.data.string;
-
-import org.apache.samza.sql.api.data.Data;
-import org.apache.samza.sql.api.data.Schema;
-
-import java.util.Map;
-
-public class StringSchema implements Schema {
-    private Type type = Type.STRING;
-
-    @Override
-    public Type getType() {
-      return Type.STRING;
-    }
-
-    @Override
-    public Schema getElementType() {
-      throw new UnsupportedOperationException("Can't getElmentType with non-array schema: " + this.type);
-    }
-
-    @Override
-    public Schema getValueType() {
-        throw new UnsupportedOperationException("Can't getValueType with non-map schema: " + this.type);
-    }
-
-    @Override
-    public Map<String, Schema> getFields() {
-        throw new UnsupportedOperationException("Can't get field types with unknown schema type:" + this.type);
-    }
-
-    @Override
-    public Schema getFieldType(String fldName) {
-        throw new UnsupportedOperationException("Can't getFieldType with non-map/non-struct schema: " + this.type);
-    }
-
-    @Override
-    public Data read(Object object) {
-        return new StringData(object);
-    }
-
-    @Override
-    public Data transform(Data inputData) {
-        if (inputData.schema().getType() != this.type) {
-            throw new IllegalArgumentException("Can't transform a mismatched primitive type. this type:" + this.type
-                    + ", input type:" + inputData.schema().getType());
-        }
-        return inputData;
-    }
-
-    @Override
-    public boolean equals(Schema other) {
-        return other.getType() == this.type;
-    }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/NoopOperatorCallback.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/NoopOperatorCallback.java b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/NoopOperatorCallback.java
deleted file mode 100644
index c3d2266..0000000
--- a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/NoopOperatorCallback.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.sql.operators.factory;
-
-import org.apache.samza.sql.api.data.Relation;
-import org.apache.samza.sql.api.data.Tuple;
-import org.apache.samza.sql.api.operators.OperatorCallback;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskCoordinator;
-
-
-public final class NoopOperatorCallback implements OperatorCallback {
-
-  @Override
-  public Tuple beforeProcess(Tuple tuple, MessageCollector collector, TaskCoordinator coordinator) {
-    return tuple;
-  }
-
-  @Override
-  public Relation beforeProcess(Relation rel, MessageCollector collector, TaskCoordinator coordinator) {
-    return rel;
-  }
-
-  @Override
-  public Tuple afterProcess(Tuple tuple, MessageCollector collector, TaskCoordinator coordinator) {
-    return tuple;
-  }
-
-  @Override
-  public Relation afterProcess(Relation rel, MessageCollector collector, TaskCoordinator coordinator) {
-    return rel;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorFactoryImpl.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorFactoryImpl.java b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorFactoryImpl.java
deleted file mode 100644
index cbc84d0..0000000
--- a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorFactoryImpl.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.sql.operators.factory;
-
-import org.apache.samza.sql.api.operators.OperatorSpec;
-import org.apache.samza.sql.api.operators.SimpleOperator;
-import org.apache.samza.sql.api.operators.SqlOperatorFactory;
-import org.apache.samza.sql.operators.join.StreamStreamJoin;
-import org.apache.samza.sql.operators.join.StreamStreamJoinSpec;
-import org.apache.samza.sql.operators.partition.PartitionOp;
-import org.apache.samza.sql.operators.partition.PartitionSpec;
-import org.apache.samza.sql.operators.window.BoundedTimeWindow;
-import org.apache.samza.sql.operators.window.WindowSpec;
-
-
-/**
- * This simple factory class provides method to create the build-in operators per operator specification.
- * It can be extended when the build-in operators expand.
- *
- */
-public class SimpleOperatorFactoryImpl implements SqlOperatorFactory {
-
-  @Override
-  public SimpleOperator getOperator(OperatorSpec spec) {
-    if (spec instanceof PartitionSpec) {
-      return new PartitionOp((PartitionSpec) spec);
-    } else if (spec instanceof StreamStreamJoinSpec) {
-      return new StreamStreamJoin((StreamStreamJoinSpec) spec);
-    } else if (spec instanceof WindowSpec) {
-      return new BoundedTimeWindow((WindowSpec) spec);
-    }
-    throw new UnsupportedOperationException("Unsupported operator specified: " + spec.getClass().getCanonicalName());
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorImpl.java b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorImpl.java
deleted file mode 100644
index e66451f..0000000
--- a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorImpl.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.sql.operators.factory;
-
-import org.apache.samza.sql.api.data.Relation;
-import org.apache.samza.sql.api.data.Tuple;
-import org.apache.samza.sql.api.operators.OperatorCallback;
-import org.apache.samza.sql.api.operators.OperatorSpec;
-import org.apache.samza.sql.api.operators.SimpleOperator;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskCoordinator;
-import org.apache.samza.task.sql.SimpleMessageCollector;
-
-
-/**
- * An abstract class that encapsulate the basic information and methods that all operator classes should implement.
- * It implements the interface {@link org.apache.samza.sql.api.operators.SimpleOperator}
- *
- */
-public abstract class SimpleOperatorImpl implements SimpleOperator {
-  /**
-   * The specification of this operator
-   */
-  private final OperatorSpec spec;
-
-  /**
-   * The callback function
-   */
-  private final OperatorCallback callback;
-
-  /**
-   * Ctor of {@code SimpleOperatorImpl} class
-   *
-   * @param spec The specification of this operator
-   */
-  public SimpleOperatorImpl(OperatorSpec spec) {
-    this(spec, new NoopOperatorCallback());
-  }
-
-  public SimpleOperatorImpl(OperatorSpec spec, OperatorCallback callback) {
-    this.spec = spec;
-    this.callback = callback;
-  }
-
-  @Override
-  public OperatorSpec getSpec() {
-    return this.spec;
-  }
-
-  /**
-   * This method is made final s.t. the sequence of invocations between {@link org.apache.samza.sql.api.operators.OperatorCallback#beforeProcess(Relation, MessageCollector, TaskCoordinator)}
-   * and real processing of the input relation is fixed.
-   */
-  @Override
-  final public void process(Relation deltaRelation, MessageCollector collector, TaskCoordinator coordinator)
-      throws Exception {
-    Relation rel = this.callback.beforeProcess(deltaRelation, collector, coordinator);
-    if (rel == null) {
-      return;
-    }
-    this.realProcess(rel, getCollector(collector, coordinator), coordinator);
-  }
-
-  /**
-   * This method is made final s.t. the sequence of invocations between {@link org.apache.samza.sql.api.operators.OperatorCallback#beforeProcess(Tuple, MessageCollector, TaskCoordinator)}
-   * and real processing of the input tuple is fixed.
-   */
-  @Override
-  final public void process(Tuple tuple, MessageCollector collector, TaskCoordinator coordinator) throws Exception {
-    Tuple ituple = this.callback.beforeProcess(tuple, collector, coordinator);
-    if (ituple == null) {
-      return;
-    }
-    this.realProcess(ituple, getCollector(collector, coordinator), coordinator);
-  }
-
-  /**
-   * This method is made final s.t. we enforce the invocation of {@code SimpleOperatorImpl#getCollector(MessageCollector, TaskCoordinator)} before doing anything futher
-   */
-  @Override
-  final public void refresh(long timeNano, MessageCollector collector, TaskCoordinator coordinator) throws Exception {
-    this.realRefresh(timeNano, getCollector(collector, coordinator), coordinator);
-  }
-
-  private SimpleMessageCollector getCollector(MessageCollector collector, TaskCoordinator coordinator) {
-    if (!(collector instanceof SimpleMessageCollector)) {
-      return new SimpleMessageCollector(collector, coordinator, this.callback);
-    } else {
-      ((SimpleMessageCollector) collector).switchOperatorCallback(this.callback);
-      return (SimpleMessageCollector) collector;
-    }
-  }
-
-  /**
-   * Method to be overriden by each specific implementation class of operator to handle timeout event
-   *
-   * @param timeNano The time in nanosecond when the timeout event occurred
-   * @param collector The {@link org.apache.samza.task.sql.SimpleMessageCollector} in the context
-   * @param coordinator The {@link org.apache.samza.task.TaskCoordinator} in the context
-   * @throws Exception Throws exception if failed to refresh the results
-   */
-  protected abstract void realRefresh(long timeNano, SimpleMessageCollector collector, TaskCoordinator coordinator)
-      throws Exception;
-
-  /**
-   * Method to be overriden by each specific implementation class of operator to perform relational logic operation on an input {@link org.apache.samza.sql.api.data.Relation}
-   *
-   * @param rel The input relation
-   * @param collector The {@link org.apache.samza.task.sql.SimpleMessageCollector} in the context
-   * @param coordinator The {@link org.apache.samza.task.TaskCoordinator} in the context
-   * @throws Exception
-   */
-  protected abstract void realProcess(Relation rel, SimpleMessageCollector collector, TaskCoordinator coordinator)
-      throws Exception;
-
-  protected abstract void realProcess(Tuple ituple, SimpleMessageCollector collector, TaskCoordinator coordinator)
-      throws Exception;
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorSpec.java b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorSpec.java
deleted file mode 100644
index 56753b6..0000000
--- a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorSpec.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.sql.operators.factory;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.samza.sql.api.data.EntityName;
-import org.apache.samza.sql.api.operators.OperatorSpec;
-
-
-/**
- * An abstract class that encapsulate the basic information and methods that all specification of operators should implement.
- * It implements {@link org.apache.samza.sql.api.operators.OperatorSpec}
- */
-public abstract class SimpleOperatorSpec implements OperatorSpec {
-  /**
-   * The identifier of the corresponding operator
-   */
-  private final String id;
-
-  /**
-   * The list of input entity names of the corresponding operator
-   */
-  private final List<EntityName> inputs = new ArrayList<EntityName>();
-
-  /**
-   * The list of output entity names of the corresponding operator
-   */
-  private final List<EntityName> outputs = new ArrayList<EntityName>();
-
-  /**
-   * Ctor of the {@code SimpleOperatorSpec} for simple {@link org.apache.samza.sql.api.operators.SimpleOperator}s w/ one input and one output
-   *
-   * @param id Unique identifier of the {@link org.apache.samza.sql.api.operators.SimpleOperator} object
-   * @param input The only input entity
-   * @param output The only output entity
-   */
-  public SimpleOperatorSpec(String id, EntityName input, EntityName output) {
-    this.id = id;
-    this.inputs.add(input);
-    this.outputs.add(output);
-  }
-
-  /**
-   * Ctor of {@code SimpleOperatorSpec} with general format: m inputs and n outputs
-   *
-   * @param id Unique identifier of the {@link org.apache.samza.sql.api.operators.SimpleOperator} object
-   * @param inputs The list of input entities
-   * @param output The list of output entities
-   */
-  public SimpleOperatorSpec(String id, List<EntityName> inputs, EntityName output) {
-    this.id = id;
-    this.inputs.addAll(inputs);
-    this.outputs.add(output);
-  }
-
-  @Override
-  public String getId() {
-    return this.id;
-  }
-
-  @Override
-  public List<EntityName> getInputNames() {
-    return this.inputs;
-  }
-
-  @Override
-  public List<EntityName> getOutputNames() {
-    return this.outputs;
-  }
-
-  /**
-   * Method to get the first output entity
-   *
-   * @return The first output entity name
-   */
-  public EntityName getOutputName() {
-    return this.outputs.get(0);
-  }
-
-  /**
-   * Method to get the first input entity
-   *
-   * @return The first input entity name
-   */
-  public EntityName getInputName() {
-    return this.inputs.get(0);
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleRouter.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleRouter.java b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleRouter.java
deleted file mode 100644
index e570897..0000000
--- a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleRouter.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.sql.operators.factory;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.samza.config.Config;
-import org.apache.samza.sql.api.data.EntityName;
-import org.apache.samza.sql.api.data.Relation;
-import org.apache.samza.sql.api.data.Tuple;
-import org.apache.samza.sql.api.operators.Operator;
-import org.apache.samza.sql.api.operators.OperatorRouter;
-import org.apache.samza.sql.api.operators.SimpleOperator;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskContext;
-import org.apache.samza.task.TaskCoordinator;
-import org.apache.samza.task.sql.RouterMessageCollector;
-
-
-/**
- * Example implementation of {@link org.apache.samza.sql.api.operators.OperatorRouter}
- *
- */
-public final class SimpleRouter implements OperatorRouter {
-  /**
-   * List of operators added to the {@link org.apache.samza.sql.api.operators.OperatorRouter}
-   */
-  private List<SimpleOperator> operators = new ArrayList<SimpleOperator>();
-
-  @SuppressWarnings("rawtypes")
-  /**
-   * Map of {@link org.apache.samza.sql.api.data.EntityName} to the list of operators associated with it
-   */
-  private Map<EntityName, List> nextOps = new HashMap<EntityName, List>();
-
-  /**
-   * Set of {@link org.apache.samza.sql.api.data.EntityName} as inputs to this {@code SimpleRouter}
-   */
-  private Set<EntityName> inputEntities = new HashSet<EntityName>();
-
-  /**
-   * Set of entities that are not input entities to this {@code SimpleRouter}
-   */
-  private Set<EntityName> outputEntities = new HashSet<EntityName>();
-
-  @SuppressWarnings("unchecked")
-  private void addOperator(EntityName input, SimpleOperator nextOp) {
-    if (nextOps.get(input) == null) {
-      nextOps.put(input, new ArrayList<Operator>());
-    }
-    nextOps.get(input).add(nextOp);
-    operators.add(nextOp);
-    // get the operator spec
-    for (EntityName output : nextOp.getSpec().getOutputNames()) {
-      if (inputEntities.contains(output)) {
-        inputEntities.remove(output);
-      }
-      outputEntities.add(output);
-    }
-    if (!outputEntities.contains(input)) {
-      inputEntities.add(input);
-    }
-  }
-
-  @Override
-  @SuppressWarnings("unchecked")
-  public List<SimpleOperator> getNextOperators(EntityName entity) {
-    return nextOps.get(entity);
-  }
-
-  @Override
-  public void addOperator(SimpleOperator nextOp) {
-    List<EntityName> inputs = nextOp.getSpec().getInputNames();
-    for (EntityName input : inputs) {
-      addOperator(input, nextOp);
-    }
-  }
-
-  @Override
-  public void init(Config config, TaskContext context) throws Exception {
-    for (SimpleOperator op : this.operators) {
-      op.init(config, context);
-    }
-  }
-
-  @Override
-  public void process(Tuple ituple, MessageCollector collector, TaskCoordinator coordinator) throws Exception {
-    MessageCollector opCollector = new RouterMessageCollector(collector, coordinator, this);
-    for (Iterator<SimpleOperator> iter = this.getNextOperators(ituple.getEntityName()).iterator(); iter.hasNext();) {
-      iter.next().process(ituple, opCollector, coordinator);
-    }
-  }
-
-  @SuppressWarnings("rawtypes")
-  @Override
-  public void process(Relation deltaRelation, MessageCollector collector, TaskCoordinator coordinator) throws Exception {
-    MessageCollector opCollector = new RouterMessageCollector(collector, coordinator, this);
-    for (Iterator<SimpleOperator> iter = this.getNextOperators(deltaRelation.getName()).iterator(); iter.hasNext();) {
-      iter.next().process(deltaRelation, opCollector, coordinator);
-    }
-  }
-
-  @Override
-  public void refresh(long nanoSec, MessageCollector collector, TaskCoordinator coordinator) throws Exception {
-    MessageCollector opCollector = new RouterMessageCollector(collector, coordinator, this);
-    for (EntityName entity : inputEntities) {
-      for (Iterator<SimpleOperator> iter = this.getNextOperators(entity).iterator(); iter.hasNext();) {
-        iter.next().refresh(nanoSec, opCollector, coordinator);
-      }
-    }
-  }
-
-}