You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2017/04/05 23:19:08 UTC

[05/23] storm git commit: STORM-2453 Move non-connectors into the top directory

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/StormRelDataTypeSystem.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/StormRelDataTypeSystem.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/StormRelDataTypeSystem.java
new file mode 100644
index 0000000..437877c
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/StormRelDataTypeSystem.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.sql.planner;
+
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.calcite.rel.type.RelDataTypeSystemImpl;
+
+public class StormRelDataTypeSystem extends RelDataTypeSystemImpl { // Calcite type system carrying Storm SQL's numeric limits.
+    public static final RelDataTypeSystem STORM_REL_DATATYPE_SYSTEM = new StormRelDataTypeSystem(); // shared instance
+
+    @Override
+    public int getMaxNumericScale() {
+        return 38; // value reported in place of the RelDataTypeSystemImpl one
+    }
+
+    @Override
+    public int getMaxNumericPrecision() {
+        return 38; // value reported in place of the RelDataTypeSystemImpl one
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/StormRelUtils.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/StormRelUtils.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/StormRelUtils.java
new file mode 100644
index 0000000..40bbacd
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/StormRelUtils.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.planner;
+
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.plan.volcano.RelSubset;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.sql.SqlExplainLevel;
+import org.apache.storm.sql.planner.trident.rel.TridentRel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class StormRelUtils { // Static helpers: unique stage/class names for Trident rels, input unwrapping, plan rendering.
+    private static final Logger LOG = LoggerFactory.getLogger(StormRelUtils.class);
+
+    private static final AtomicInteger sequence = new AtomicInteger(0); // suffix counter for stage names
+    private static final AtomicInteger classSequence = new AtomicInteger(0); // suffix counter for generated class names
+
+    public static String getStageName(TridentRel relNode) { // "<UPPER-CASED SIMPLE CLASS NAME>_<rel id>_<counter>"
+        return relNode.getClass().getSimpleName().toUpperCase() + "_" + relNode.getId() + "_" + sequence.getAndIncrement();
+    }
+
+    public static String getClassName(TridentRel relNode) { // "Generated_<UPPER-CASED SIMPLE CLASS NAME>_<rel id>_<counter>"
+        return "Generated_" + relNode.getClass().getSimpleName().toUpperCase() + "_" + relNode.getId() + "_" +
+                classSequence.getAndIncrement();
+    }
+
+    public static TridentRel getStormRelInput(RelNode input) { // unwraps a RelSubset to its best rel, then casts to TridentRel
+        if (input instanceof RelSubset) {
+            // go with known best input
+            input = ((RelSubset) input).getBest();
+        }
+        return (TridentRel) input;
+    }
+
+    public static String explain(final RelNode rel) { // plan text at the EXPPLAN_ATTRIBUTES detail level
+        return explain(rel, SqlExplainLevel.EXPPLAN_ATTRIBUTES);
+    }
+
+    public static String explain(final RelNode rel, SqlExplainLevel detailLevel) { // plan text at detailLevel; "" on stack overflow
+        String explain = "";
+        try {
+            explain = RelOptUtil.toString(rel, detailLevel); // pass the requested level; the one-arg overload silently dropped it
+        } catch (StackOverflowError e) {
+            LOG.error("StackOverflowError occurred while extracting plan. Please report it to the dev@ mailing list.");
+            LOG.error("RelNode " + rel + " ExplainLevel " + detailLevel, e);
+            LOG.error("Forcing plan to empty string and continue... SQL Runner may not working properly after.");
+        }
+        return explain; // still "" when rendering failed above (deliberate best-effort)
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/UnsupportedOperatorsVisitor.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/UnsupportedOperatorsVisitor.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/UnsupportedOperatorsVisitor.java
new file mode 100644
index 0000000..258fe72
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/UnsupportedOperatorsVisitor.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.planner;
+
+import org.apache.calcite.sql.util.SqlShuttle;
+
+public class UnsupportedOperatorsVisitor extends SqlShuttle { // Placeholder: declares no overrides, so it behaves as a plain SqlShuttle.
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormCalcRelBase.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormCalcRelBase.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormCalcRelBase.java
new file mode 100644
index 0000000..29ca08b
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormCalcRelBase.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.sql.planner.rel;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Calc;
+import org.apache.calcite.rex.RexProgram;
+
+public abstract class StormCalcRelBase extends Calc implements StormRelNode { // Abstract Calc that is also tagged as a StormRelNode.
+    protected StormCalcRelBase(RelOptCluster cluster, RelTraitSet traits, RelNode child, RexProgram program) {
+        super(cluster, traits, child, program); // arguments are forwarded to Calc unchanged
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormFilterRelBase.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormFilterRelBase.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormFilterRelBase.java
new file mode 100644
index 0000000..fa2ac65
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormFilterRelBase.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.sql.planner.rel;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rex.RexNode;
+
+public abstract class StormFilterRelBase extends Filter implements StormRelNode { // Abstract Filter that is also tagged as a StormRelNode.
+    protected StormFilterRelBase(RelOptCluster cluster, RelTraitSet traits, RelNode child, RexNode condition) {
+        super(cluster, traits, child, condition); // arguments are forwarded to Filter unchanged
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormJoinRelBase.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormJoinRelBase.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormJoinRelBase.java
new file mode 100644
index 0000000..d8e82c5
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormJoinRelBase.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.sql.planner.rel;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.CorrelationId;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rex.RexNode;
+
+import java.util.Set;
+
+public abstract class StormJoinRelBase extends Join implements StormRelNode { // Abstract Join that is also tagged as a StormRelNode.
+    protected StormJoinRelBase(RelOptCluster cluster, RelTraitSet traitSet, RelNode left, RelNode right, RexNode condition, Set<CorrelationId> variablesSet, JoinRelType joinType) {
+        super(cluster, traitSet, left, right, condition, variablesSet, joinType); // arguments are forwarded to Join unchanged
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormProjectRelBase.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormProjectRelBase.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormProjectRelBase.java
new file mode 100644
index 0000000..fe32ba5
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormProjectRelBase.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.sql.planner.rel;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexNode;
+
+import java.util.List;
+
+public abstract class StormProjectRelBase extends Project implements StormRelNode { // Abstract Project that is also tagged as a StormRelNode.
+    protected StormProjectRelBase(RelOptCluster cluster, RelTraitSet traits, RelNode input, List<? extends RexNode> projects, RelDataType rowType) {
+        super(cluster, traits, input, projects, rowType); // arguments are forwarded to Project unchanged
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormRelNode.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormRelNode.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormRelNode.java
new file mode 100644
index 0000000..9327868
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormRelNode.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.sql.planner.rel;
+
+import org.apache.calcite.rel.RelNode;
+
+public interface StormRelNode extends RelNode { // Marker interface: adds no members of its own to RelNode.
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormStreamInsertRelBase.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormStreamInsertRelBase.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormStreamInsertRelBase.java
new file mode 100644
index 0000000..59694fc
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormStreamInsertRelBase.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.sql.planner.rel;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.prepare.Prepare;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.TableModify;
+import org.apache.calcite.rex.RexNode;
+
+import java.util.List;
+
+public abstract class StormStreamInsertRelBase extends TableModify implements StormRelNode { // Abstract TableModify that is also tagged as a StormRelNode.
+    protected StormStreamInsertRelBase(RelOptCluster cluster, RelTraitSet traits, RelOptTable table, Prepare.CatalogReader catalogReader, RelNode child, Operation operation, List<String> updateColumnList, List<RexNode> sourceExpressionList, boolean flattened) {
+        super(cluster, traits, table, catalogReader, child, operation, updateColumnList, sourceExpressionList, flattened); // arguments are forwarded to TableModify unchanged
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormStreamScanRelBase.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormStreamScanRelBase.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormStreamScanRelBase.java
new file mode 100644
index 0000000..32f1ac2
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormStreamScanRelBase.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.sql.planner.rel;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.core.TableScan;
+
+public abstract class StormStreamScanRelBase extends TableScan implements StormRelNode { // Abstract TableScan that is also tagged as a StormRelNode.
+
+    // FIXME: define Table class and table.unwrap() to get it
+
+    protected StormStreamScanRelBase(RelOptCluster cluster, RelTraitSet traitSet, RelOptTable table) {
+        super(cluster, traitSet, table); // arguments are forwarded to TableScan unchanged
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/QueryPlanner.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/QueryPlanner.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/QueryPlanner.java
new file mode 100644
index 0000000..f98fb02
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/QueryPlanner.java
@@ -0,0 +1,156 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  * http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+package org.apache.storm.sql.planner.trident;
+
+import org.apache.calcite.DataContext;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.jdbc.CalciteSchema;
+import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
+import org.apache.calcite.plan.Contexts;
+import org.apache.calcite.plan.ConventionTraitDef;
+import org.apache.calcite.plan.RelTraitDef;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.prepare.CalciteCatalogReader;
+import org.apache.calcite.rel.RelCollationTraitDef;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperatorTable;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.parser.SqlParseException;
+import org.apache.calcite.sql.util.ChainedSqlOperatorTable;
+import org.apache.calcite.tools.FrameworkConfig;
+import org.apache.calcite.tools.Frameworks;
+import org.apache.calcite.tools.Planner;
+import org.apache.calcite.tools.RelConversionException;
+import org.apache.calcite.tools.ValidationException;
+import org.apache.storm.sql.javac.CompilingClassLoader;
+import org.apache.storm.sql.planner.StormRelDataTypeSystem;
+import org.apache.storm.sql.planner.UnsupportedOperatorsVisitor;
+import org.apache.storm.sql.planner.trident.rel.TridentLogicalConvention;
+import org.apache.storm.sql.planner.trident.rel.TridentRel;
+import org.apache.storm.sql.runtime.ISqlTridentDataSource;
+import org.apache.storm.sql.AbstractTridentProcessor;
+import org.apache.storm.trident.Stream;
+import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.trident.fluent.IAggregatableStream;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+public class QueryPlanner { // Parses, validates and converts a SQL string into a TridentRel plan, and compiles that plan into a Trident processor.
+
+    public static final int STORM_REL_CONVERSION_RULES = 1; // first argument to planner.transform(); presumably an index into TridentStormRuleSets.getRuleSets() — confirm
+
+    private final Planner planner;
+
+    private final JavaTypeFactory typeFactory = new JavaTypeFactoryImpl(
+            RelDataTypeSystem.DEFAULT); // NOTE(review): DEFAULT here, but the planner config below uses StormRelDataTypeSystem — confirm this is intended
+
+    public QueryPlanner(SchemaPlus schema) { // builds the Calcite Planner over the given schema
+        final List<RelTraitDef> traitDefs = new ArrayList<RelTraitDef>();
+
+        traitDefs.add(ConventionTraitDef.INSTANCE);
+        traitDefs.add(RelCollationTraitDef.INSTANCE);
+
+        List<SqlOperatorTable> sqlOperatorTables = new ArrayList<>(); // standard operators first, then the schema's catalog reader
+        sqlOperatorTables.add(SqlStdOperatorTable.instance());
+        sqlOperatorTables.add(new CalciteCatalogReader(CalciteSchema.from(schema),
+                false,
+                Collections.<String>emptyList(), typeFactory));
+
+        FrameworkConfig config = Frameworks.newConfigBuilder()
+                .defaultSchema(schema)
+                .operatorTable(new ChainedSqlOperatorTable(sqlOperatorTables))
+                .traitDefs(traitDefs)
+                .context(Contexts.EMPTY_CONTEXT)
+                .ruleSets(TridentStormRuleSets.getRuleSets())
+                .costFactory(null)
+                .typeSystem(StormRelDataTypeSystem.STORM_REL_DATATYPE_SYSTEM)
+                .build();
+        this.planner = Frameworks.getPlanner(config);
+    }
+
+    public AbstractTridentProcessor compile(Map<String, ISqlTridentDataSource> sources, String query) throws Exception { // plans query, then wraps the built topology in a processor
+        TridentRel relNode = getPlan(query);
+
+        TridentPlanCreator tridentPlanCreator = new TridentPlanCreator(sources, new RexBuilder(typeFactory));
+        relNode.tridentPlan(tridentPlanCreator); // the rel tree fills the creator; its results are read back just below
+
+        final TridentTopology topology = tridentPlanCreator.getTopology();
+        final IAggregatableStream lastStream = tridentPlanCreator.pop(); // stream left on top of the creator's stack
+        final DataContext dc = tridentPlanCreator.getDataContext();
+        final List<CompilingClassLoader> cls = tridentPlanCreator.getClassLoaders();
+
+        return new AbstractTridentProcessor() { // every accessor returns one of the values captured above
+            @Override
+            public TridentTopology build() {
+                return topology;
+            }
+
+            @Override
+            public Stream outputStream() {
+                return lastStream.toStream();
+            }
+
+            @Override
+            public DataContext getDataContext() {
+                return dc;
+            }
+
+            @Override
+            public List<CompilingClassLoader> getClassLoaders() {
+                return cls;
+            }
+        };
+    }
+
+    public TridentRel getPlan(String query) throws ValidationException, RelConversionException, SqlParseException { // parse -> validate -> rel -> Trident rel
+        return (TridentRel) validateAndConvert(planner.parse(query));
+    }
+
+    private RelNode validateAndConvert(SqlNode sqlNode) throws ValidationException, RelConversionException { // runs the three steps below in order
+        SqlNode validated = validateNode(sqlNode);
+        RelNode relNode = convertToRelNode(validated);
+        return convertToStormRel(relNode);
+    }
+
+    private RelNode convertToStormRel(RelNode relNode) throws RelConversionException { // requests the TridentLogicalConvention trait on the result
+        RelTraitSet traitSet = relNode.getTraitSet();
+        traitSet = traitSet.simplify();
+
+        // PlannerImpl.transform() optimizes RelNode with ruleset
+        return planner.transform(STORM_REL_CONVERSION_RULES, traitSet.plus(TridentLogicalConvention.INSTANCE), relNode);
+    }
+
+    private RelNode convertToRelNode(SqlNode sqlNode) throws RelConversionException { // keeps only the .rel field of the planner.rel() result
+        return planner.rel(sqlNode).rel;
+    }
+
+    private SqlNode validateNode(SqlNode sqlNode) throws ValidationException { // validates, then walks the tree with UnsupportedOperatorsVisitor
+        SqlNode validatedSqlNode = planner.validate(sqlNode);
+        validatedSqlNode.accept(new UnsupportedOperatorsVisitor()); // result of accept() is discarded; the validated node is returned as-is
+        return validatedSqlNode;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/TridentPlanCreator.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/TridentPlanCreator.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/TridentPlanCreator.java
new file mode 100644
index 0000000..30ebf7e
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/TridentPlanCreator.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.planner.trident;
+
+import org.apache.calcite.DataContext;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexProgram;
+import org.apache.storm.sql.compiler.RexNodeToJavaCodeCompiler;
+import org.apache.storm.sql.javac.CompilingClassLoader;
+import org.apache.storm.sql.runtime.ISqlTridentDataSource;
+import org.apache.storm.sql.runtime.calcite.DebuggableExecutableExpression;
+import org.apache.storm.sql.runtime.calcite.ExecutableExpression;
+import org.apache.storm.sql.runtime.calcite.StormDataContext;
+import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.trident.fluent.IAggregatableStream;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.List;
+import java.util.Map;
+
+public class TridentPlanCreator { // Holds the data sources, topology, stream stack and compiled-expression class loaders used while building a Trident plan.
+    private final Map<String, ISqlTridentDataSource> sources;
+    private final JavaTypeFactory typeFactory;
+    private final RexNodeToJavaCodeCompiler rexCompiler;
+    private final DataContext dataContext;
+    private final TridentTopology topology;
+
+    private final Deque<IAggregatableStream> streamStack = new ArrayDeque<>(); // LIFO: addStream() pushes, pop() removes the newest
+    private final List<CompilingClassLoader> classLoaders = new ArrayList<>(); // in creation order; the last entry is the newest loader
+
+    public TridentPlanCreator(Map<String, ISqlTridentDataSource> sources, RexBuilder rexBuilder) {
+        this.sources = sources;
+        this.rexCompiler = new RexNodeToJavaCodeCompiler(rexBuilder);
+        this.typeFactory = (JavaTypeFactory) rexBuilder.getTypeFactory(); // ClassCastException if the builder's factory is not a JavaTypeFactory
+
+        this.topology = new TridentTopology();
+        this.dataContext = new StormDataContext();
+    }
+
+    public void addStream(IAggregatableStream stream) throws Exception { // NOTE(review): declares Exception although push() throws nothing checked
+        push(stream);
+    }
+
+    public IAggregatableStream pop() { // removes and returns the most recently added stream
+        return streamStack.pop();
+    }
+
+    public Map<String, ISqlTridentDataSource> getSources() {
+        return sources;
+    }
+
+    public DataContext getDataContext() {
+        return dataContext;
+    }
+
+    public JavaTypeFactory getTypeFactory() {
+        return typeFactory;
+    }
+
+    public TridentTopology getTopology() {
+        return topology;
+    }
+
+    public ExecutableExpression createScalarInstance(List<RexNode> nodes, RelDataType inputRowType, String className) // compiles nodes into class className and instantiates it
+            throws CompilingClassLoader.CompilerException, ClassNotFoundException, IllegalAccessException, InstantiationException {
+        String expr = rexCompiler.compile(nodes, inputRowType, className); // string produced by the rex compiler, handed to the class loader below
+        CompilingClassLoader classLoader = new CompilingClassLoader(
+                getLastClassLoader(), className, expr, null); // parented on the previously created loader, if any
+        ExecutableExpression instance = (ExecutableExpression) classLoader.loadClass(className).newInstance();
+        addClassLoader(classLoader); // recorded only after load + instantiate succeeded
+        return new DebuggableExecutableExpression(instance, expr);
+    }
+
+    public ExecutableExpression createScalarInstance(RexProgram program, String className) // same steps as the overload above, starting from a RexProgram
+            throws CompilingClassLoader.CompilerException, ClassNotFoundException, IllegalAccessException, InstantiationException {
+        String expr = rexCompiler.compile(program, className);
+        CompilingClassLoader classLoader = new CompilingClassLoader(
+                getLastClassLoader(), className, expr, null);
+        ExecutableExpression instance = (ExecutableExpression) classLoader.loadClass(className).newInstance();
+        addClassLoader(classLoader);
+        return new DebuggableExecutableExpression(instance, expr);
+    }
+
+    private void push(IAggregatableStream stream) {
+        streamStack.push(stream);
+    }
+
+    public void addClassLoader(CompilingClassLoader compilingClassLoader) {
+        this.classLoaders.add(compilingClassLoader);
+    }
+
+    public ClassLoader getLastClassLoader() { // newest recorded loader, or this class's own loader when none exists yet
+        if (this.classLoaders.size() > 0) {
+            return this.classLoaders.get(this.classLoaders.size() - 1);
+        } else {
+            return this.getClass().getClassLoader();
+        }
+    }
+
+    public List<CompilingClassLoader> getClassLoaders() { // returns the internal list itself, not a copy
+        return classLoaders;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/TridentStormRuleSets.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/TridentStormRuleSets.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/TridentStormRuleSets.java
new file mode 100644
index 0000000..e146069
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/TridentStormRuleSets.java
@@ -0,0 +1,110 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  * http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+package org.apache.storm.sql.planner.trident;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.rel.rules.CalcMergeRule;
+import org.apache.calcite.rel.rules.FilterCalcMergeRule;
+import org.apache.calcite.rel.rules.FilterProjectTransposeRule;
+import org.apache.calcite.rel.rules.FilterToCalcRule;
+import org.apache.calcite.rel.rules.ProjectCalcMergeRule;
+import org.apache.calcite.rel.rules.ProjectFilterTransposeRule;
+import org.apache.calcite.rel.rules.ProjectRemoveRule;
+import org.apache.calcite.rel.rules.ProjectToCalcRule;
+import org.apache.calcite.rel.rules.PruneEmptyRules;
+import org.apache.calcite.rel.rules.ReduceExpressionsRule;
+import org.apache.calcite.rel.rules.SortRemoveRule;
+import org.apache.calcite.rel.rules.UnionEliminatorRule;
+import org.apache.calcite.rel.stream.StreamRules;
+import org.apache.calcite.tools.RuleSet;
+import org.apache.storm.sql.planner.trident.rules.TridentCalcRule;
+import org.apache.storm.sql.planner.trident.rules.TridentFilterRule;
+import org.apache.storm.sql.planner.trident.rules.TridentScanRule;
+import org.apache.storm.sql.planner.trident.rules.TridentAggregateRule;
+import org.apache.storm.sql.planner.trident.rules.TridentJoinRule;
+import org.apache.storm.sql.planner.trident.rules.TridentModifyRule;
+import org.apache.storm.sql.planner.trident.rules.TridentProjectRule;
+
+import java.util.Iterator;
+
+public class TridentStormRuleSets {
+    private static final ImmutableSet<RelOptRule> calciteToStormConversionRules =
+            ImmutableSet.<RelOptRule>builder().add(
+                    SortRemoveRule.INSTANCE,
+
+                    FilterToCalcRule.INSTANCE,
+                    ProjectToCalcRule.INSTANCE,
+                    FilterCalcMergeRule.INSTANCE,
+                    ProjectCalcMergeRule.INSTANCE,
+                    CalcMergeRule.INSTANCE,
+
+                    PruneEmptyRules.FILTER_INSTANCE,
+                    PruneEmptyRules.PROJECT_INSTANCE,
+                    PruneEmptyRules.UNION_INSTANCE,
+
+                    ProjectFilterTransposeRule.INSTANCE,
+                    FilterProjectTransposeRule.INSTANCE,
+                    ProjectRemoveRule.INSTANCE,
+
+                    ReduceExpressionsRule.FILTER_INSTANCE,
+                    ReduceExpressionsRule.PROJECT_INSTANCE,
+                    ReduceExpressionsRule.CALC_INSTANCE,
+
+                    // merge and push unions rules
+                    UnionEliminatorRule.INSTANCE,
+
+                    TridentScanRule.INSTANCE,
+                    TridentFilterRule.INSTANCE,
+                    TridentProjectRule.INSTANCE,
+                    TridentAggregateRule.INSTANCE,
+                    TridentJoinRule.INSTANCE,
+                    TridentModifyRule.INSTANCE,
+                    TridentCalcRule.INSTANCE
+            ).build();
+
+    public static RuleSet[] getRuleSets() {
+        return new RuleSet[]{
+                new StormRuleSet(StreamRules.RULES),
+                new StormRuleSet(ImmutableSet.<RelOptRule>builder().addAll(StreamRules.RULES).addAll(calciteToStormConversionRules).build())
+        };
+    }
+
+    private static class StormRuleSet implements RuleSet {
+        final ImmutableSet<RelOptRule> rules;
+
+        public StormRuleSet(ImmutableSet<RelOptRule> rules) {
+            this.rules = rules;
+        }
+
+        public StormRuleSet(ImmutableList<RelOptRule> rules) {
+            this.rules = ImmutableSet.<RelOptRule>builder()
+                    .addAll(rules)
+                    .build();
+        }
+
+        @Override
+        public Iterator<RelOptRule> iterator() {
+            return rules.iterator();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentCalcRel.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentCalcRel.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentCalcRel.java
new file mode 100644
index 0000000..482e841
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentCalcRel.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.planner.trident.rel;
+
+import com.google.common.collect.Lists;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Calc;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexLocalRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexProgram;
+import org.apache.storm.sql.planner.StormRelUtils;
+import org.apache.storm.sql.planner.rel.StormCalcRelBase;
+import org.apache.storm.sql.planner.trident.TridentPlanCreator;
+import org.apache.storm.sql.runtime.calcite.ExecutableExpression;
+import org.apache.storm.sql.runtime.trident.functions.EvaluationCalc;
+import org.apache.storm.trident.Stream;
+import org.apache.storm.tuple.Fields;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Calc node (optional condition plus optional projection list) that can be planned onto a
+ * Trident stream.
+ */
+public class TridentCalcRel extends StormCalcRelBase implements TridentRel {
+    public TridentCalcRel(RelOptCluster cluster, RelTraitSet traits, RelNode child, RexProgram program) {
+        super(cluster, traits, child, program);
+    }
+
+    @Override
+    public Calc copy(RelTraitSet traitSet, RelNode child, RexProgram program) {
+        return new TridentCalcRel(getCluster(), traitSet, child, program);
+    }
+
+    /**
+     * Plans the input node, then appends a flat-map stage evaluating this node's condition
+     * and/or projection, and hands the resulting stream back to the plan creator.
+     *
+     * @param planCreator plan creator that holds the streams and compiles the expressions
+     * @throws IllegalStateException if the program has neither a condition nor a projection
+     * @throws Exception propagated from planning the input or compiling the expressions
+     */
+    @Override
+    public void tridentPlan(TridentPlanCreator planCreator) throws Exception {
+        // SingleRel: plan the only input first and take its stream from the plan creator.
+        final RelNode child = getInput();
+        StormRelUtils.getStormRelInput(child).tridentPlan(planCreator);
+        final Stream upstream = planCreator.pop().toStream();
+
+        final String stageName = StormRelUtils.getStageName(this);
+        final RelDataType inputRowType = getInput(0).getRowType();
+        final List<String> outputFieldNames = getRowType().getFieldNames();
+        final int outputCount = outputFieldNames.size();
+
+        // Filter: compiled only when the program carries a condition.
+        ExecutableExpression filterInstance = null;
+        final RexLocalRef condition = program.getCondition();
+        if (condition != null) {
+            final List<RexNode> conditionNodes = Lists.newArrayList(program.expandLocalRef(condition));
+            final String filterClassName = StormRelUtils.getClassName(this);
+            filterInstance = planCreator.createScalarInstance(conditionNodes, inputRowType, filterClassName);
+        }
+
+        // Projection: compiled only when the program carries a non-empty project list.
+        ExecutableExpression projectionInstance = null;
+        final List<RexLocalRef> projectList = program.getProjectList();
+        if (projectList != null && !projectList.isEmpty()) {
+            final List<RexNode> projectionNodes = new ArrayList<>();
+            for (RexLocalRef projectRef : projectList) {
+                projectionNodes.add(program.expandLocalRef(projectRef));
+            }
+            final String projectionClassName = StormRelUtils.getClassName(this);
+            projectionInstance = planCreator.createScalarInstance(projectionNodes, inputRowType, projectionClassName);
+        }
+
+        if (filterInstance == null && projectionInstance == null) {
+            // This should not happen: a calc without condition and projection does nothing.
+            throw new IllegalStateException("Either projection or condition, or both should be provided.");
+        }
+
+        final EvaluationCalc evaluation =
+                new EvaluationCalc(filterInstance, projectionInstance, outputCount, planCreator.getDataContext());
+        final Stream finalStream = upstream
+                .flatMap(evaluation, new Fields(outputFieldNames))
+                .name(stageName);
+
+        planCreator.addStream(finalStream);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentFilterRel.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentFilterRel.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentFilterRel.java
new file mode 100644
index 0000000..1fe0927
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentFilterRel.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.planner.trident.rel;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexNode;
+import org.apache.storm.sql.planner.StormRelUtils;
+import org.apache.storm.sql.planner.rel.StormFilterRelBase;
+import org.apache.storm.sql.planner.trident.TridentPlanCreator;
+import org.apache.storm.sql.runtime.calcite.ExecutableExpression;
+import org.apache.storm.sql.runtime.trident.functions.EvaluationFilter;
+import org.apache.storm.trident.Stream;
+import org.apache.storm.trident.fluent.IAggregatableStream;
+
+import java.util.List;
+
+/**
+ * Filter node that can be planned onto a Trident stream.
+ */
+public class TridentFilterRel extends StormFilterRelBase implements TridentRel {
+    public TridentFilterRel(RelOptCluster cluster, RelTraitSet traits, RelNode child, RexNode condition) {
+        super(cluster, traits, child, condition);
+    }
+
+    @Override
+    public Filter copy(RelTraitSet traitSet, RelNode input, RexNode condition) {
+        return new TridentFilterRel(getCluster(), traitSet, input, condition);
+    }
+
+    /**
+     * Plans the input node, then appends a filter stage that evaluates this node's child
+     * expressions, and hands the resulting stream back to the plan creator.
+     *
+     * @param planCreator plan creator that holds the streams and compiles the expression
+     * @throws Exception propagated from planning the input or compiling the expression
+     */
+    @Override
+    public void tridentPlan(TridentPlanCreator planCreator) throws Exception {
+        // SingleRel: plan the only input first and take its stream from the plan creator.
+        final RelNode child = getInput();
+        StormRelUtils.getStormRelInput(child).tridentPlan(planCreator);
+        final Stream upstream = planCreator.pop().toStream();
+
+        final String stageName = StormRelUtils.getStageName(this);
+
+        final List<RexNode> filterExpressions = getChildExps();
+        final RelDataType inputRowType = getInput(0).getRowType();
+        final String filterClassName = StormRelUtils.getClassName(this);
+
+        final ExecutableExpression compiledFilter =
+                planCreator.createScalarInstance(filterExpressions, inputRowType, filterClassName);
+        final EvaluationFilter evaluation = new EvaluationFilter(compiledFilter, planCreator.getDataContext());
+
+        final IAggregatableStream finalStream = upstream.filter(evaluation).name(stageName);
+        planCreator.addStream(finalStream);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentLogicalConvention.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentLogicalConvention.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentLogicalConvention.java
new file mode 100644
index 0000000..d221498
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentLogicalConvention.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.planner.trident.rel;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.ConventionTraitDef;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTrait;
+import org.apache.calcite.plan.RelTraitDef;
+import org.apache.calcite.plan.RelTraitSet;
+
+/**
+ * Calling convention for relational nodes that implement {@link TridentRel}.
+ * Implemented as a single-constant enum, so there is exactly one instance.
+ */
+public enum TridentLogicalConvention implements Convention {
+  INSTANCE;
+
+  /** Returns the interface associated with this convention: {@link TridentRel}. */
+  @Override
+  public Class getInterface() {
+    return TridentRel.class;
+  }
+
+  /** Returns the fixed name of this convention, {@code "STORM_LOGICAL"}. */
+  @Override
+  public String getName() {
+    return "STORM_LOGICAL";
+  }
+
+  /** Returns the trait definition this trait belongs to: {@code ConventionTraitDef.INSTANCE}. */
+  @Override
+  public RelTraitDef getTraitDef() {
+    return ConventionTraitDef.INSTANCE;
+  }
+
+  /** Satisfied only by this very instance (reference comparison). */
+  @Override
+  public boolean satisfies(RelTrait trait) {
+    return this == trait;
+  }
+
+  /** Intentionally a no-op: nothing is registered with the planner. */
+  @Override
+  public void register(RelOptPlanner planner) {}
+
+  /** Returns the same text as {@link #getName()}. */
+  @Override
+  public String toString() {
+    return getName();
+  }
+
+  /** Always {@code false}, regardless of the target convention. */
+  @Override
+  public boolean canConvertConvention(Convention toConvention) {
+    return false;
+  }
+
+  /** Always {@code false}, regardless of the trait sets involved. */
+  @Override
+  public boolean useAbstractConvertersForConversion(RelTraitSet fromTraits, RelTraitSet toTraits) {
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentProjectRel.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentProjectRel.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentProjectRel.java
new file mode 100644
index 0000000..06be5d7
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentProjectRel.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.planner.trident.rel;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexNode;
+import org.apache.storm.sql.planner.StormRelUtils;
+import org.apache.storm.sql.planner.rel.StormProjectRelBase;
+import org.apache.storm.sql.planner.trident.TridentPlanCreator;
+import org.apache.storm.sql.runtime.calcite.ExecutableExpression;
+import org.apache.storm.sql.runtime.trident.functions.EvaluationFunction;
+import org.apache.storm.trident.Stream;
+import org.apache.storm.tuple.Fields;
+
+import java.util.List;
+
+/**
+ * Project node that can be planned onto a Trident stream.
+ */
+public class TridentProjectRel extends StormProjectRelBase implements TridentRel {
+    public TridentProjectRel(RelOptCluster cluster, RelTraitSet traits, RelNode input, List<? extends RexNode> projects, RelDataType rowType) {
+        super(cluster, traits, input, projects, rowType);
+    }
+
+    @Override
+    public Project copy(RelTraitSet traitSet, RelNode input, List<RexNode> projects, RelDataType rowType) {
+        return new TridentProjectRel(getCluster(), traitSet, input, projects, rowType);
+    }
+
+    /**
+     * Plans the input node, then appends a map stage that evaluates this node's child
+     * expressions into the output fields, and hands the resulting stream back to the plan creator.
+     *
+     * @param planCreator plan creator that holds the streams and compiles the expression
+     * @throws Exception propagated from planning the input or compiling the expression
+     */
+    @Override
+    public void tridentPlan(TridentPlanCreator planCreator) throws Exception {
+        // SingleRel: plan the only input first and take its stream from the plan creator.
+        final RelNode child = getInput();
+        StormRelUtils.getStormRelInput(child).tridentPlan(planCreator);
+        final Stream upstream = planCreator.pop().toStream();
+
+        final String stageName = StormRelUtils.getStageName(this);
+        final String projectionClassName = StormRelUtils.getClassName(this);
+
+        final List<String> outputFieldNames = getRowType().getFieldNames();
+        final int outputCount = outputFieldNames.size();
+
+        final List<RexNode> projectionExpressions = getChildExps();
+        final RelDataType inputRowType = getInput(0).getRowType();
+
+        final ExecutableExpression compiledProjection =
+                planCreator.createScalarInstance(projectionExpressions, inputRowType, projectionClassName);
+        final EvaluationFunction evaluation =
+                new EvaluationFunction(compiledProjection, outputCount, planCreator.getDataContext());
+
+        final Stream finalStream = upstream
+                .map(evaluation, new Fields(outputFieldNames))
+                .name(stageName);
+
+        planCreator.addStream(finalStream);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentRel.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentRel.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentRel.java
new file mode 100644
index 0000000..8b8e949
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentRel.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.planner.trident.rel;
+
+import org.apache.storm.sql.planner.rel.StormRelNode;
+import org.apache.storm.sql.planner.trident.TridentPlanCreator;
+
+/**
+ * A Storm relational node that knows how to add itself to a Trident plan.
+ */
+public interface TridentRel extends StormRelNode {
+    /**
+     * Contributes this node to the plan being built by the given plan creator.
+     *
+     * @param planCreator plan creator collecting the plan
+     * @throws Exception if planning this node fails
+     */
+    void tridentPlan(TridentPlanCreator planCreator) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentStreamInsertRel.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentStreamInsertRel.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentStreamInsertRel.java
new file mode 100644
index 0000000..e92c29b
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentStreamInsertRel.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.planner.trident.rel;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.prepare.Prepare;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rex.RexNode;
+import org.apache.storm.sql.planner.StormRelUtils;
+import org.apache.storm.sql.planner.rel.StormStreamInsertRelBase;
+import org.apache.storm.sql.planner.trident.TridentPlanCreator;
+import org.apache.storm.sql.runtime.ISqlTridentDataSource;
+import org.apache.storm.trident.Stream;
+import org.apache.storm.trident.fluent.IAggregatableStream;
+import org.apache.storm.tuple.Fields;
+
+import java.util.List;
+
+public class TridentStreamInsertRel extends StormStreamInsertRelBase implements TridentRel {
+    public TridentStreamInsertRel(RelOptCluster cluster, RelTraitSet traits, RelOptTable table, Prepare.CatalogReader catalogReader, RelNode child, Operation operation, List<String> updateColumnList, List<RexNode> sourceExpressionList, boolean flattened) {
+        super(cluster, traits, table, catalogReader, child, operation, updateColumnList, sourceExpressionList, flattened);
+    }
+
+    @Override
+    public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+        return new TridentStreamInsertRel(getCluster(), traitSet, getTable(), getCatalogReader(),
+                sole(inputs), getOperation(), getUpdateColumnList(), getSourceExpressionList(), isFlattened());
+    }
+
+    @Override
+    public void tridentPlan(TridentPlanCreator planCreator) throws Exception {
+        // SingleRel
+        RelNode input = getInput();
+        StormRelUtils.getStormRelInput(input).tridentPlan(planCreator);
+        Stream inputStream = planCreator.pop().toStream();
+
+        String stageName = StormRelUtils.getStageName(this);
+
+        Preconditions.checkArgument(isInsert(), "Only INSERT statement is supported.");
+
+        List<String> inputFields = this.input.getRowType().getFieldNames();
+        List<String> outputFields = getRowType().getFieldNames();
+
+        // FIXME: this should be really different...
+        String tableName = Joiner.on('.').join(getTable().getQualifiedName());
+        ISqlTridentDataSource.SqlTridentConsumer consumer = planCreator.getSources().get(tableName).getConsumer();
+
+        // In fact this is normally the end of stream, but I'm still not sure so I open new streams based on State values
+        IAggregatableStream finalStream = inputStream
+                .partitionPersist(consumer.getStateFactory(), new Fields(inputFields), consumer.getStateUpdater(),
+                        new Fields(outputFields))
+                .newValuesStream().name(stageName);
+
+        planCreator.addStream(finalStream);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentStreamScanRel.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentStreamScanRel.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentStreamScanRel.java
new file mode 100644
index 0000000..c563d73
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentStreamScanRel.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.planner.trident.rel;
+
+import com.google.common.base.Joiner;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.storm.sql.planner.StormRelUtils;
+import org.apache.storm.sql.planner.rel.StormStreamScanRelBase;
+import org.apache.storm.sql.planner.trident.TridentPlanCreator;
+import org.apache.storm.sql.runtime.ISqlTridentDataSource;
+import org.apache.storm.trident.fluent.IAggregatableStream;
+
+import java.util.Map;
+
+/**
+ * Table-scan node that opens a new Trident stream from a registered data source.
+ */
+public class TridentStreamScanRel extends StormStreamScanRelBase implements TridentRel {
+    /** Parallelism hint applied to the stream created for this scan. */
+    private final int parallelismHint;
+
+    public TridentStreamScanRel(RelOptCluster cluster, RelTraitSet traitSet, RelOptTable table, int parallelismHint) {
+        super(cluster, traitSet, table);
+        this.parallelismHint = parallelismHint;
+    }
+
+    /**
+     * Looks up the data source registered under this table's dot-joined qualified name, creates
+     * a new stream from its producer and hands that stream to the plan creator.
+     *
+     * @param planCreator plan creator that holds the topology and the registered data sources
+     * @throws RuntimeException if no data source is registered under the table name
+     * @throws Exception declared by the interface
+     */
+    @Override
+    public void tridentPlan(TridentPlanCreator planCreator) throws Exception {
+        final String sourceName = Joiner.on('.').join(getTable().getQualifiedName());
+
+        // FIXME: this should be really different...
+        final Map<String, ISqlTridentDataSource> registeredSources = planCreator.getSources();
+        final boolean sourceKnown = registeredSources.containsKey(sourceName);
+        if (!sourceKnown) {
+            throw new RuntimeException("Cannot find table " + sourceName);
+        }
+
+        final String stageName = StormRelUtils.getStageName(this);
+        final IAggregatableStream finalStream = planCreator.getTopology()
+                .newStream(stageName, registeredSources.get(sourceName).getProducer())
+                .parallelismHint(parallelismHint);
+        planCreator.addStream(finalStream);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentAggregateRule.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentAggregateRule.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentAggregateRule.java
new file mode 100644
index 0000000..ac35414
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentAggregateRule.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.sql.planner.trident.rules;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.logical.LogicalAggregate;
+import org.apache.storm.sql.planner.trident.rel.TridentLogicalConvention;
+
+/**
+ * Converter rule registered for {@link LogicalAggregate} nodes. It performs no conversion:
+ * {@link #convert(RelNode)} always throws, because aggregation is not supported.
+ */
+public class TridentAggregateRule extends ConverterRule {
+    /** Shared instance of this rule. */
+    public static final RelOptRule INSTANCE = new TridentAggregateRule();
+
+    private TridentAggregateRule() {
+        super(LogicalAggregate.class, Convention.NONE, TridentLogicalConvention.INSTANCE, "TridentAggregateRule");
+    }
+
+    /**
+     * Always fails.
+     *
+     * @param rel the node to convert (unused)
+     * @return never returns normally
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public RelNode convert(RelNode rel) {
+        throw new UnsupportedOperationException("Aggregate operation is not supported.");
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentCalcRule.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentCalcRule.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentCalcRule.java
new file mode 100644
index 0000000..25126ec
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentCalcRule.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.sql.planner.trident.rules;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.Calc;
+import org.apache.calcite.rel.logical.LogicalCalc;
+import org.apache.storm.sql.planner.trident.rel.TridentCalcRel;
+import org.apache.storm.sql.planner.trident.rel.TridentLogicalConvention;
+
+/**
+ * Converter rule registered for {@link LogicalCalc} nodes, going from {@link Convention#NONE}
+ * to {@link TridentLogicalConvention#INSTANCE}. Each match is rebuilt as a {@link TridentCalcRel}.
+ */
+public class TridentCalcRule extends ConverterRule {
+  /** Shared instance; the constructor is private. */
+  public static final TridentCalcRule INSTANCE = new TridentCalcRule();
+
+  private TridentCalcRule() {
+    super(LogicalCalc.class, Convention.NONE, TridentLogicalConvention.INSTANCE, "TridentCalcRule");
+  }
+
+  /**
+   * Builds a {@link TridentCalcRel} carrying the same cluster and program as the given node.
+   *
+   * @param rel the node to convert; it is cast to {@link Calc}
+   * @return a {@link TridentCalcRel} whose trait set and input both have their convention
+   *     replaced by {@link TridentLogicalConvention#INSTANCE}
+   */
+  @Override
+  public RelNode convert(RelNode rel) {
+    final Calc calcNode = (Calc) rel;
+    final RelNode child = calcNode.getInput();
+
+    // Ask for the child in the Trident convention before assembling the new node.
+    final RelNode tridentChild =
+        convert(child, child.getTraitSet().replace(TridentLogicalConvention.INSTANCE));
+
+    return new TridentCalcRel(
+        calcNode.getCluster(),
+        calcNode.getTraitSet().replace(TridentLogicalConvention.INSTANCE),
+        tridentChild,
+        calcNode.getProgram());
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentFilterRule.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentFilterRule.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentFilterRule.java
new file mode 100644
index 0000000..7f9c41f
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentFilterRule.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.planner.trident.rules;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.logical.LogicalFilter;
+import org.apache.storm.sql.planner.trident.rel.TridentFilterRel;
+import org.apache.storm.sql.planner.trident.rel.TridentLogicalConvention;
+
+/**
+ * Converter rule registered for {@link LogicalFilter} nodes, going from {@link Convention#NONE}
+ * to {@link TridentLogicalConvention#INSTANCE}. Each match is rebuilt as a
+ * {@link TridentFilterRel}.
+ */
+public class TridentFilterRule extends ConverterRule {
+  /**
+   * Shared instance; the constructor is private. Declared {@code final} so the singleton cannot
+   * be reassigned from outside the class.
+   */
+  public static final TridentFilterRule INSTANCE = new TridentFilterRule();
+
+  private TridentFilterRule() {
+    super(LogicalFilter.class, Convention.NONE, TridentLogicalConvention.INSTANCE, "TridentFilterRule");
+  }
+
+  /**
+   * Builds a {@link TridentFilterRel} carrying the same cluster and condition as the given node.
+   *
+   * @param rel the node to convert; it is cast to {@link Filter}
+   * @return a {@link TridentFilterRel} whose trait set and input both have their convention
+   *     replaced by {@link TridentLogicalConvention#INSTANCE}
+   */
+  @Override
+  public RelNode convert(RelNode rel) {
+    final Filter filter = (Filter) rel;
+    final RelNode input = filter.getInput();
+
+    return new TridentFilterRel(filter.getCluster(),
+        filter.getTraitSet().replace(TridentLogicalConvention.INSTANCE),
+        convert(input, input.getTraitSet().replace(TridentLogicalConvention.INSTANCE)),
+        filter.getCondition());
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentJoinRule.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentJoinRule.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentJoinRule.java
new file mode 100644
index 0000000..90f5083
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentJoinRule.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.planner.trident.rules;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.logical.LogicalJoin;
+import org.apache.storm.sql.planner.trident.rel.TridentLogicalConvention;
+
+/**
+ * Converter rule registered for {@link LogicalJoin} nodes, going from {@link Convention#NONE}
+ * to {@link TridentLogicalConvention#INSTANCE}.
+ *
+ * <p>No conversion is implemented: {@link #convert(RelNode)} throws on every call.
+ */
+public class TridentJoinRule extends ConverterRule {
+  /** Description string handed to the {@link ConverterRule} constructor. */
+  private static final String DESCRIPTION = "TridentJoinRule";
+
+  /** Shared instance; the constructor is private, so no other instance is created. */
+  public static final TridentJoinRule INSTANCE = new TridentJoinRule();
+
+  private TridentJoinRule() {
+    super(LogicalJoin.class, Convention.NONE, TridentLogicalConvention.INSTANCE, DESCRIPTION);
+  }
+
+  /**
+   * Always fails.
+   *
+   * @param rel the matched relational node (not inspected)
+   * @return never returns normally
+   * @throws UnsupportedOperationException on every call
+   */
+  @Override
+  public RelNode convert(RelNode rel) {
+    final String message = "Join operation is not supported.";
+    throw new UnsupportedOperationException(message);
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentModifyRule.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentModifyRule.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentModifyRule.java
new file mode 100644
index 0000000..2155451
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentModifyRule.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.planner.trident.rules;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.prepare.Prepare;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.TableModify;
+import org.apache.calcite.rel.logical.LogicalTableModify;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.schema.Table;
+import org.apache.storm.sql.planner.trident.rel.TridentLogicalConvention;
+import org.apache.storm.sql.planner.trident.rel.TridentStreamInsertRel;
+
+import java.util.List;
+
+/**
+ * Converter rule registered for {@link LogicalTableModify} nodes, going from
+ * {@link Convention#NONE} to {@link TridentLogicalConvention#INSTANCE}.
+ *
+ * <p>Only an {@code INSERT} into a table whose JDBC table type is {@code STREAM} is converted
+ * (to a {@link TridentStreamInsertRel}); every other combination is rejected with an exception.
+ */
+public class TridentModifyRule extends ConverterRule {
+  /** Shared instance; the constructor is private. */
+  public static final TridentModifyRule INSTANCE = new TridentModifyRule();
+
+  private TridentModifyRule() {
+    super(LogicalTableModify.class, Convention.NONE, TridentLogicalConvention.INSTANCE, "TridentModifyRule");
+  }
+
+  /**
+   * Rebuilds a table-modify node as a {@link TridentStreamInsertRel}.
+   *
+   * @param rel the node to convert; it is cast to {@link TableModify}
+   * @return a {@link TridentStreamInsertRel} built from the node's cluster, table, catalog
+   *     reader, operation, column/expression lists and flattened flag, with the trait set and
+   *     input moved to {@link TridentLogicalConvention#INSTANCE}
+   * @throws UnsupportedOperationException if the target is a stream and the operation is not
+   *     {@code INSERT}
+   * @throws IllegalArgumentException if the target table cannot be unwrapped to a {@link Table},
+   *     or its JDBC table type is anything other than {@code STREAM}
+   */
+  @Override
+  public RelNode convert(RelNode rel) {
+    final TableModify tableModify = (TableModify) rel;
+    final RelNode input = tableModify.getInput();
+
+    final RelOptCluster cluster = tableModify.getCluster();
+    final RelTraitSet traitSet = tableModify.getTraitSet().replace(TridentLogicalConvention.INSTANCE);
+    final RelOptTable relOptTable = tableModify.getTable();
+    final Prepare.CatalogReader catalogReader = tableModify.getCatalogReader();
+    final RelNode convertedInput = convert(input, input.getTraitSet().replace(TridentLogicalConvention.INSTANCE));
+    final TableModify.Operation operation = tableModify.getOperation();
+    final List<String> updateColumnList = tableModify.getUpdateColumnList();
+    final List<RexNode> sourceExpressionList = tableModify.getSourceExpressionList();
+    final boolean flattened = tableModify.isFlattened();
+
+    // Reuse the RelOptTable fetched above instead of asking the node for it a second time.
+    final Table table = relOptTable.unwrap(Table.class);
+    if (table == null) {
+      // unwrap() may return null when the table does not expose the requested interface;
+      // report that explicitly rather than failing with a NullPointerException below.
+      throw new IllegalArgumentException(String.format("Table %s cannot be unwrapped as %s",
+          relOptTable.getQualifiedName(), Table.class.getName()));
+    }
+
+    switch (table.getJdbcTableType()) {
+      case STREAM:
+        if (operation != TableModify.Operation.INSERT) {
+          throw new UnsupportedOperationException(String.format("Streams doesn't support %s modify operation", operation));
+        }
+        return new TridentStreamInsertRel(cluster, traitSet, relOptTable, catalogReader, convertedInput, operation,
+            updateColumnList, sourceExpressionList, flattened);
+      default:
+        throw new IllegalArgumentException(String.format("Unsupported table type: %s", table.getJdbcTableType()));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentProjectRule.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentProjectRule.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentProjectRule.java
new file mode 100644
index 0000000..2922725
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentProjectRule.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.planner.trident.rules;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.storm.sql.planner.trident.rel.TridentLogicalConvention;
+import org.apache.storm.sql.planner.trident.rel.TridentProjectRel;
+
+/**
+ * Converter rule registered for {@link LogicalProject} nodes, going from {@link Convention#NONE}
+ * to {@link TridentLogicalConvention#INSTANCE}. Each match is rebuilt as a
+ * {@link TridentProjectRel}.
+ */
+public class TridentProjectRule extends ConverterRule {
+  /** Shared instance; the constructor is private. */
+  public static final TridentProjectRule INSTANCE = new TridentProjectRule();
+
+  private TridentProjectRule() {
+    super(LogicalProject.class, Convention.NONE, TridentLogicalConvention.INSTANCE, "TridentProjectRule");
+  }
+
+  /**
+   * Builds a {@link TridentProjectRel} carrying the same cluster, projections and row type as
+   * the given node.
+   *
+   * @param rel the node to convert; it is cast to {@link Project}
+   * @return a {@link TridentProjectRel} whose trait set and input both have their convention
+   *     replaced by {@link TridentLogicalConvention#INSTANCE}
+   */
+  @Override
+  public RelNode convert(RelNode rel) {
+    final Project projection = (Project) rel;
+    final RelNode child = projection.getInput();
+
+    // Ask for the child in the Trident convention before assembling the new node.
+    final RelNode tridentChild =
+        convert(child, child.getTraitSet().replace(TridentLogicalConvention.INSTANCE));
+
+    return new TridentProjectRel(
+        projection.getCluster(),
+        projection.getTraitSet().replace(TridentLogicalConvention.INSTANCE),
+        tridentChild,
+        projection.getProjects(),
+        projection.getRowType());
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentScanRule.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentScanRule.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentScanRule.java
new file mode 100644
index 0000000..abbd680
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentScanRule.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.planner.trident.rules;
+
+import org.apache.calcite.adapter.enumerable.EnumerableConvention;
+import org.apache.calcite.adapter.enumerable.EnumerableTableScan;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.schema.Table;
+import org.apache.storm.sql.calcite.ParallelStreamableTable;
+import org.apache.storm.sql.planner.trident.rel.TridentLogicalConvention;
+import org.apache.storm.sql.planner.trident.rel.TridentStreamScanRel;
+
+/**
+ * Converter rule registered for {@link EnumerableTableScan} nodes, going from
+ * {@link EnumerableConvention#INSTANCE} to {@link TridentLogicalConvention#INSTANCE}.
+ *
+ * <p>Only tables whose JDBC table type is {@code STREAM} are converted (to a
+ * {@link TridentStreamScanRel}); any other table type is rejected with an exception.
+ */
+public class TridentScanRule extends ConverterRule {
+  /** Shared instance; the constructor is private. */
+  public static final TridentScanRule INSTANCE = new TridentScanRule();
+
+  /** Parallelism hint used when the scanned table does not supply one. */
+  public static final int DEFAULT_PARALLELISM_HINT = 1;
+
+  private TridentScanRule() {
+    super(EnumerableTableScan.class, EnumerableConvention.INSTANCE, TridentLogicalConvention.INSTANCE, "TridentScanRule");
+  }
+
+  /**
+   * Rebuilds a table scan as a {@link TridentStreamScanRel}.
+   *
+   * <p>The parallelism hint comes from the table when it can be unwrapped to a
+   * {@link ParallelStreamableTable} that reports a non-null hint; otherwise
+   * {@link #DEFAULT_PARALLELISM_HINT} is used.
+   *
+   * @param rel the node to convert; it is cast to {@link TableScan}
+   * @return a {@link TridentStreamScanRel} over the same cluster and table, with the trait set
+   *     moved to {@link TridentLogicalConvention#INSTANCE}
+   * @throws IllegalArgumentException if the table cannot be unwrapped to a {@link Table}, or its
+   *     JDBC table type is anything other than {@code STREAM}
+   */
+  @Override
+  public RelNode convert(RelNode rel) {
+    final TableScan scan = (TableScan) rel;
+    int parallelismHint = DEFAULT_PARALLELISM_HINT;
+
+    final ParallelStreamableTable parallelTable = scan.getTable().unwrap(ParallelStreamableTable.class);
+    if (parallelTable != null) {
+      // Read the hint once so the null test and the unboxing see the same value.
+      final Integer tableHint = parallelTable.parallelismHint();
+      if (tableHint != null) {
+        parallelismHint = tableHint;
+      }
+    }
+
+    final Table table = scan.getTable().unwrap(Table.class);
+    if (table == null) {
+      // unwrap() may return null when the table does not expose the requested interface;
+      // report that explicitly rather than failing with a NullPointerException below.
+      throw new IllegalArgumentException(String.format("Table %s cannot be unwrapped as %s",
+          scan.getTable().getQualifiedName(), Table.class.getName()));
+    }
+
+    switch (table.getJdbcTableType()) {
+      case STREAM:
+        return new TridentStreamScanRel(scan.getCluster(),
+            scan.getTraitSet().replace(TridentLogicalConvention.INSTANCE),
+            scan.getTable(), parallelismHint);
+      default:
+        throw new IllegalArgumentException(String.format("Unsupported table type: %s", table.getJdbcTableType()));
+    }
+  }
+}