You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2015/04/14 23:40:17 UTC

[4/5] samza git commit: SAMZA-648: separate samza-sql-core and samza-sql-calcite: step 1: move samza-sql to samza-sql-core

http://git-wip-us.apache.org/repos/asf/samza/blob/f8309b2a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorFactoryImpl.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorFactoryImpl.java b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorFactoryImpl.java
new file mode 100644
index 0000000..916b166
--- /dev/null
+++ b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorFactoryImpl.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.operators.factory;
+
+import org.apache.samza.sql.api.operators.RelationOperator;
+import org.apache.samza.sql.api.operators.SqlOperatorFactory;
+import org.apache.samza.sql.api.operators.TupleOperator;
+import org.apache.samza.sql.api.operators.spec.OperatorSpec;
+import org.apache.samza.sql.operators.partition.PartitionOp;
+import org.apache.samza.sql.operators.partition.PartitionSpec;
+import org.apache.samza.sql.operators.relation.Join;
+import org.apache.samza.sql.operators.relation.JoinSpec;
+import org.apache.samza.sql.operators.stream.InsertStream;
+import org.apache.samza.sql.operators.stream.InsertStreamSpec;
+import org.apache.samza.sql.operators.window.BoundedTimeWindow;
+import org.apache.samza.sql.operators.window.WindowSpec;
+
+
+/**
+ * Simple factory that creates the built-in operators matching an operator specification: Join and InsertStream
+ * as relation operators, BoundedTimeWindow and PartitionOp as tuple operators. Any other specification type is
+ * rejected with an UnsupportedOperationException. It can be extended when the set of built-in operators grows.
+ */
+public class SimpleOperatorFactoryImpl implements SqlOperatorFactory {
+
+  @Override
+  public RelationOperator getRelationOperator(OperatorSpec spec) {
+    if (spec instanceof JoinSpec) {
+      return new Join((JoinSpec) spec);
+    } else if (spec instanceof InsertStreamSpec) {
+      return new InsertStream((InsertStreamSpec) spec);
+    }
+    throw new UnsupportedOperationException("Unsupported operator specified: " + spec.getClass().getCanonicalName());
+  }
+
+  @Override
+  public TupleOperator getTupleOperator(OperatorSpec spec) {
+    if (spec instanceof WindowSpec) {
+      return new BoundedTimeWindow((WindowSpec) spec);
+    } else if (spec instanceof PartitionSpec) {
+      return new PartitionOp((PartitionSpec) spec);
+    }
+    throw new UnsupportedOperationException("Unsupported operator specified: " + spec.getClass().getCanonicalName());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/f8309b2a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorSpec.java b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorSpec.java
new file mode 100644
index 0000000..93d4ebb
--- /dev/null
+++ b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorSpec.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.sql.operators.factory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.samza.sql.api.data.EntityName;
+import org.apache.samza.sql.api.operators.spec.OperatorSpec;
+
+
+/**
+ * An abstract class that encapsulates the basic information and methods that all specifications of operators should implement.
+ * It holds the operator identifier plus the lists of input and output entity names.
+ */
+public abstract class SimpleOperatorSpec implements OperatorSpec {
+  /**
+   * The identifier of the corresponding operator
+   */
+  private final String id;
+
+  /**
+   * The list of input entity names of the corresponding operator
+   */
+  private final List<EntityName> inputs = new ArrayList<EntityName>();
+
+  /**
+   * The list of output entity names of the corresponding operator
+   */
+  private final List<EntityName> outputs = new ArrayList<EntityName>();
+
+  /**
+   * Ctor of the <code>SimpleOperatorSpec</code> for simple <code>Operator</code>s w/ one input and one output
+   *
+   * @param id Unique identifier of the <code>Operator</code> object
+   * @param input The only input entity
+   * @param output The only output entity
+   */
+  public SimpleOperatorSpec(String id, EntityName input, EntityName output) {
+    this.id = id;
+    this.inputs.add(input);
+    this.outputs.add(output);
+  }
+
+  /**
+   * Ctor of <code>SimpleOperatorSpec</code> for <code>Operator</code>s w/ multiple inputs and one output
+   *
+   * @param id Unique identifier of the <code>Operator</code> object
+   * @param inputs The list of input entities; its elements are copied into this spec's own list
+   * @param output The only output entity
+   */
+  public SimpleOperatorSpec(String id, List<EntityName> inputs, EntityName output) {
+    this.id = id;
+    this.inputs.addAll(inputs);
+    this.outputs.add(output);
+  }
+
+  @Override
+  public String getId() {
+    return this.id;
+  }
+
+  @Override
+  public List<EntityName> getInputNames() {
+    return this.inputs; // the internal list itself, not a copy
+  }
+
+  @Override
+  public List<EntityName> getOutputNames() {
+    return this.outputs; // the internal list itself, not a copy
+  }
+
+  /**
+   * Method to get the first output entity
+   *
+   * @return The first output entity name
+   */
+  public EntityName getOutputName() {
+    return this.outputs.get(0);
+  }
+
+  /**
+   * Method to get the first input entity
+   *
+   * @return The first input entity name
+   */
+  public EntityName getInputName() {
+    return this.inputs.get(0);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/f8309b2a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java
new file mode 100644
index 0000000..986d688
--- /dev/null
+++ b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.operators.partition;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.sql.api.data.Tuple;
+import org.apache.samza.sql.api.operators.TupleOperator;
+import org.apache.samza.sql.operators.factory.SimpleOperator;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskContext;
+import org.apache.samza.task.TaskCoordinator;
+import org.apache.samza.task.sql.SqlMessageCollector;
+
+
+/**
+ * This is an example built-in operator that performs a simple stream re-partition operation.
+ * NOTE(review): process() passes the tuple key before the partition-key field value; confirm the argument order.
+ */
+public final class PartitionOp extends SimpleOperator implements TupleOperator {
+
+  /**
+   * The specification of this <code>PartitionOp</code>
+   *
+   */
+  private final PartitionSpec spec;
+
+  /**
+   * Ctor that takes the <code>PartitionSpec</code> object as input.
+   *
+   * @param spec The <code>PartitionSpec</code> object
+   */
+  public PartitionOp(PartitionSpec spec) {
+    super(spec);
+    this.spec = spec;
+  }
+
+  /**
+   * A simplified constructor that allows users to create a <code>PartitionOp</code> directly from names
+   *
+   * @param id The identifier of this operator
+   * @param input The input stream name of this operator
+   * @param system The output system name of this operator
+   * @param output The output stream name of this operator
+   * @param parKey The partition key used for the output stream
+   * @param parNum The number of partitions used for the output stream
+   */
+  public PartitionOp(String id, String input, String system, String output, String parKey, int parNum) {
+    super(new PartitionSpec(id, input, new SystemStream(system, output), parKey, parNum));
+    this.spec = (PartitionSpec) super.getSpec();
+  }
+
+  @Override
+  public void init(Config config, TaskContext context) throws Exception {
+    // TODO Auto-generated method stub
+    // No need to initialize a store since all inputs are immediately sent out
+  }
+
+  @Override
+  public void window(MessageCollector collector, TaskCoordinator coordinator) throws Exception {
+    // TODO Auto-generated method stub
+    // NOOP or flush
+  }
+
+  @Override
+  public void process(Tuple tuple, SqlMessageCollector collector) throws Exception {
+    collector.send(new OutgoingMessageEnvelope(PartitionOp.this.spec.getSystemStream(), tuple.getKey().value(),
+        tuple.getMessage().getFieldData(PartitionOp.this.spec.getParKey()).value(), tuple.getMessage().value()));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/f8309b2a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionSpec.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionSpec.java b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionSpec.java
new file mode 100644
index 0000000..29d1784
--- /dev/null
+++ b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionSpec.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.operators.partition;
+
+import org.apache.samza.sql.api.data.EntityName;
+import org.apache.samza.sql.api.operators.spec.OperatorSpec;
+import org.apache.samza.sql.operators.factory.SimpleOperatorSpec;
+import org.apache.samza.system.SystemStream;
+
+
+/**
+ * This class defines the specification of the <code>PartitionOp</code> operator
+ * (partition key name, number of partitions and output <code>SystemStream</code>)
+ */
+public class PartitionSpec extends SimpleOperatorSpec implements OperatorSpec {
+
+  /**
+   * The partition key name
+   */
+  private final String parKey;
+
+  /**
+   * The number of partitions
+   */
+  private final int parNum;
+
+  /**
+   * The <code>SystemStream</code> to send the partition output to
+   */
+  private final SystemStream sysStream;
+
+  /**
+   * Ctor to create the <code>PartitionSpec</code>
+   *
+   * @param id The ID of the <code>PartitionOp</code>
+   * @param input The input stream name
+   * @param output The output <code>SystemStream</code> object; its entity name is built as "system:stream"
+   * @param parKey The name of the partition key
+   * @param parNum The number of partitions
+   */
+  public PartitionSpec(String id, String input, SystemStream output, String parKey, int parNum) {
+    super(id, EntityName.getStreamName(input), EntityName.getStreamName(output.getSystem() + ":" + output.getStream()));
+    this.parKey = parKey;
+    this.parNum = parNum;
+    this.sysStream = output;
+  }
+
+  /**
+   * Method to get the partition key name
+   *
+   * @return The partition key name
+   */
+  public String getParKey() {
+    return this.parKey;
+  }
+
+  /**
+   * Method to get the number of partitions
+   *
+   * @return The number of partitions
+   */
+  public int getParNum() {
+    return this.parNum;
+  }
+
+  /**
+   * Method to get the output <code>SystemStream</code>
+   *
+   * @return The output system stream object
+   */
+  public SystemStream getSystemStream() {
+    return this.sysStream;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/f8309b2a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/relation/Join.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/relation/Join.java b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/relation/Join.java
new file mode 100644
index 0000000..a8a6eaf
--- /dev/null
+++ b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/relation/Join.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.operators.relation;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.sql.api.data.EntityName;
+import org.apache.samza.sql.api.data.Relation;
+import org.apache.samza.sql.api.operators.RelationOperator;
+import org.apache.samza.sql.operators.factory.SimpleOperator;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskContext;
+import org.apache.samza.task.TaskCoordinator;
+import org.apache.samza.task.sql.SqlMessageCollector;
+
+
+/**
+ * This class defines an example built-in operator for a join operator between two relations.
+ *
+ */
+public class Join extends SimpleOperator implements RelationOperator {
+
+  private final JoinSpec spec;
+
+  /**
+   * The input relations, filled from the task's stores in <code>init</code>.
+   * Created eagerly because <code>init</code> appends to this list and would otherwise dereference null.
+   */
+  private final List<Relation> inputs = new ArrayList<Relation>();
+
+  /**
+   * The output relation
+   */
+  private Relation output = null;
+
+  /**
+   * Ctor that creates <code>Join</code> operator based on the specification.
+   *
+   * @param spec The <code>JoinSpec</code> object that specifies the join operator
+   */
+  public Join(JoinSpec spec) {
+    super(spec);
+    this.spec = spec;
+  }
+
+  /**
+   * An alternative ctor that allows users to create a join operator directly from relation names.
+   *
+   * @param id The identifier of the join operator
+   * @param joinIns The list of input relation names of the join
+   * @param joinOut The output relation name of the join
+   * @param joinKeys The list of keys used in the join. Each entry in the <code>joinKeys</code> is the key name used in one of the input relations.
+   *     The order of the <code>joinKeys</code> MUST be the same as their corresponding relation names in <code>joinIns</code>
+   */
+  @SuppressWarnings("serial")
+  public Join(final String id, final List<String> joinIns, final String joinOut, final List<String> joinKeys) {
+    super(new JoinSpec(id, new ArrayList<EntityName>() {
+      {
+        for (String name : joinIns) {
+          add(EntityName.getRelationName(name));
+        }
+      }
+    }, EntityName.getRelationName(joinOut), joinKeys));
+    this.spec = (JoinSpec) this.getSpec();
+  }
+
+  private boolean hasPendingChanges() {
+    return getPendingChanges() != null;
+  }
+
+  private Relation getPendingChanges() {
+    // TODO Auto-generated method stub
+    // return any pending changes that have not been processed yet
+    return null;
+  }
+
+  private Relation getOutputChanges() {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+  private boolean hasOutputChanges() {
+    // TODO Auto-generated method stub
+    return getOutputChanges() != null;
+  }
+
+  private void join(Relation deltaRelation) {
+    // TODO Auto-generated method stub
+    // implement the join logic
+    // 1. calculate the delta changes in <code>output</code>
+    // 2. check output condition to see whether the current input should trigger an output
+    // 3. set the output changes and pending changes
+  }
+
+  @Override
+  public void init(Config config, TaskContext context) throws Exception {
+    for (EntityName relation : this.spec.getInputNames()) {
+      inputs.add((Relation) context.getStore(relation.toString()));
+    }
+    this.output = (Relation) context.getStore(this.spec.getOutputName().toString());
+  }
+
+  @Override
+  public void window(MessageCollector collector, TaskCoordinator coordinator) throws Exception {
+    SqlMessageCollector sqlCollector = (SqlMessageCollector) collector;
+    if (hasPendingChanges()) {
+      sqlCollector.send(getPendingChanges());
+    }
+    sqlCollector.timeout(this.spec.getOutputNames());
+  }
+
+  @Override
+  public void process(Relation deltaRelation, SqlMessageCollector collector) throws Exception {
+    // calculate join based on the input <code>deltaRelation</code>
+    join(deltaRelation);
+    if (hasOutputChanges()) {
+      collector.send(getOutputChanges());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/f8309b2a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/relation/JoinSpec.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/relation/JoinSpec.java b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/relation/JoinSpec.java
new file mode 100644
index 0000000..ba8bfb5
--- /dev/null
+++ b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/relation/JoinSpec.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.operators.relation;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.samza.sql.api.data.EntityName;
+import org.apache.samza.sql.api.operators.spec.OperatorSpec;
+import org.apache.samza.sql.operators.factory.SimpleOperatorSpec;
+
+
+/**
+ * This class implements the specification class for the built-in <code>Join</code> operator
+ */
+public class JoinSpec extends SimpleOperatorSpec implements OperatorSpec {
+  /**
+   * Join keys defined for each input relation
+   */
+  private final List<String> joinKeys = new ArrayList<String>();
+
+  /**
+   * Default ctor for the <code>JoinSpec</code>
+   *
+   * @param id Unique ID of the <code>Join</code> object
+   * @param joinIns The list of input relations
+   * @param joinOut The output relation
+   * @param joinKeys The list of join keys in input relations; its elements are copied into this spec's own list
+   */
+  public JoinSpec(String id, List<EntityName> joinIns, EntityName joinOut, List<String> joinKeys) {
+    super(id, joinIns, joinOut);
+    this.joinKeys.addAll(joinKeys);
+  }
+
+  /**
+   * Method to get the list of join keys
+   *
+   * @return The list of join keys (the internal list itself, not a copy)
+   */
+  public List<String> getJoinKeys() {
+    return this.joinKeys;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/f8309b2a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/stream/InsertStream.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/stream/InsertStream.java b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/stream/InsertStream.java
new file mode 100644
index 0000000..7563100
--- /dev/null
+++ b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/stream/InsertStream.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.operators.stream;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.sql.api.data.EntityName;
+import org.apache.samza.sql.api.data.Relation;
+import org.apache.samza.sql.api.data.Tuple;
+import org.apache.samza.sql.api.operators.RelationOperator;
+import org.apache.samza.sql.operators.factory.SimpleOperator;
+import org.apache.samza.storage.kv.KeyValueIterator;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskContext;
+import org.apache.samza.task.TaskCoordinator;
+import org.apache.samza.task.sql.SqlMessageCollector;
+
+
+/**
+ * This class defines an example built-in operator for an istream operator that converts a relation to a stream
+ *
+ */
+public class InsertStream extends SimpleOperator implements RelationOperator {
+  /**
+   * The <code>InsertStreamSpec</code> for this operator
+   */
+  private final InsertStreamSpec spec;
+
+  /**
+   * The time-varying relation that is to be converted into a stream
+   */
+  private Relation relation = null;
+
+  /**
+   * Ctor that takes the specification of the object as input parameter
+   *
+   * <p>This version of constructor is often used in an implementation of <code>SqlOperatorFactory</code>
+   *
+   * @param spec The <code>InsertStreamSpec</code> specification of this operator
+   */
+  public InsertStream(InsertStreamSpec spec) {
+    super(spec);
+    this.spec = spec;
+  }
+
+  /**
+   * An alternative ctor that allows users to create an <code>InsertStream</code> object directly from names
+   *
+   * @param id The identifier of the <code>InsertStream</code> object
+   * @param input The input relation
+   * @param output The output stream
+   */
+  public InsertStream(String id, String input, String output) {
+    super(new InsertStreamSpec(id, EntityName.getRelationName(input), EntityName.getStreamName(output)));
+    this.spec = (InsertStreamSpec) super.getSpec();
+  }
+
+  @Override
+  public void process(Relation deltaRelation, SqlMessageCollector collector) throws Exception {
+    KeyValueIterator<Object, Tuple> iterator = deltaRelation.all(); // NOTE(review): never closed - confirm
+    for (; iterator.hasNext();) {
+      Tuple tuple = iterator.next().getValue();
+      if (!tuple.isDelete()) { // only non-delete tuples are forwarded to the collector
+        collector.send(tuple);
+      }
+    }
+  }
+
+  @Override
+  public void init(Config config, TaskContext context) throws Exception {
+    if (this.relation == null) { // look the store up only if the relation is not already set
+      this.relation = (Relation) context.getStore(this.spec.getInputName().toString());
+    }
+  }
+
+  @Override
+  public void window(MessageCollector collector, TaskCoordinator coordinator) throws Exception {
+    // TODO Auto-generated method stub
+    // assuming this operation does not have pending changes kept in memory
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/f8309b2a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/stream/InsertStreamSpec.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/stream/InsertStreamSpec.java b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/stream/InsertStreamSpec.java
new file mode 100644
index 0000000..70475ce
--- /dev/null
+++ b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/stream/InsertStreamSpec.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.operators.stream;
+
+import org.apache.samza.sql.api.data.EntityName;
+import org.apache.samza.sql.api.operators.spec.OperatorSpec;
+import org.apache.samza.sql.operators.factory.SimpleOperatorSpec;
+
+
+/**
+ * Example specification of the <code>InsertStream</code> operator: one input entity, one output entity, no other fields
+ */
+public class InsertStreamSpec extends SimpleOperatorSpec implements OperatorSpec {
+
+  /**
+   * Default ctor of <code>InsertStreamSpec</code>
+   *
+   * @param id The identifier of the operator
+   * @param input The input relation entity
+   * @param output The output stream entity
+   */
+  public InsertStreamSpec(String id, EntityName input, EntityName output) {
+    super(id, input, output);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/f8309b2a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/BoundedTimeWindow.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/BoundedTimeWindow.java b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/BoundedTimeWindow.java
new file mode 100644
index 0000000..935ffc0
--- /dev/null
+++ b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/BoundedTimeWindow.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.operators.window;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.sql.api.data.EntityName;
+import org.apache.samza.sql.api.data.Relation;
+import org.apache.samza.sql.api.data.Tuple;
+import org.apache.samza.sql.api.operators.TupleOperator;
+import org.apache.samza.sql.operators.factory.SimpleOperator;
+import org.apache.samza.storage.kv.KeyValueIterator;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskContext;
+import org.apache.samza.task.TaskCoordinator;
+import org.apache.samza.task.sql.SqlMessageCollector;
+
+
+/**
+ * This class defines an example built-in fixed-size window operator that converts a stream to a relation.
+ * NOTE(review): the window-update helpers in this class are still TODO stubs, so no window changes are sent yet.
+ */
+public class BoundedTimeWindow extends SimpleOperator implements TupleOperator {
+
+  /**
+   * The specification of this window operator
+   */
+  private final WindowSpec spec;
+
+  /**
+   * The relation that the window operator keeps internally; it stays null until <code>init</code> loads it
+   */
+  private Relation relation = null;
+
+  /**
+   * The window states of all active windows that the window operator keeps track of; null until <code>init</code> runs
+   */
+  private List<WindowState> windowStates = null;
+
+  /**
+   * Ctor that takes <code>WindowSpec</code> specification as input argument
+   *
+   * <p>This version of constructor is often used in an implementation of <code>SqlOperatorFactory</code>
+   *
+   * @param spec The window specification object
+   */
+  public BoundedTimeWindow(WindowSpec spec) {
+    super(spec);
+    this.spec = spec;
+  }
+
+  /**
+   * A simplified ctor that lets users create a window operator ad hoc, without building a spec object first
+   *
+   * @param wndId The identifier of this window operator
+   * @param lengthSec The window size in seconds
+   * @param input The input stream name
+   * @param output The output relation name
+   */
+  public BoundedTimeWindow(String wndId, int lengthSec, String input, String output) {
+    super(new WindowSpec(wndId, EntityName.getStreamName(input), EntityName.getRelationName(output), lengthSec));
+    this.spec = (WindowSpec) super.getSpec();
+  }
+
+  @Override
+  public void process(Tuple tuple, SqlMessageCollector collector) throws Exception {
+    // For each incoming tuple: first update the window states with the tuple, then,
+    // if the window states report a change, send the delta changes of the window
+    // relation to the collector (see processWindowChanges).
+    updateWindow(tuple);
+    processWindowChanges(collector);
+  }
+
+  private void processWindowChanges(SqlMessageCollector collector) throws Exception {
+    if (windowStateChange()) {
+      collector.send(getWindowChanges());
+    }
+  }
+
+  private Relation getWindowChanges() {
+    // TODO not implemented yet: always returns null, i.e. "no changes"
+    return null;
+  }
+
+  private boolean windowStateChange() {
+    // TODO placeholder: true only when getWindowChanges() is non-null, which never happens while that method is a stub
+    return getWindowChanges() != null;
+  }
+
+  private void updateWindow(Tuple tuple) {
+    // TODO not implemented yet (empty body)
+    // The window states are to be updated here
+    // And the corresponding deltaChanges are also to be calculated here.
+  }
+
+  private void updateWindowTimeout() {
+    // TODO not implemented yet (empty body)
+    // The window states are to be updated here
+    // And the corresponding deltaChanges are also to be calculated here.
+  }
+
+  @Override
+  public void window(MessageCollector collector, TaskCoordinator coordinator) throws Exception {
+    SqlMessageCollector sqlCollector = (SqlMessageCollector) collector; // unchecked: fails with ClassCastException otherwise
+    updateWindowTimeout();
+    processWindowChanges(sqlCollector);
+    sqlCollector.timeout(this.spec.getOutputNames());
+  }
+
+  @Override
+  public void init(Config config, TaskContext context) throws Exception {
+    // Load the stores only while the relation is still unset; once it is non-null this method does nothing
+    if (this.relation == null) {
+      this.relation = (Relation) context.getStore(this.spec.getOutputName().toString());
+      Relation wndStates = (Relation) context.getStore(this.spec.getWndStatesName());
+      this.windowStates = new ArrayList<WindowState>();
+      for (KeyValueIterator<Object, Tuple> iter = wndStates.all(); iter.hasNext();) {
+        this.windowStates.add((WindowState) iter.next().getValue().getMessage());
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/f8309b2a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/WindowSpec.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/WindowSpec.java b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/WindowSpec.java
new file mode 100644
index 0000000..e2ae3aa
--- /dev/null
+++ b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/WindowSpec.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.operators.window;
+
+import org.apache.samza.sql.api.data.EntityName;
+import org.apache.samza.sql.api.operators.spec.OperatorSpec;
+import org.apache.samza.sql.operators.factory.SimpleOperatorSpec;
+
+
+/**
+ * This class implements the specification class for the built-in <code>BoundedTimeWindow</code> operator
+ */
+public class WindowSpec extends SimpleOperatorSpec implements OperatorSpec {
+
+  /**
+   * The window size in seconds
+   */
+  private final int wndSizeSec;
+
+  /**
+   * Ctor of the <code>WindowSpec</code> object; the size is stored as given, without validation
+   *
+   * @param id The identifier of the operator
+   * @param input The input stream entity
+   * @param output The output relation entity
+   * @param lengthSec The window size in seconds
+   */
+  public WindowSpec(String id, EntityName input, EntityName output, int lengthSec) {
+    super(id, input, output);
+    this.wndSizeSec = lengthSec;
+  }
+
+  /**
+   * Method to get the window state relation name
+   *
+   * @return The window state relation name: the operator id followed by the suffix <code>-wnd-state</code>
+   */
+  public String getWndStatesName() {
+    return this.getId() + "-wnd-state";
+  }
+
+  /**
+   * Method to get the window size in seconds
+   *
+   * @return The window size in seconds, as passed to the ctor
+   */
+  public int getWndSizeSec() {
+    return this.wndSizeSec;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/f8309b2a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/WindowState.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/WindowState.java b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/WindowState.java
new file mode 100644
index 0000000..48547f0
--- /dev/null
+++ b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/WindowState.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.operators.window;
+
+public class WindowState { // mutable state of a single window: start/end offsets plus a closed flag
+  public String startOffset = null; // assigned by open(); initially null
+  public String endOffset = null; // assigned by close() and advanceTo(); initially null
+  public boolean isClosed = false; // set to true by close(), reset to false by open()
+
+  public void open(String offset) { // (re)opens the window at the given start offset; endOffset is left untouched
+    this.isClosed = false;
+    this.startOffset = offset;
+  }
+
+  public void close(String offset) { // records the given end offset and marks the window closed
+    this.endOffset = offset;
+    this.isClosed = true;
+  }
+
+  public void advanceTo(String offset) { // moves the end offset only; the closed flag is not changed
+    this.endOffset = offset;
+  }
+
+  public boolean isClosed() { // accessor for the (also public) isClosed field
+    return this.isClosed;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/f8309b2a/samza-sql-core/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java b/samza-sql-core/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
new file mode 100644
index 0000000..1dfb262
--- /dev/null
+++ b/samza-sql-core/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.sql.planner;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.adapter.enumerable.EnumerableConvention;
+import org.apache.calcite.adapter.enumerable.EnumerableRel;
+import org.apache.calcite.adapter.enumerable.EnumerableRules;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.avatica.util.Casing;
+import org.apache.calcite.avatica.util.Quoting;
+import org.apache.calcite.config.CalciteConnectionConfig;
+import org.apache.calcite.jdbc.CalcitePrepare;
+import org.apache.calcite.plan.*;
+import org.apache.calcite.plan.volcano.VolcanoPlanner;
+import org.apache.calcite.prepare.CalciteCatalogReader;
+import org.apache.calcite.rel.RelCollationTraitDef;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.rules.*;
+import org.apache.calcite.rel.stream.StreamRules;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.parser.SqlParseException;
+import org.apache.calcite.sql.parser.SqlParser;
+import org.apache.calcite.sql.util.ChainedSqlOperatorTable;
+import org.apache.calcite.sql.validate.SqlValidator;
+
+import java.util.List;
+
+/**
+ * Streaming query planner implementation based on Calcite: parses, validates and converts SQL to a RelNode tree.
+ */
+public class QueryPlanner {
+  public static final boolean COMMUTE =
+      "true".equals(
+          System.getProperties().getProperty("calcite.enable.join.commute")); // evaluated once, at class initialization
+
+  /**
+   * Whether to enable the collation trait. Some extra optimizations are
+   * possible if enabled, but queries should work either way. At some point
+   * this will become a preference, or we will run multiple phases: first
+   * disabled, then enabled.
+   */
+  private static final boolean ENABLE_COLLATION_TRAIT = true;
+
+  private static final List<RelOptRule> DEFAULT_RULES =
+      ImmutableList.of(
+          AggregateStarTableRule.INSTANCE,
+          AggregateStarTableRule.INSTANCE2,
+          TableScanRule.INSTANCE,
+          COMMUTE
+              ? JoinAssociateRule.INSTANCE
+              : ProjectMergeRule.INSTANCE, // rule used in this slot when COMMUTE is false
+          FilterTableScanRule.INSTANCE,
+          ProjectFilterTransposeRule.INSTANCE,
+          FilterProjectTransposeRule.INSTANCE,
+          FilterJoinRule.FILTER_ON_JOIN,
+          AggregateExpandDistinctAggregatesRule.INSTANCE,
+          AggregateReduceFunctionsRule.INSTANCE,
+          FilterAggregateTransposeRule.INSTANCE,
+          JoinCommuteRule.INSTANCE,
+          JoinPushThroughJoinRule.RIGHT,
+          JoinPushThroughJoinRule.LEFT,
+          SortProjectTransposeRule.INSTANCE);
+
+  private static final List<RelOptRule> ENUMERABLE_RULES =
+      ImmutableList.of(
+          EnumerableRules.ENUMERABLE_JOIN_RULE,
+          EnumerableRules.ENUMERABLE_SEMI_JOIN_RULE,
+          EnumerableRules.ENUMERABLE_CORRELATE_RULE,
+          EnumerableRules.ENUMERABLE_PROJECT_RULE,
+          EnumerableRules.ENUMERABLE_FILTER_RULE,
+          EnumerableRules.ENUMERABLE_AGGREGATE_RULE,
+          EnumerableRules.ENUMERABLE_SORT_RULE,
+          EnumerableRules.ENUMERABLE_LIMIT_RULE,
+          EnumerableRules.ENUMERABLE_COLLECT_RULE,
+          EnumerableRules.ENUMERABLE_UNCOLLECT_RULE,
+          EnumerableRules.ENUMERABLE_UNION_RULE,
+          EnumerableRules.ENUMERABLE_INTERSECT_RULE,
+          EnumerableRules.ENUMERABLE_MINUS_RULE,
+          EnumerableRules.ENUMERABLE_TABLE_MODIFICATION_RULE,
+          EnumerableRules.ENUMERABLE_VALUES_RULE,
+          EnumerableRules.ENUMERABLE_WINDOW_RULE,
+          EnumerableRules.ENUMERABLE_TABLE_FUNCTION_SCAN_RULE);
+
+  private static final List<RelOptRule> CONSTANT_REDUCTION_RULES = // NOTE(review): never registered with the planner
+      ImmutableList.of(
+          ReduceExpressionsRule.PROJECT_INSTANCE,
+          ReduceExpressionsRule.FILTER_INSTANCE,
+          ReduceExpressionsRule.CALC_INSTANCE,
+          ReduceExpressionsRule.JOIN_INSTANCE,
+          ValuesReduceRule.FILTER_INSTANCE,
+          ValuesReduceRule.PROJECT_FILTER_INSTANCE,
+          ValuesReduceRule.PROJECT_INSTANCE);
+
+  /**
+   * Parse, validate and convert a streaming query to a query plan; a parse failure is rethrown as RuntimeException.
+   * @param query streaming query in SQL with streaming extensions
+   * @param context query prepare context (supplies the type factory, config, root schema and default schema path)
+   * @return query plan as produced by the SQL-to-rel converter; no optimization is applied yet
+   */
+  public RelNode getPlan(String query, CalcitePrepare.Context context) {
+    final JavaTypeFactory typeFactory = context.getTypeFactory();
+    final CalciteConnectionConfig config = context.config();
+
+    CalciteCatalogReader catalogReader = new CalciteCatalogReader(context.getRootSchema(),
+        false,
+        context.getDefaultSchemaPath(),
+        typeFactory);
+
+    SqlParser sqlParser = SqlParser.create(query,
+        SqlParser.configBuilder()
+            .setQuotedCasing(config.quotedCasing())
+            .setUnquotedCasing(config.unquotedCasing())
+            .setQuoting(config.quoting())
+            .build());
+
+    SqlNode sqlNode;
+
+    try {
+      sqlNode = sqlParser.parseStmt();
+    } catch (SqlParseException e) {
+      throw new RuntimeException("parse failed: " + e.getMessage(), e); // keeps the original exception as the cause
+    }
+
+    final ChainedSqlOperatorTable operatorTable =
+        new ChainedSqlOperatorTable(
+            ImmutableList.of(SqlStdOperatorTable.instance(), catalogReader));
+
+    final SqlValidator validator =
+        new SamzaSqlValidator(operatorTable, catalogReader, typeFactory);
+    validator.setIdentifierExpansion(true);
+
+    SqlNode validatedSqlNode = validator.validate(sqlNode);
+
+    final RelOptPlanner planner = createStreamingRelOptPlanner(context, null, null); // no external context or cost factory
+
+    final SamzaQueryPreparingStatement preparingStmt =
+        new SamzaQueryPreparingStatement(
+            context,
+            catalogReader,
+            typeFactory,
+            context.getRootSchema(),
+            EnumerableRel.Prefer.ARRAY,
+            planner,
+            EnumerableConvention.INSTANCE);
+
+    /* TODO: Add query optimization; for now the converter's output is returned as-is. */
+
+    return preparingStmt.getSqlToRelConverter(validator, catalogReader).convertQuery(validatedSqlNode, false, true);
+  }
+
+  /**
+   * Creates a query planner and initializes it with a default set of
+   * rules: DEFAULT_RULES, ENUMERABLE_RULES and StreamRules.RULES, in that order.
+   *
+   * @param prepareContext  context for preparing a statement
+   * @param externalContext external query planning context; if null, one is built from prepareContext's config
+   * @param costFactory     cost factory for cost based query planning; handed to VolcanoPlanner unchanged
+   * @return relation query planner instance
+   */
+  protected RelOptPlanner createStreamingRelOptPlanner(final CalcitePrepare.Context prepareContext,
+                                                       org.apache.calcite.plan.Context externalContext,
+                                                       RelOptCostFactory costFactory) {
+    if (externalContext == null) {
+      externalContext = Contexts.withConfig(prepareContext.config());
+    }
+
+    final VolcanoPlanner planner =
+        new VolcanoPlanner(costFactory, externalContext);
+
+    planner.addRelTraitDef(ConventionTraitDef.INSTANCE);
+
+    if (ENABLE_COLLATION_TRAIT) {
+      planner.addRelTraitDef(RelCollationTraitDef.INSTANCE);
+      planner.registerAbstractRelationalRules();
+    }
+    RelOptUtil.registerAbstractRels(planner);
+    for (RelOptRule rule : DEFAULT_RULES) {
+      planner.addRule(rule);
+    }
+
+    /* Note: Bindable rules were removed until Calcite switches the convention of the root node to bindable. */
+
+    for (RelOptRule rule : ENUMERABLE_RULES) {
+      planner.addRule(rule);
+    }
+
+    for (RelOptRule rule : StreamRules.RULES) {
+      planner.addRule(rule);
+    }
+
+    /* Note: Constant reduction rules were removed because current Calcite implementation doesn't use them. */
+
+    return planner;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/f8309b2a/samza-sql-core/src/main/java/org/apache/samza/sql/planner/SamzaCalciteConnection.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/planner/SamzaCalciteConnection.java b/samza-sql-core/src/main/java/org/apache/samza/sql/planner/SamzaCalciteConnection.java
new file mode 100644
index 0000000..63b1da5
--- /dev/null
+++ b/samza-sql-core/src/main/java/org/apache/samza/sql/planner/SamzaCalciteConnection.java
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.sql.planner;
+
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.config.CalciteConnectionConfig;
+import org.apache.calcite.config.CalciteConnectionConfigImpl;
+import org.apache.calcite.jdbc.CalciteConnection;
+import org.apache.calcite.jdbc.CalciteRootSchema;
+import org.apache.calcite.jdbc.CalciteSchema;
+import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
+import org.apache.calcite.linq4j.Enumerator;
+import org.apache.calcite.linq4j.Queryable;
+import org.apache.calcite.linq4j.tree.Expression;
+import org.apache.calcite.model.ModelHandler;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.calcite.schema.SchemaPlus;
+
+import java.io.IOException;
+import java.lang.reflect.Type;
+import java.sql.*;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Executor;
+
+/**
+ * Minimal <code>org.apache.calcite.jdbc.CalciteConnection</code> implementation which enables re-use of Calcite code.
+ * Only the root schema, type factory, schema name and config accessors do real work; all other methods are stubs.
+ */
+public class SamzaCalciteConnection implements CalciteConnection {
+  private static final String INLINE = "inline:";
+  private final JavaTypeFactory typeFactory;
+  private final CalciteRootSchema rootSchema;
+  private String schema;
+
+  public SamzaCalciteConnection(String model) throws IOException {
+    typeFactory = new JavaTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
+    rootSchema = CalciteSchema.createRootSchema(true);
+    new ModelHandler(this, INLINE + model); // result discarded; presumably it populates this connection's schema - confirm
+  }
+
+  public CalciteRootSchema getCalciteRootSchema(){
+    return rootSchema;
+  }
+
+  @Override
+  public SchemaPlus getRootSchema() {
+    return rootSchema.plus();
+  }
+
+  @Override
+  public JavaTypeFactory getTypeFactory() {
+    return typeFactory;
+  }
+
+  @Override
+  public Properties getProperties() {
+    return null;
+  }
+
+  @Override
+  public Statement createStatement() throws SQLException {
+    return null;
+  }
+
+  @Override
+  public PreparedStatement prepareStatement(String sql) throws SQLException {
+    return null;
+  }
+
+  @Override
+  public CallableStatement prepareCall(String sql) throws SQLException {
+    return null;
+  }
+
+  @Override
+  public String nativeSQL(String sql) throws SQLException {
+    return null;
+  }
+
+  @Override
+  public void setAutoCommit(boolean autoCommit) throws SQLException {
+
+  }
+
+  @Override
+  public boolean getAutoCommit() throws SQLException {
+    return false;
+  }
+
+  @Override
+  public void commit() throws SQLException {
+
+  }
+
+  @Override
+  public void rollback() throws SQLException {
+
+  }
+
+  @Override
+  public void close() throws SQLException {
+
+  }
+
+  @Override
+  public boolean isClosed() throws SQLException {
+    return false;
+  }
+
+  @Override
+  public DatabaseMetaData getMetaData() throws SQLException {
+    return null;
+  }
+
+  @Override
+  public void setReadOnly(boolean readOnly) throws SQLException {
+
+  }
+
+  @Override
+  public boolean isReadOnly() throws SQLException {
+    return false;
+  }
+
+  @Override
+  public void setCatalog(String catalog) throws SQLException {
+
+  }
+
+  @Override
+  public String getCatalog() throws SQLException {
+    return null;
+  }
+
+  @Override
+  public void setTransactionIsolation(int level) throws SQLException {
+
+  }
+
+  @Override
+  public int getTransactionIsolation() throws SQLException {
+    return 0;
+  }
+
+  @Override
+  public SQLWarning getWarnings() throws SQLException {
+    return null;
+  }
+
+  @Override
+  public void clearWarnings() throws SQLException {
+
+  }
+
+  @Override
+  public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException {
+    return null;
+  }
+
+  @Override
+  public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
+    return null;
+  }
+
+  @Override
+  public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
+    return null;
+  }
+
+  @Override
+  public Map<String, Class<?>> getTypeMap() throws SQLException {
+    return null;
+  }
+
+  @Override
+  public void setTypeMap(Map<String, Class<?>> map) throws SQLException {
+
+  }
+
+  @Override
+  public void setHoldability(int holdability) throws SQLException {
+
+  }
+
+  @Override
+  public int getHoldability() throws SQLException {
+    return 0;
+  }
+
+  @Override
+  public Savepoint setSavepoint() throws SQLException {
+    return null;
+  }
+
+  @Override
+  public Savepoint setSavepoint(String name) throws SQLException {
+    return null;
+  }
+
+  @Override
+  public void rollback(Savepoint savepoint) throws SQLException {
+
+  }
+
+  @Override
+  public void releaseSavepoint(Savepoint savepoint) throws SQLException {
+
+  }
+
+  @Override
+  public Statement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
+    return null;
+  }
+
+  @Override
+  public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
+    return null;
+  }
+
+  @Override
+  public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
+    return null;
+  }
+
+  @Override
+  public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException {
+    return null;
+  }
+
+  @Override
+  public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException {
+    return null;
+  }
+
+  @Override
+  public PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException {
+    return null;
+  }
+
+  @Override
+  public Clob createClob() throws SQLException {
+    return null;
+  }
+
+  @Override
+  public Blob createBlob() throws SQLException {
+    return null;
+  }
+
+  @Override
+  public NClob createNClob() throws SQLException {
+    return null;
+  }
+
+  @Override
+  public SQLXML createSQLXML() throws SQLException {
+    return null;
+  }
+
+  @Override
+  public boolean isValid(int timeout) throws SQLException {
+    return false;
+  }
+
+  @Override
+  public void setClientInfo(String name, String value) throws SQLClientInfoException {
+
+  }
+
+  @Override
+  public void setClientInfo(Properties properties) throws SQLClientInfoException {
+
+  }
+
+  @Override
+  public String getClientInfo(String name) throws SQLException {
+    return null;
+  }
+
+  @Override
+  public Properties getClientInfo() throws SQLException {
+    return null;
+  }
+
+  @Override
+  public Array createArrayOf(String typeName, Object[] elements) throws SQLException {
+    return null;
+  }
+
+  @Override
+  public Struct createStruct(String typeName, Object[] attributes) throws SQLException {
+    return null;
+  }
+
+  @Override
+  public void setSchema(String schema) throws SQLException {
+    this.schema = schema;
+  }
+
+  @Override
+  public String getSchema() throws SQLException {
+    return schema;
+  }
+
+  public void abort(Executor executor) throws SQLException {
+
+  }
+
+  public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException {
+
+  }
+
+  public int getNetworkTimeout() throws SQLException {
+    return 0;
+  }
+
+  @Override
+  public CalciteConnectionConfig config() {
+    return new CalciteConnectionConfigImpl(new Properties()); // a fresh config over empty Properties on every call
+  }
+
+  @Override
+  public <T> Queryable<T> createQuery(Expression expression, Class<T> rowType) {
+    return null;
+  }
+
+  @Override
+  public <T> Queryable<T> createQuery(Expression expression, Type rowType) {
+    return null;
+  }
+
+  @Override
+  public <T> T execute(Expression expression, Class<T> type) {
+    return null;
+  }
+
+  @Override
+  public <T> T execute(Expression expression, Type type) {
+    return null;
+  }
+
+  @Override
+  public <T> Enumerator<T> executeQuery(Queryable<T> queryable) {
+    return null;
+  }
+
+  @Override
+  public <T> T unwrap(Class<T> iface) throws SQLException {
+    return null;
+  }
+
+  @Override
+  public boolean isWrapperFor(Class<?> iface) throws SQLException {
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/f8309b2a/samza-sql-core/src/main/java/org/apache/samza/sql/planner/SamzaQueryPreparingStatement.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/planner/SamzaQueryPreparingStatement.java b/samza-sql-core/src/main/java/org/apache/samza/sql/planner/SamzaQueryPreparingStatement.java
new file mode 100644
index 0000000..0721573
--- /dev/null
+++ b/samza-sql-core/src/main/java/org/apache/samza/sql/planner/SamzaQueryPreparingStatement.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.sql.planner;
+
+import com.google.common.collect.Maps;
+import org.apache.calcite.adapter.enumerable.EnumerableRel;
+import org.apache.calcite.jdbc.CalcitePrepare;
+import org.apache.calcite.jdbc.CalciteSchema;
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.prepare.Prepare;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.sql.SqlExplainLevel;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.validate.SqlValidator;
+import org.apache.calcite.sql2rel.SqlToRelConverter;
+import org.apache.calcite.sql2rel.StandardConvertletTable;
+
+import java.util.List;
+import java.util.Map;
+
+public class SamzaQueryPreparingStatement extends Prepare implements RelOptTable.ViewExpander {
+  private final RelOptPlanner planner; // handed to the SqlToRelConverter built in getSqlToRelConverter
+  private final RexBuilder rexBuilder; // built from the type factory in the ctor
+  protected final CalciteSchema schema;
+  protected final RelDataTypeFactory typeFactory;
+  private final EnumerableRel.Prefer prefer; // NOTE(review): stored by the ctor but never read in this class
+  private final Map<String, Object> internalParameters = // NOTE(review): never used in this class
+      Maps.newLinkedHashMap();
+  private int expansionDepth; // NOTE(review): never used in this class
+  private SqlValidator sqlValidator; // NOTE(review): never assigned or read in this class
+
+  public SamzaQueryPreparingStatement(CalcitePrepare.Context context, CatalogReader catalogReader,
+                                      RelDataTypeFactory typeFactory,
+                                      CalciteSchema schema,
+                                      EnumerableRel.Prefer prefer,
+                                      RelOptPlanner planner,
+                                      Convention resultConvention) {
+    super(context, catalogReader, resultConvention);
+    this.schema = schema;
+    this.prefer = prefer;
+    this.planner = planner;
+    this.typeFactory = typeFactory;
+    this.rexBuilder = new RexBuilder(typeFactory);
+  }
+
+  @Override
+  protected PreparedResult createPreparedExplanation(RelDataType resultType, RelDataType parameterRowType, RelNode rootRel, boolean explainAsXml, SqlExplainLevel detailLevel) {
+    return null; // stub: not implemented
+  }
+
+  @Override
+  protected PreparedResult implement(RelDataType rowType, RelNode rootRel, SqlKind sqlKind) {
+    return null; // stub: not implemented
+  }
+
+  @Override
+  protected SqlToRelConverter getSqlToRelConverter(SqlValidator validator, CatalogReader catalogReader) {
+    SqlToRelConverter sqlToRelConverter =
+        new SqlToRelConverter(
+            this, validator, catalogReader, planner, rexBuilder, // "this" is passed as the RelOptTable.ViewExpander
+            StandardConvertletTable.INSTANCE);
+    sqlToRelConverter.setTrimUnusedFields(true);
+    return sqlToRelConverter;
+  }
+
+  @Override
+  public RelNode flattenTypes(RelNode rootRel, boolean restructure) {
+    return null; // stub: not implemented
+  }
+
+  @Override
+  protected RelNode decorrelate(SqlToRelConverter sqlToRelConverter, SqlNode query, RelNode rootRel) {
+    return null; // stub: not implemented
+  }
+
+  @Override
+  protected void init(Class runtimeContextClass) {}
+
+  @Override
+  protected SqlValidator getSqlValidator() {
+    return null; // stub: always null; the sqlValidator field is not consulted
+  }
+
+  @Override
+  public RelNode expandView(RelDataType rowType, String queryString, List<String> schemaPath) {
+    // TODO: Implement custom view expansions; until then this simply delegates to the superclass
+    return super.expandView(rowType, queryString, schemaPath);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/f8309b2a/samza-sql-core/src/main/java/org/apache/samza/sql/planner/SamzaSqlValidator.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/planner/SamzaSqlValidator.java b/samza-sql-core/src/main/java/org/apache/samza/sql/planner/SamzaSqlValidator.java
new file mode 100644
index 0000000..f46c1f0
--- /dev/null
+++ b/samza-sql-core/src/main/java/org/apache/samza/sql/planner/SamzaSqlValidator.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.sql.planner;
+
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.sql.SqlInsert;
+import org.apache.calcite.sql.SqlOperatorTable;
+import org.apache.calcite.sql.validate.SqlConformance;
+import org.apache.calcite.sql.validate.SqlValidatorCatalogReader;
+import org.apache.calcite.sql.validate.SqlValidatorImpl;
+
+/** SQL validator that converts the logical source/target row types of an INSERT with {@link JavaTypeFactory#toSql}. */
+public class SamzaSqlValidator extends SqlValidatorImpl {
+  /**
+   * Builds a validator that uses {@link SqlConformance#DEFAULT}; a Samza-specific conformance may be needed later.
+   *
+   * @param opTab         operator table handed to the base validator
+   * @param catalogReader catalog reader handed to the base validator
+   * @param typeFactory   type factory; it is cast to {@link JavaTypeFactory} when row types are converted
+   */
+  protected SamzaSqlValidator(SqlOperatorTable opTab, SqlValidatorCatalogReader catalogReader, RelDataTypeFactory typeFactory) {
+    super(opTab, catalogReader, typeFactory, SqlConformance.DEFAULT);
+  }
+
+  @Override
+  protected RelDataType getLogicalSourceRowType(RelDataType sourceRowType, SqlInsert insert) {
+    final JavaTypeFactory javaTypeFactory = (JavaTypeFactory) typeFactory;
+    return javaTypeFactory.toSql(sourceRowType);
+  }
+
+  @Override
+  protected RelDataType getLogicalTargetRowType(RelDataType targetRowType, SqlInsert insert) {
+    final JavaTypeFactory javaTypeFactory = (JavaTypeFactory) typeFactory;
+    return javaTypeFactory.toSql(targetRowType);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/f8309b2a/samza-sql-core/src/main/java/org/apache/samza/sql/router/SimpleRouter.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/router/SimpleRouter.java b/samza-sql-core/src/main/java/org/apache/samza/sql/router/SimpleRouter.java
new file mode 100644
index 0000000..c6fc673
--- /dev/null
+++ b/samza-sql-core/src/main/java/org/apache/samza/sql/router/SimpleRouter.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.router;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.samza.sql.api.data.EntityName;
+import org.apache.samza.sql.api.operators.Operator;
+import org.apache.samza.sql.api.operators.RelationOperator;
+import org.apache.samza.sql.api.operators.TupleOperator;
+import org.apache.samza.sql.api.router.OperatorRouter;
+
+
+/**
+ * Example implementation of <code>OperatorRouter</code> that keeps its routing tables in memory
+ */
+public class SimpleRouter implements OperatorRouter {
+  /** Distinct operators added to the <code>OperatorRouter</code>, in the order they were first attached */
+  private List<Operator> operators = new ArrayList<Operator>();
+
+  /** Map of <code>EntityName</code> to the list of operators associated with it */
+  @SuppressWarnings("rawtypes")
+  private Map<EntityName, List> nextOps = new HashMap<EntityName, List>();
+
+  /** List of <code>EntityName</code> as system inputs */
+  private List<EntityName> inputEntities = new ArrayList<EntityName>();
+
+  /** Attaches <code>nextOp</code> to <code>output</code>; an operator attached to several entities is iterated once. */
+  @SuppressWarnings("unchecked")
+  private void addOperator(EntityName output, Operator nextOp) {
+    if (nextOps.get(output) == null) {
+      nextOps.put(output, new ArrayList<Operator>());
+    }
+    nextOps.get(output).add(nextOp);
+    if (!operators.contains(nextOp)) {
+      operators.add(nextOp);
+    }
+  }
+
+  @Override
+  public Iterator<Operator> iterator() {
+    return operators.iterator();
+  }
+
+  @Override
+  public void addTupleOperator(EntityName outputStream, TupleOperator nextOp) throws Exception {
+    if (!outputStream.isStream()) {
+      throw new IllegalArgumentException("Can't attach an TupleOperator " + nextOp.getSpec().getId()
+          + " to a non-stream entity " + outputStream);
+    }
+    addOperator(outputStream, nextOp);
+  }
+
+  @Override
+  public void addRelationOperator(EntityName outputRelation, RelationOperator nextOp) throws Exception {
+    if (!outputRelation.isRelation()) {
+      throw new IllegalArgumentException("Can't attach an RelationOperator " + nextOp.getSpec().getId()
+          + " to a non-relation entity " + outputRelation);
+    }
+    addOperator(outputRelation, nextOp);
+  }
+
+  /** Returns the operators attached to the relation, or a fresh empty list (never null) when there are none. */
+  @SuppressWarnings("unchecked")
+  @Override
+  public List<RelationOperator> getRelationOperators(EntityName outputRelation) {
+    if (!outputRelation.isRelation()) {
+      throw new IllegalArgumentException("Can't get RelationOperators for a non-relation output: " + outputRelation);
+    }
+    List<RelationOperator> attached = nextOps.get(outputRelation);
+    return attached != null ? attached : new ArrayList<RelationOperator>();
+  }
+
+  /** Returns the operators attached to the stream, or a fresh empty list (never null) when there are none. */
+  @SuppressWarnings("unchecked")
+  @Override
+  public List<TupleOperator> getTupleOperators(EntityName outputStream) {
+    if (!outputStream.isStream()) {
+      throw new IllegalArgumentException("Can't get TupleOperators for a non-stream output: " + outputStream);
+    }
+    List<TupleOperator> attached = nextOps.get(outputStream);
+    return attached != null ? attached : new ArrayList<TupleOperator>();
+  }
+
+  @Override
+  public boolean hasNextOperators(EntityName output) {
+    return nextOps.get(output) != null && !nextOps.get(output).isEmpty();
+  }
+
+  /** Returns the operators attached to the entity, or a fresh empty list (never null) when there are none. */
+  @SuppressWarnings("unchecked")
+  @Override
+  public List<Operator> getNextOperators(EntityName output) {
+    List<Operator> attached = nextOps.get(output);
+    return attached != null ? attached : new ArrayList<Operator>();
+  }
+
+  @Override
+  public void addSystemInput(EntityName input) {
+    if (!hasNextOperators(input)) {
+      throw new IllegalStateException("Can't set a system input w/o any next operators. input:" + input);
+    }
+    if (!inputEntities.contains(input)) {
+      inputEntities.add(input);
+    }
+  }
+
+  @Override
+  public List<EntityName> getSystemInputs() {
+    return this.inputEntities;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/f8309b2a/samza-sql-core/src/main/java/org/apache/samza/task/sql/OperatorMessageCollector.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/task/sql/OperatorMessageCollector.java b/samza-sql-core/src/main/java/org/apache/samza/task/sql/OperatorMessageCollector.java
new file mode 100644
index 0000000..1e5310f
--- /dev/null
+++ b/samza-sql-core/src/main/java/org/apache/samza/task/sql/OperatorMessageCollector.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.task.sql;
+
+import java.util.List;
+
+import org.apache.samza.sql.api.data.EntityName;
+import org.apache.samza.sql.api.data.Relation;
+import org.apache.samza.sql.api.data.Tuple;
+import org.apache.samza.sql.api.operators.Operator;
+import org.apache.samza.sql.api.operators.RelationOperator;
+import org.apache.samza.sql.api.operators.TupleOperator;
+import org.apache.samza.sql.api.router.OperatorRouter;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskCoordinator;
+
+
+/** Example <code>SqlMessageCollector</code> that routes operator output through an <code>OperatorRouter</code> */
+public class OperatorMessageCollector implements SqlMessageCollector {
+  private final MessageCollector collector;
+  private final TaskCoordinator coordinator;
+  private final OperatorRouter rteCntx;
+
+  public OperatorMessageCollector(MessageCollector collector, TaskCoordinator coordinator, OperatorRouter rteCntx) {
+    this.collector = collector;
+    this.coordinator = coordinator;
+    this.rteCntx = rteCntx;
+  }
+
+  @Override
+  public void send(Relation deltaRelation) throws Exception {
+    for (RelationOperator op : orEmpty(this.rteCntx.getRelationOperators(deltaRelation.getName()))) {
+      op.process(deltaRelation, this);
+    }
+  }
+
+  @Override
+  public void send(Tuple tuple) throws Exception {
+    for (TupleOperator op : orEmpty(this.rteCntx.getTupleOperators(tuple.getStreamName()))) {
+      op.process(tuple, this);
+    }
+  }
+
+  @Override
+  public void timeout(List<EntityName> outputs) throws Exception {
+    for (EntityName output : outputs) {
+      for (Operator op : orEmpty(this.rteCntx.getNextOperators(output))) {
+        op.window(this, this.coordinator);
+      }
+    }
+  }
+
+  @Override
+  public void send(OutgoingMessageEnvelope envelope) {
+    this.collector.send(envelope);
+  }
+
+  /** A router may return null for an entity with nothing attached; iterate over an empty list in that case. */
+  private static <T> List<T> orEmpty(List<T> ops) {
+    return ops != null ? ops : java.util.Collections.<T>emptyList();
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/f8309b2a/samza-sql-core/src/main/java/org/apache/samza/task/sql/SqlMessageCollector.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/task/sql/SqlMessageCollector.java b/samza-sql-core/src/main/java/org/apache/samza/task/sql/SqlMessageCollector.java
new file mode 100644
index 0000000..b98e2d7
--- /dev/null
+++ b/samza-sql-core/src/main/java/org/apache/samza/task/sql/SqlMessageCollector.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.task.sql;
+
+import java.util.List;
+
+import org.apache.samza.sql.api.data.EntityName;
+import org.apache.samza.sql.api.data.Relation;
+import org.apache.samza.sql.api.data.Tuple;
+import org.apache.samza.task.MessageCollector;
+
+
+/**
+ * Interface used by the operators to send their output via runtime system resources,
+ * such as the output system streams, the system storage, or an <code>OperatorRouter</code>.
+ *
+ */
+public interface SqlMessageCollector extends MessageCollector {
+
+  /**
+   * Sends the relation output of the current operator onward; where it goes is up to the implementation.
+   *
+   * @param deltaRelation The delta <code>Relation</code> output generated by the current operator
+   * @throws Exception Throws exception if failed
+   */
+  void send(Relation deltaRelation) throws Exception;
+
+  /**
+   * Sends the tuple output of the current operator onward; where it goes is up to the implementation.
+   *
+   * @param tuple The <code>Tuple</code> object generated by the current operator
+   * @throws Exception Throws exception if failed
+   */
+  void send(Tuple tuple) throws Exception;
+
+  /**
+   * Lets the current operator trigger timeout actions via the <code>SqlMessageCollector</code>.
+   *
+   * <p>This method sends timeout events to the corresponding <code>outputEntities</code> so that the next operators
+   * attached to those entities will be notified of the timeout.
+   *
+   * @param outputEntities The list of output entities to which the timeout event needs to be sent
+   * @throws Exception Throws exception if failed
+   */
+  void timeout(List<EntityName> outputEntities) throws Exception;
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/f8309b2a/samza-sql-core/src/main/java/org/apache/samza/task/sql/StoreMessageCollector.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/task/sql/StoreMessageCollector.java b/samza-sql-core/src/main/java/org/apache/samza/task/sql/StoreMessageCollector.java
new file mode 100644
index 0000000..b4b0e59
--- /dev/null
+++ b/samza-sql-core/src/main/java/org/apache/samza/task/sql/StoreMessageCollector.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.task.sql;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.samza.sql.api.data.EntityName;
+import org.apache.samza.sql.api.data.Relation;
+import org.apache.samza.sql.api.data.Tuple;
+import org.apache.samza.storage.kv.KeyValueStore;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+
+
+/** Example implementation of <code>SqlMessageCollector</code> that stores the outputs from the operators per entity */
+public class StoreMessageCollector implements SqlMessageCollector {
+
+  private final KeyValueStore<EntityName, List<Object>> outputStore;
+
+  public StoreMessageCollector(KeyValueStore<EntityName, List<Object>> store) {
+    this.outputStore = store;
+  }
+
+  @Override
+  public void send(Relation deltaRelation) throws Exception {
+    saveOutput(deltaRelation.getName(), deltaRelation);
+  }
+
+  @Override
+  public void send(Tuple tuple) throws Exception {
+    saveOutput(tuple.getStreamName(), tuple);
+  }
+
+  @Override
+  public void timeout(List<EntityName> outputs) throws Exception {
+    // TODO: timeouts are currently ignored by this collector
+  }
+
+  /** Removes the outputs stored under <code>id</code> and returns them as read from the store. */
+  public List<Object> removeOutput(EntityName id) {
+    List<Object> output = outputStore.get(id);
+    outputStore.delete(id);
+    return output;
+  }
+
+  /** Appends <code>output</code> to the list stored under <code>name</code>, creating the list on first use. */
+  private void saveOutput(EntityName name, Object output) {
+    List<Object> outputs = this.outputStore.get(name);
+    if (outputs == null) {
+      outputs = new ArrayList<Object>();
+    }
+    outputs.add(output);
+    // Write the list back: a store may return a copy, in which case mutating it alone would lose the output.
+    this.outputStore.put(name, outputs);
+  }
+
+  @Override
+  public void send(OutgoingMessageEnvelope envelope) {
+    saveOutput(
+        EntityName.getStreamName(envelope.getSystemStream().getSystem() + ":" + envelope.getSystemStream().getStream()),
+        envelope);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/f8309b2a/samza-sql-core/src/test/java/org/apache/samza/sql/data/serializers/SqlAvroSerdeTest.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/test/java/org/apache/samza/sql/data/serializers/SqlAvroSerdeTest.java b/samza-sql-core/src/test/java/org/apache/samza/sql/data/serializers/SqlAvroSerdeTest.java
new file mode 100644
index 0000000..7412669
--- /dev/null
+++ b/samza-sql-core/src/test/java/org/apache/samza/sql/data/serializers/SqlAvroSerdeTest.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.sql.data.serializers;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.sql.data.avro.AvroData;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+public class SqlAvroSerdeTest {
+  public static final String ORDER_SCHEMA = "{\"namespace\": \"org.apache.samza.sql\",\n"+
+      " \"type\": \"record\",\n"+
+      " \"name\": \"Order\",\n"+
+      " \"fields\": [\n"+
+      "     {\"name\": \"id\", \"type\": \"int\"},\n"+
+      "     {\"name\": \"product\",  \"type\": \"string\"},\n"+
+      "     {\"name\": \"quantity\", \"type\": \"int\"}\n"+
+      " ]\n"+
+      "}";
+
+  public static Schema orderSchema = new Schema.Parser().parse(ORDER_SCHEMA);
+  private static Serde serde = new SqlAvroSerdeFactory().getSerde("sqlAvro", sqlAvroSerdeTestConfig());
+
+  @Test
+  public void testSqlAvroSerdeDeserialization() throws IOException {
+    AvroData decodedDatum = (AvroData)serde.fromBytes(encodeMessage(sampleOrderRecord(), orderSchema));
+    assertOrderSchema(decodedDatum);
+  }
+
+  @Test
+  public void testSqlAvroSerialization() throws IOException {
+    AvroData decodedDatumOriginal = (AvroData)serde.fromBytes(encodeMessage(sampleOrderRecord(), orderSchema));
+    byte[] encodedDatum = serde.toBytes(decodedDatumOriginal);
+
+    AvroData decodedDatum = (AvroData)serde.fromBytes(encodedDatum);
+    assertOrderSchema(decodedDatum);
+  }
+
+  /** Asserts the datum is a STRUCT whose id and quantity fields are INTEGER and whose product field is STRING. */
+  private static void assertOrderSchema(AvroData order) {
+    Assert.assertEquals(org.apache.samza.sql.api.data.Schema.Type.STRUCT, order.schema().getType());
+    Assert.assertEquals(org.apache.samza.sql.api.data.Schema.Type.INTEGER, order.getFieldData("id").schema().getType());
+    Assert.assertEquals(org.apache.samza.sql.api.data.Schema.Type.INTEGER, order.getFieldData("quantity").schema().getType());
+    Assert.assertEquals(org.apache.samza.sql.api.data.Schema.Type.STRING, order.getFieldData("product").schema().getType());
+  }
+
+  private static Config sqlAvroSerdeTestConfig(){
+    Map<String, String> config = new HashMap<String, String>();
+    config.put("serializers.sqlAvro.schema", ORDER_SCHEMA);
+
+    return new MapConfig(config);
+  }
+
+  private static byte[] encodeMessage(GenericRecord datum, Schema avroSchema) throws IOException {
+    DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(avroSchema);
+    ByteArrayOutputStream output = new ByteArrayOutputStream();
+    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(output, null);
+    writer.write(datum, encoder);
+    encoder.flush();
+
+    return  output.toByteArray();
+  }
+
+  private static GenericRecord sampleOrderRecord(){
+    GenericData.Record datum = new GenericData.Record(orderSchema);
+    datum.put("id", 1);
+    datum.put("product", "paint");
+    datum.put("quantity", 3);
+
+    return datum;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/f8309b2a/samza-sql-core/src/test/java/org/apache/samza/sql/planner/QueryPlannerTest.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/test/java/org/apache/samza/sql/planner/QueryPlannerTest.java b/samza-sql-core/src/test/java/org/apache/samza/sql/planner/QueryPlannerTest.java
new file mode 100644
index 0000000..022116e
--- /dev/null
+++ b/samza-sql-core/src/test/java/org/apache/samza/sql/planner/QueryPlannerTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.sql.planner;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.calcite.avatica.util.Casing;
+import org.apache.calcite.avatica.util.Quoting;
+import org.apache.calcite.config.CalciteConnectionProperty;
+import org.apache.calcite.jdbc.CalcitePrepare;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.schema.Schemas;
+import org.apache.calcite.util.Util;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
+
+public class QueryPlannerTest {
+  public static final String STREAM_SCHEMA = "     {\n"
+      + "       name: 'STREAMS',\n"
+      + "       tables: [ {\n"
+      + "         type: 'custom',\n"
+      + "         name: 'ORDERS',\n"
+      + "         stream: {\n"
+      + "           stream: true\n"
+      + "         },\n"
+      + "         factory: '" + SamzaStreamTableFactory.class.getName() + "'\n"
+      + "       } ]\n"
+      + "     }\n";
+
+  public static final String STREAM_MODEL = "{\n"
+      + "  version: '1.0',\n"
+      + "  defaultSchema: 'STREAMS',\n"
+      + "   schemas: [\n"
+      + STREAM_SCHEMA
+      + "   ]\n"
+      + "}";
+
+  public static final String SELECT_ALL_FROM_ORDERS_WHERE_QUANTITY_GREATER_THAN_FIVE_PLAN_EXPECTED =
+      "LogicalDelta\n"
+      + "  LogicalProject(id=[$0], product=[$1], quantity=[$2])\n"
+      + "    LogicalFilter(condition=[>($2, 5)])\n"
+      + "      EnumerableTableScan(table=[[STREAMS, ORDERS]])";
+  public static final String SELECT_ALL_FROM_ORDERS_WHERE_QUANTITY_GREATER_THAN_FIVE =
+      "select stream * from orders where quantity > 5";
+
+  @Test
+  public void testQueryPlanner() throws IOException, SQLException {
+    // Build a planning context over the STREAMS model declared above.
+    SamzaCalciteConnection calciteConnection = new SamzaCalciteConnection(STREAM_MODEL);
+    Map<CalciteConnectionProperty, String> properties = defaultConfiguration();
+    CalcitePrepare.Context prepareContext = Schemas.makeContext(
+        calciteConnection,
+        calciteConnection.getCalciteRootSchema(),
+        ImmutableList.of(calciteConnection.getSchema()),
+        ImmutableMap.copyOf(properties));
+
+    // Plan the streaming query and check that the printed plan contains the expected operator tree.
+    RelNode plan = new QueryPlanner().getPlan(SELECT_ALL_FROM_ORDERS_WHERE_QUANTITY_GREATER_THAN_FIVE, prepareContext);
+    Assert.assertNotNull(plan);
+    String planText = Util.toLinux(RelOptUtil.toString(plan));
+    Assert.assertTrue(planText.contains(SELECT_ALL_FROM_ORDERS_WHERE_QUANTITY_GREATER_THAN_FIVE_PLAN_EXPECTED));
+  }
+
+  public static Map<CalciteConnectionProperty, String> defaultConfiguration(){
+    Map<CalciteConnectionProperty, String> properties = new HashMap<CalciteConnectionProperty, String>();
+    properties.put(CalciteConnectionProperty.CASE_SENSITIVE, "false");
+    properties.put(CalciteConnectionProperty.QUOTED_CASING, Casing.UNCHANGED.name());
+    properties.put(CalciteConnectionProperty.UNQUOTED_CASING, Casing.UNCHANGED.name());
+    properties.put(CalciteConnectionProperty.QUOTING, Quoting.BACK_TICK.name());
+    return properties;
+  }
+}