You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2018/07/16 22:00:47 UTC

[5/7] storm git commit: STORM-2406 [Storm SQL] Change underlying API to Streams API

http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/QueryPlanner.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/QueryPlanner.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/QueryPlanner.java
new file mode 100644
index 0000000..180232e
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/QueryPlanner.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.storm.sql.planner.streams;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.calcite.DataContext;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.config.CalciteConnectionConfigImpl;
+import org.apache.calcite.jdbc.CalciteSchema;
+import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
+import org.apache.calcite.plan.Contexts;
+import org.apache.calcite.plan.ConventionTraitDef;
+import org.apache.calcite.plan.RelTraitDef;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.prepare.CalciteCatalogReader;
+import org.apache.calcite.rel.RelCollationTraitDef;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperatorTable;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.parser.SqlParseException;
+import org.apache.calcite.sql.util.ChainedSqlOperatorTable;
+import org.apache.calcite.tools.FrameworkConfig;
+import org.apache.calcite.tools.Frameworks;
+import org.apache.calcite.tools.Planner;
+import org.apache.calcite.tools.RelConversionException;
+import org.apache.calcite.tools.ValidationException;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.sql.AbstractStreamsProcessor;
+import org.apache.storm.sql.javac.CompilingClassLoader;
+import org.apache.storm.sql.planner.StormRelDataTypeSystem;
+import org.apache.storm.sql.planner.UnsupportedOperatorsVisitor;
+import org.apache.storm.sql.planner.streams.rel.StreamsLogicalConvention;
+import org.apache.storm.sql.planner.streams.rel.StreamsRel;
+import org.apache.storm.sql.runtime.ISqlStreamsDataSource;
+import org.apache.storm.streams.Stream;
+import org.apache.storm.streams.StreamBuilder;
+import org.apache.storm.tuple.Values;
+
+public class QueryPlanner {
+
+    public static final int STORM_REL_CONVERSION_RULES = 1;
+
+    private final Planner planner;
+
+    private final JavaTypeFactory typeFactory = new JavaTypeFactoryImpl(
+        RelDataTypeSystem.DEFAULT);
+
+    public QueryPlanner(SchemaPlus schema) {
+        final List<RelTraitDef> traitDefs = new ArrayList<RelTraitDef>();
+
+        traitDefs.add(ConventionTraitDef.INSTANCE);
+        traitDefs.add(RelCollationTraitDef.INSTANCE);
+
+        List<SqlOperatorTable> sqlOperatorTables = new ArrayList<>();
+        sqlOperatorTables.add(SqlStdOperatorTable.instance());
+        sqlOperatorTables.add(new CalciteCatalogReader(CalciteSchema.from(schema),
+                Collections.emptyList(), typeFactory, new CalciteConnectionConfigImpl(new Properties())));
+
+        FrameworkConfig config = Frameworks.newConfigBuilder()
+                                           .defaultSchema(schema)
+                                           .operatorTable(new ChainedSqlOperatorTable(sqlOperatorTables))
+                                           .traitDefs(traitDefs)
+                                           .context(Contexts.EMPTY_CONTEXT)
+                                           .ruleSets(StreamsStormRuleSets.getRuleSets())
+                                           .costFactory(null)
+                                           .typeSystem(StormRelDataTypeSystem.STORM_REL_DATATYPE_SYSTEM)
+                                           .build();
+        this.planner = Frameworks.getPlanner(config);
+    }
+
+    public AbstractStreamsProcessor compile(Map<String, ISqlStreamsDataSource> sources, String query) throws Exception {
+        StreamsRel relNode = getPlan(query);
+
+        StreamsPlanCreator streamsPlanCreator = new StreamsPlanCreator(sources, new RexBuilder(typeFactory));
+        relNode.streamsPlan(streamsPlanCreator);
+
+        final StreamBuilder streamBuilder = streamsPlanCreator.getStreamBuilder();
+        final Stream<Values> lastStream = streamsPlanCreator.pop();
+        final DataContext dc = streamsPlanCreator.getDataContext();
+        final List<CompilingClassLoader> cls = streamsPlanCreator.getClassLoaders();
+
+        return new AbstractStreamsProcessor() {
+            @Override
+            public StormTopology build() {
+                return streamBuilder.build();
+            }
+
+            @Override
+            public Stream<Values> outputStream() {
+                return lastStream;
+            }
+
+            @Override
+            public DataContext getDataContext() {
+                return dc;
+            }
+
+            @Override
+            public List<CompilingClassLoader> getClassLoaders() {
+                return cls;
+            }
+        };
+    }
+
+    public StreamsRel getPlan(String query) throws ValidationException, RelConversionException, SqlParseException {
+        return (StreamsRel) validateAndConvert(planner.parse(query));
+    }
+
+    private RelNode validateAndConvert(SqlNode sqlNode) throws ValidationException, RelConversionException {
+        SqlNode validated = validateNode(sqlNode);
+        RelNode relNode = convertToRelNode(validated);
+        return convertToStormRel(relNode);
+    }
+
+    private RelNode convertToStormRel(RelNode relNode) throws RelConversionException {
+        RelTraitSet traitSet = relNode.getTraitSet();
+        traitSet = traitSet.simplify();
+
+        // PlannerImpl.transform() optimizes RelNode with ruleset
+        return planner.transform(STORM_REL_CONVERSION_RULES, traitSet.plus(StreamsLogicalConvention.INSTANCE), relNode);
+    }
+
+    private RelNode convertToRelNode(SqlNode sqlNode) throws RelConversionException {
+        return planner.rel(sqlNode).rel;
+    }
+
+    private SqlNode validateNode(SqlNode sqlNode) throws ValidationException {
+        SqlNode validatedSqlNode = planner.validate(sqlNode);
+        validatedSqlNode.accept(new UnsupportedOperatorsVisitor());
+        return validatedSqlNode;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/StreamsPlanCreator.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/StreamsPlanCreator.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/StreamsPlanCreator.java
new file mode 100644
index 0000000..977c1ee
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/StreamsPlanCreator.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.storm.sql.planner.streams;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.calcite.DataContext;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexProgram;
+import org.apache.storm.sql.compiler.RexNodeToJavaCodeCompiler;
+import org.apache.storm.sql.javac.CompilingClassLoader;
+import org.apache.storm.sql.runtime.ISqlStreamsDataSource;
+import org.apache.storm.sql.runtime.calcite.DebuggableExecutableExpression;
+import org.apache.storm.sql.runtime.calcite.ExecutableExpression;
+import org.apache.storm.sql.runtime.calcite.StormDataContext;
+import org.apache.storm.streams.Stream;
+import org.apache.storm.streams.StreamBuilder;
+import org.apache.storm.tuple.Values;
+
+public class StreamsPlanCreator {
+    private final Map<String, ISqlStreamsDataSource> sources;
+    private final JavaTypeFactory typeFactory;
+    private final RexNodeToJavaCodeCompiler rexCompiler;
+    private final StreamBuilder streamBuilder;
+    private final DataContext dataContext;
+
+    private final Deque<Stream<Values>> streamStack = new ArrayDeque<>();
+    private final List<CompilingClassLoader> classLoaders = new ArrayList<>();
+
+    public StreamsPlanCreator(Map<String, ISqlStreamsDataSource> sources, RexBuilder rexBuilder) {
+        this.sources = sources;
+        this.rexCompiler = new RexNodeToJavaCodeCompiler(rexBuilder);
+        this.typeFactory = (JavaTypeFactory) rexBuilder.getTypeFactory();
+
+        this.streamBuilder = new StreamBuilder();
+        this.dataContext = new StormDataContext();
+    }
+
+    public void addStream(Stream<Values> stream) throws Exception {
+        push(stream);
+    }
+
+    public Stream<Values> pop() {
+        return streamStack.pop();
+    }
+
+    public Map<String, ISqlStreamsDataSource> getSources() {
+        return sources;
+    }
+
+    public DataContext getDataContext() {
+        return dataContext;
+    }
+
+    public JavaTypeFactory getTypeFactory() {
+        return typeFactory;
+    }
+
+    public StreamBuilder getStreamBuilder() {
+        return streamBuilder;
+    }
+
+    public ExecutableExpression createScalarInstance(List<RexNode> nodes, RelDataType inputRowType, String className)
+        throws CompilingClassLoader.CompilerException, ClassNotFoundException, IllegalAccessException, InstantiationException {
+        String expr = rexCompiler.compile(nodes, inputRowType, className);
+        CompilingClassLoader classLoader = new CompilingClassLoader(
+            getLastClassLoader(), className, expr, null);
+        ExecutableExpression instance = (ExecutableExpression) classLoader.loadClass(className).newInstance();
+        addClassLoader(classLoader);
+        return new DebuggableExecutableExpression(instance, expr);
+    }
+
+    public ExecutableExpression createScalarInstance(RexProgram program, String className)
+        throws CompilingClassLoader.CompilerException, ClassNotFoundException, IllegalAccessException, InstantiationException {
+        String expr = rexCompiler.compile(program, className);
+        CompilingClassLoader classLoader = new CompilingClassLoader(
+            getLastClassLoader(), className, expr, null);
+        ExecutableExpression instance = (ExecutableExpression) classLoader.loadClass(className).newInstance();
+        addClassLoader(classLoader);
+        return new DebuggableExecutableExpression(instance, expr);
+    }
+
+    private void push(Stream<Values> stream) {
+        streamStack.push(stream);
+    }
+
+    public void addClassLoader(CompilingClassLoader compilingClassLoader) {
+        this.classLoaders.add(compilingClassLoader);
+    }
+
+    public ClassLoader getLastClassLoader() {
+        if (this.classLoaders.size() > 0) {
+            return this.classLoaders.get(this.classLoaders.size() - 1);
+        } else {
+            return this.getClass().getClassLoader();
+        }
+    }
+
+    public List<CompilingClassLoader> getClassLoaders() {
+        return classLoaders;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/StreamsStormRuleSets.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/StreamsStormRuleSets.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/StreamsStormRuleSets.java
new file mode 100644
index 0000000..95ddc00
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/StreamsStormRuleSets.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.storm.sql.planner.streams;
+
+import java.util.Iterator;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.rel.rules.CalcMergeRule;
+import org.apache.calcite.rel.rules.FilterCalcMergeRule;
+import org.apache.calcite.rel.rules.FilterProjectTransposeRule;
+import org.apache.calcite.rel.rules.FilterToCalcRule;
+import org.apache.calcite.rel.rules.ProjectCalcMergeRule;
+import org.apache.calcite.rel.rules.ProjectFilterTransposeRule;
+import org.apache.calcite.rel.rules.ProjectRemoveRule;
+import org.apache.calcite.rel.rules.ProjectToCalcRule;
+import org.apache.calcite.rel.rules.PruneEmptyRules;
+import org.apache.calcite.rel.rules.ReduceExpressionsRule;
+import org.apache.calcite.rel.rules.SortRemoveRule;
+import org.apache.calcite.rel.rules.UnionEliminatorRule;
+import org.apache.calcite.rel.stream.StreamRules;
+import org.apache.calcite.tools.RuleSet;
+import org.apache.storm.sql.planner.streams.rules.StreamsAggregateRule;
+import org.apache.storm.sql.planner.streams.rules.StreamsCalcRule;
+import org.apache.storm.sql.planner.streams.rules.StreamsFilterRule;
+import org.apache.storm.sql.planner.streams.rules.StreamsJoinRule;
+import org.apache.storm.sql.planner.streams.rules.StreamsModifyRule;
+import org.apache.storm.sql.planner.streams.rules.StreamsProjectRule;
+import org.apache.storm.sql.planner.streams.rules.StreamsScanRule;
+
+public class StreamsStormRuleSets {
+    private static final ImmutableSet<RelOptRule> calciteToStormConversionRules =
+        ImmutableSet.<RelOptRule>builder().add(
+            SortRemoveRule.INSTANCE,
+
+            FilterToCalcRule.INSTANCE,
+            ProjectToCalcRule.INSTANCE,
+            FilterCalcMergeRule.INSTANCE,
+            ProjectCalcMergeRule.INSTANCE,
+            CalcMergeRule.INSTANCE,
+
+            PruneEmptyRules.FILTER_INSTANCE,
+            PruneEmptyRules.PROJECT_INSTANCE,
+            PruneEmptyRules.UNION_INSTANCE,
+
+            ProjectFilterTransposeRule.INSTANCE,
+            FilterProjectTransposeRule.INSTANCE,
+            ProjectRemoveRule.INSTANCE,
+
+            ReduceExpressionsRule.FILTER_INSTANCE,
+            ReduceExpressionsRule.PROJECT_INSTANCE,
+            ReduceExpressionsRule.CALC_INSTANCE,
+
+            // merge and push unions rules
+            UnionEliminatorRule.INSTANCE,
+
+            StreamsScanRule.INSTANCE,
+            StreamsFilterRule.INSTANCE,
+            StreamsProjectRule.INSTANCE,
+            StreamsAggregateRule.INSTANCE,
+            StreamsJoinRule.INSTANCE,
+            StreamsModifyRule.INSTANCE,
+            StreamsCalcRule.INSTANCE
+        ).build();
+
+    public static RuleSet[] getRuleSets() {
+        return new RuleSet[]{
+            new StormRuleSet(StreamRules.RULES),
+            new StormRuleSet(ImmutableSet.<RelOptRule>builder().addAll(StreamRules.RULES).addAll(calciteToStormConversionRules).build())
+        };
+    }
+
+    private static class StormRuleSet implements RuleSet {
+        final ImmutableSet<RelOptRule> rules;
+
+        public StormRuleSet(ImmutableSet<RelOptRule> rules) {
+            this.rules = rules;
+        }
+
+        public StormRuleSet(ImmutableList<RelOptRule> rules) {
+            this.rules = ImmutableSet.<RelOptRule>builder()
+                .addAll(rules)
+                .build();
+        }
+
+        @Override
+        public Iterator<RelOptRule> iterator() {
+            return rules.iterator();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rel/StreamsCalcRel.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rel/StreamsCalcRel.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rel/StreamsCalcRel.java
new file mode 100644
index 0000000..f4c2c7d
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rel/StreamsCalcRel.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.storm.sql.planner.streams.rel;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import com.google.common.collect.Lists;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Calc;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexLocalRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexProgram;
+import org.apache.storm.sql.planner.StormRelUtils;
+import org.apache.storm.sql.planner.rel.StormCalcRelBase;
+import org.apache.storm.sql.planner.streams.StreamsPlanCreator;
+import org.apache.storm.sql.runtime.calcite.ExecutableExpression;
+import org.apache.storm.sql.runtime.streams.functions.EvaluationCalc;
+import org.apache.storm.streams.Stream;
+import org.apache.storm.tuple.Values;
+
+public class StreamsCalcRel extends StormCalcRelBase implements StreamsRel {
+    public StreamsCalcRel(RelOptCluster cluster, RelTraitSet traits, RelNode child, RexProgram program) {
+        super(cluster, traits, child, program);
+    }
+
+    @Override
+    public Calc copy(RelTraitSet traitSet, RelNode child, RexProgram program) {
+        return new StreamsCalcRel(getCluster(), traitSet, child, program);
+    }
+
+    @Override
+    public void streamsPlan(StreamsPlanCreator planCreator) throws Exception {
+        // SingleRel
+        RelNode input = getInput();
+        StormRelUtils.getStormRelInput(input).streamsPlan(planCreator);
+        Stream<Values> inputStream = planCreator.pop();
+
+        RelDataType inputRowType = getInput(0).getRowType();
+
+        List<String> outputFieldNames = getRowType().getFieldNames();
+        int outputCount = outputFieldNames.size();
+
+        // filter
+        ExecutableExpression filterInstance = null;
+        RexLocalRef condition = program.getCondition();
+        if (condition != null) {
+            RexNode conditionNode = program.expandLocalRef(condition);
+            filterInstance = planCreator.createScalarInstance(Lists.newArrayList(conditionNode), inputRowType,
+                                                              StormRelUtils.getClassName(this));
+        }
+
+        // projection
+        ExecutableExpression projectionInstance = null;
+        List<RexLocalRef> projectList = program.getProjectList();
+        if (projectList != null && !projectList.isEmpty()) {
+            List<RexNode> expandedNodes = new ArrayList<>();
+            for (RexLocalRef project : projectList) {
+                expandedNodes.add(program.expandLocalRef(project));
+            }
+
+            projectionInstance = planCreator.createScalarInstance(expandedNodes, inputRowType,
+                                                                  StormRelUtils.getClassName(this));
+        }
+
+        if (projectionInstance == null && filterInstance == null) {
+            // it shouldn't be happen
+            throw new IllegalStateException("Either projection or condition, or both should be provided.");
+        }
+
+        EvaluationCalc evalCalc = new EvaluationCalc(filterInstance, projectionInstance, outputCount, planCreator.getDataContext());
+        final Stream finalStream = inputStream.flatMap(evalCalc);
+
+        planCreator.addStream(finalStream);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rel/StreamsFilterRel.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rel/StreamsFilterRel.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rel/StreamsFilterRel.java
new file mode 100644
index 0000000..ce17bf6
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rel/StreamsFilterRel.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.storm.sql.planner.streams.rel;
+
+import java.util.List;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexNode;
+import org.apache.storm.sql.planner.StormRelUtils;
+import org.apache.storm.sql.planner.rel.StormFilterRelBase;
+import org.apache.storm.sql.planner.streams.StreamsPlanCreator;
+import org.apache.storm.sql.runtime.calcite.ExecutableExpression;
+import org.apache.storm.sql.runtime.streams.functions.EvaluationFilter;
+import org.apache.storm.streams.Stream;
+import org.apache.storm.tuple.Values;
+
+public class StreamsFilterRel extends StormFilterRelBase implements StreamsRel {
+    public StreamsFilterRel(RelOptCluster cluster, RelTraitSet traits, RelNode child, RexNode condition) {
+        super(cluster, traits, child, condition);
+    }
+
+    @Override
+    public Filter copy(RelTraitSet traitSet, RelNode input, RexNode condition) {
+        return new StreamsFilterRel(getCluster(), traitSet, input, condition);
+    }
+
+    @Override
+    public void streamsPlan(StreamsPlanCreator planCreator) throws Exception {
+        // SingleRel
+        RelNode input = getInput();
+        StormRelUtils.getStormRelInput(input).streamsPlan(planCreator);
+        Stream<Values> inputStream = planCreator.pop();
+
+        List<RexNode> childExps = getChildExps();
+        RelDataType inputRowType = getInput(0).getRowType();
+
+        String filterClassName = StormRelUtils.getClassName(this);
+        ExecutableExpression filterInstance = planCreator.createScalarInstance(childExps, inputRowType, filterClassName);
+
+        EvaluationFilter evalFilter = new EvaluationFilter(filterInstance, planCreator.getDataContext());
+        final Stream<Values> finalStream = inputStream.filter(evalFilter);
+
+        planCreator.addStream(finalStream);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rel/StreamsLogicalConvention.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rel/StreamsLogicalConvention.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rel/StreamsLogicalConvention.java
new file mode 100644
index 0000000..48a7540
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rel/StreamsLogicalConvention.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.storm.sql.planner.streams.rel;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.ConventionTraitDef;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTrait;
+import org.apache.calcite.plan.RelTraitDef;
+import org.apache.calcite.plan.RelTraitSet;
+
+public enum StreamsLogicalConvention implements Convention {
+    INSTANCE;
+
+    @Override
+    public Class getInterface() {
+        return StreamsRel.class;
+    }
+
+    @Override
+    public String getName() {
+        return "STORM_LOGICAL";
+    }
+
+    @Override
+    public RelTraitDef getTraitDef() {
+        return ConventionTraitDef.INSTANCE;
+    }
+
+    @Override
+    public boolean satisfies(RelTrait trait) {
+        return this == trait;
+    }
+
+    @Override
+    public void register(RelOptPlanner planner) {}
+
+    @Override
+    public String toString() {
+        return getName();
+    }
+
+    @Override
+    public boolean canConvertConvention(Convention toConvention) {
+        return false;
+    }
+
+    @Override
+    public boolean useAbstractConvertersForConversion(RelTraitSet fromTraits, RelTraitSet toTraits) {
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rel/StreamsProjectRel.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rel/StreamsProjectRel.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rel/StreamsProjectRel.java
new file mode 100644
index 0000000..4ad70e1
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rel/StreamsProjectRel.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.storm.sql.planner.streams.rel;
+
+import java.util.List;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexNode;
+import org.apache.storm.sql.planner.StormRelUtils;
+import org.apache.storm.sql.planner.rel.StormProjectRelBase;
+import org.apache.storm.sql.planner.streams.StreamsPlanCreator;
+import org.apache.storm.sql.runtime.calcite.ExecutableExpression;
+import org.apache.storm.sql.runtime.streams.functions.EvaluationFunction;
+import org.apache.storm.streams.Stream;
+import org.apache.storm.tuple.Values;
+
+public class StreamsProjectRel extends StormProjectRelBase implements StreamsRel {
+    public StreamsProjectRel(RelOptCluster cluster, RelTraitSet traits, RelNode input, List<? extends RexNode> projects,
+                             RelDataType rowType) {
+        super(cluster, traits, input, projects, rowType);
+    }
+
+    @Override
+    public Project copy(RelTraitSet traitSet, RelNode input, List<RexNode> projects, RelDataType rowType) {
+        return new StreamsProjectRel(getCluster(), traitSet, input, projects, rowType);
+    }
+
+    @Override
+    public void streamsPlan(StreamsPlanCreator planCreator) throws Exception {
+        // SingleRel
+        RelNode input = getInput();
+        StormRelUtils.getStormRelInput(input).streamsPlan(planCreator);
+        Stream<Values> inputStream = planCreator.pop();
+
+        String projectionClassName = StormRelUtils.getClassName(this);
+
+        List<String> outputFieldNames = getRowType().getFieldNames();
+        int outputCount = outputFieldNames.size();
+
+        List<RexNode> childExps = getChildExps();
+        RelDataType inputRowType = getInput(0).getRowType();
+
+        ExecutableExpression projectionInstance = planCreator.createScalarInstance(childExps, inputRowType, projectionClassName);
+        EvaluationFunction evalFunc = new EvaluationFunction(projectionInstance, outputCount, planCreator.getDataContext());
+        final Stream<Values> finalStream = inputStream.map(evalFunc);
+        planCreator.addStream(finalStream);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rel/StreamsRel.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rel/StreamsRel.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rel/StreamsRel.java
new file mode 100644
index 0000000..70d02ec
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rel/StreamsRel.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.storm.sql.planner.streams.rel;
+
+import org.apache.storm.sql.planner.rel.StormRelNode;
+import org.apache.storm.sql.planner.streams.StreamsPlanCreator;
+
+public interface StreamsRel extends StormRelNode {
+    void streamsPlan(StreamsPlanCreator planCreator) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rel/StreamsStreamInsertRel.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rel/StreamsStreamInsertRel.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rel/StreamsStreamInsertRel.java
new file mode 100644
index 0000000..f3b8994
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rel/StreamsStreamInsertRel.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.storm.sql.planner.streams.rel;
+
+import java.util.List;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.prepare.Prepare;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rex.RexNode;
+import org.apache.storm.sql.planner.StormRelUtils;
+import org.apache.storm.sql.planner.rel.StormStreamInsertRelBase;
+import org.apache.storm.sql.planner.streams.StreamsPlanCreator;
+import org.apache.storm.sql.runtime.streams.functions.StreamInsertMapToPairFunction;
+import org.apache.storm.streams.Stream;
+import org.apache.storm.topology.IRichBolt;
+import org.apache.storm.tuple.Values;
+
+public class StreamsStreamInsertRel extends StormStreamInsertRelBase implements StreamsRel {
+    private final int primaryKeyIndex;
+
+    public StreamsStreamInsertRel(RelOptCluster cluster, RelTraitSet traits, RelOptTable table, Prepare.CatalogReader catalogReader,
+                                  RelNode child, Operation operation, List<String> updateColumnList, List<RexNode> sourceExpressionList,
+                                  boolean flattened, int primaryKeyIndex) {
+        super(cluster, traits, table, catalogReader, child, operation, updateColumnList, sourceExpressionList, flattened);
+        this.primaryKeyIndex = primaryKeyIndex;
+    }
+
+    @Override
+    public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+        return new StreamsStreamInsertRel(getCluster(), traitSet, getTable(), getCatalogReader(),
+                sole(inputs), getOperation(), getUpdateColumnList(), getSourceExpressionList(), isFlattened(), primaryKeyIndex);
+    }
+
+    @Override
+    public void streamsPlan(StreamsPlanCreator planCreator) throws Exception {
+        // SingleRel
+        RelNode input = getInput();
+        StormRelUtils.getStormRelInput(input).streamsPlan(planCreator);
+        Stream<Values> inputStream = planCreator.pop();
+
+        Preconditions.checkArgument(isInsert(), "Only INSERT statement is supported.");
+
+        // Calcite ensures that the value is structurized to the table definition
+        // hence we can use PK index directly
+        // To elaborate, if table BAR is defined as ID INTEGER PK, NAME VARCHAR, DEPTID INTEGER
+        // and query like INSERT INTO BAR SELECT NAME, ID FROM FOO is executed,
+        // Calcite makes the projection ($1 <- ID, $0 <- NAME, null) to the value before INSERT.
+
+        String tableName = Joiner.on('.').join(getTable().getQualifiedName());
+        IRichBolt consumer = planCreator.getSources().get(tableName).getConsumer();
+
+        // To make logic simple, it assumes that all the tables have one PK (which it should be extended to support composed key),
+        // and provides PairStream(KeyedStream) to consumer bolt.
+        inputStream.mapToPair(new StreamInsertMapToPairFunction(primaryKeyIndex)).to(consumer);
+
+        planCreator.addStream(inputStream);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rel/StreamsStreamScanRel.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rel/StreamsStreamScanRel.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rel/StreamsStreamScanRel.java
new file mode 100644
index 0000000..563ea23
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rel/StreamsStreamScanRel.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.storm.sql.planner.streams.rel;
+
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.base.Joiner;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.storm.sql.planner.rel.StormStreamScanRelBase;
+import org.apache.storm.sql.planner.streams.StreamsPlanCreator;
+import org.apache.storm.sql.runtime.ISqlStreamsDataSource;
+import org.apache.storm.sql.runtime.streams.functions.StreamsScanTupleValueMapper;
+import org.apache.storm.streams.Stream;
+import org.apache.storm.tuple.Values;
+
+public class StreamsStreamScanRel extends StormStreamScanRelBase implements StreamsRel {
+    private final int parallelismHint;
+
+    public StreamsStreamScanRel(RelOptCluster cluster, RelTraitSet traitSet, RelOptTable table, int parallelismHint) {
+        super(cluster, traitSet, table);
+        this.parallelismHint = parallelismHint;
+    }
+
+    @Override
+    public void streamsPlan(StreamsPlanCreator planCreator) throws Exception {
+        String sourceName = Joiner.on('.').join(getTable().getQualifiedName());
+
+        Map<String, ISqlStreamsDataSource> sources = planCreator.getSources();
+        if (!sources.containsKey(sourceName)) {
+            throw new RuntimeException("Cannot find table " + sourceName);
+        }
+
+        List<String> fieldNames = getRowType().getFieldNames();
+        final Stream<Values> finalStream = planCreator.getStreamBuilder()
+                .newStream(sources.get(sourceName).getProducer(), new StreamsScanTupleValueMapper(fieldNames), parallelismHint);
+        planCreator.addStream(finalStream);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rules/StreamsAggregateRule.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rules/StreamsAggregateRule.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rules/StreamsAggregateRule.java
new file mode 100644
index 0000000..3d86f26
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rules/StreamsAggregateRule.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.storm.sql.planner.streams.rules;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.logical.LogicalAggregate;
+import org.apache.storm.sql.planner.streams.rel.StreamsLogicalConvention;
+
+public class StreamsAggregateRule extends ConverterRule {
+    public static final RelOptRule INSTANCE = new StreamsAggregateRule();
+
+    private StreamsAggregateRule() {
+        super(LogicalAggregate.class, Convention.NONE, StreamsLogicalConvention.INSTANCE, "StreamsAggregateRule");
+    }
+
+    @Override
+    public RelNode convert(RelNode rel) {
+        throw new UnsupportedOperationException("Aggregate operation is not supported.");
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rules/StreamsCalcRule.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rules/StreamsCalcRule.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rules/StreamsCalcRule.java
new file mode 100644
index 0000000..328d4f5
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rules/StreamsCalcRule.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.storm.sql.planner.streams.rules;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.Calc;
+import org.apache.calcite.rel.logical.LogicalCalc;
+import org.apache.storm.sql.planner.streams.rel.StreamsCalcRel;
+import org.apache.storm.sql.planner.streams.rel.StreamsLogicalConvention;
+
+public class StreamsCalcRule extends ConverterRule {
+    public static final StreamsCalcRule INSTANCE = new StreamsCalcRule();
+
+    private StreamsCalcRule() {
+        super(LogicalCalc.class, Convention.NONE, StreamsLogicalConvention.INSTANCE, "StreamsCalcRule");
+    }
+
+    @Override
+    public RelNode convert(RelNode rel) {
+        final Calc calc = (Calc) rel;
+        final RelNode input = calc.getInput();
+
+        return new StreamsCalcRel(calc.getCluster(), calc.getTraitSet().replace(StreamsLogicalConvention.INSTANCE),
+                                  convert(input, input.getTraitSet().replace(StreamsLogicalConvention.INSTANCE)),
+                                  calc.getProgram());
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rules/StreamsFilterRule.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rules/StreamsFilterRule.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rules/StreamsFilterRule.java
new file mode 100644
index 0000000..1e5ab19
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rules/StreamsFilterRule.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.storm.sql.planner.streams.rules;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.logical.LogicalFilter;
+import org.apache.storm.sql.planner.streams.rel.StreamsFilterRel;
+import org.apache.storm.sql.planner.streams.rel.StreamsLogicalConvention;
+
+public class StreamsFilterRule extends ConverterRule {
+    public static StreamsFilterRule INSTANCE = new StreamsFilterRule();
+
+    private StreamsFilterRule() {
+        super(LogicalFilter.class, Convention.NONE, StreamsLogicalConvention.INSTANCE, "StreamsFilterRule");
+    }
+
+    @Override
+    public RelNode convert(RelNode rel) {
+        final Filter filter = (Filter) rel;
+        final RelNode input = filter.getInput();
+
+        return new StreamsFilterRel(filter.getCluster(),
+                                    filter.getTraitSet().replace(StreamsLogicalConvention.INSTANCE),
+                                    convert(input, input.getTraitSet().replace(StreamsLogicalConvention.INSTANCE)),
+                                    filter.getCondition());
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rules/StreamsJoinRule.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rules/StreamsJoinRule.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rules/StreamsJoinRule.java
new file mode 100644
index 0000000..e3d5a7d
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rules/StreamsJoinRule.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.storm.sql.planner.streams.rules;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.logical.LogicalJoin;
+import org.apache.storm.sql.planner.streams.rel.StreamsLogicalConvention;
+
+public class StreamsJoinRule extends ConverterRule {
+    public static final StreamsJoinRule INSTANCE = new StreamsJoinRule();
+
+    private StreamsJoinRule() {
+        super(LogicalJoin.class, Convention.NONE, StreamsLogicalConvention.INSTANCE, "StreamsJoinRule");
+    }
+
+    @Override
+    public RelNode convert(RelNode rel) {
+        throw new UnsupportedOperationException("Join operation is not supported.");
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rules/StreamsModifyRule.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rules/StreamsModifyRule.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rules/StreamsModifyRule.java
new file mode 100644
index 0000000..f477607
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rules/StreamsModifyRule.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.storm.sql.planner.streams.rules;
+
+import java.util.List;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.prepare.Prepare;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.TableModify;
+import org.apache.calcite.rel.logical.LogicalTableModify;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.schema.Table;
+import org.apache.storm.sql.calcite.StormStreamableTable;
+import org.apache.storm.sql.calcite.StormTable;
+import org.apache.storm.sql.planner.streams.rel.StreamsLogicalConvention;
+import org.apache.storm.sql.planner.streams.rel.StreamsStreamInsertRel;
+
+public class StreamsModifyRule extends ConverterRule {
+    public static final StreamsModifyRule INSTANCE = new StreamsModifyRule();
+
+    private StreamsModifyRule() {
+        super(LogicalTableModify.class, Convention.NONE, StreamsLogicalConvention.INSTANCE, "StreamsModifyRule");
+    }
+
+    @Override
+    public RelNode convert(RelNode rel) {
+        final TableModify tableModify = (TableModify) rel;
+        final RelNode input = tableModify.getInput();
+
+        final RelOptCluster cluster = tableModify.getCluster();
+        final RelTraitSet traitSet = tableModify.getTraitSet().replace(StreamsLogicalConvention.INSTANCE);
+        final RelOptTable relOptTable = tableModify.getTable();
+        final Prepare.CatalogReader catalogReader = tableModify.getCatalogReader();
+        final RelNode convertedInput = convert(input, input.getTraitSet().replace(StreamsLogicalConvention.INSTANCE));
+        final TableModify.Operation operation = tableModify.getOperation();
+        final List<String> updateColumnList = tableModify.getUpdateColumnList();
+        final List<RexNode> sourceExpressionList = tableModify.getSourceExpressionList();
+        final boolean flattened = tableModify.isFlattened();
+
+        int primaryKey;
+
+        StormTable stormTable = tableModify.getTable().unwrap(StormTable.class);
+        if (stormTable != null) {
+            primaryKey = stormTable.primaryKey();
+        } else {
+            StormStreamableTable streamableTable = tableModify.getTable().unwrap(StormStreamableTable.class);
+            if (streamableTable != null) {
+                primaryKey = streamableTable.primaryKey();
+            } else {
+                throw new IllegalStateException("Table must be able to unwrap with StormTable or StormStreamableTable.");
+            }
+        }
+
+        final Table table = tableModify.getTable().unwrap(Table.class);
+
+        switch (table.getJdbcTableType()) {
+            case STREAM:
+                if (operation != TableModify.Operation.INSERT) {
+                    throw new UnsupportedOperationException(String.format("Stream doesn't support %s modify operation", operation));
+                }
+                return new StreamsStreamInsertRel(cluster, traitSet, relOptTable, catalogReader, convertedInput, operation,
+                        updateColumnList, sourceExpressionList, flattened, primaryKey);
+            default:
+                throw new IllegalArgumentException(String.format("Unsupported table type: %s", table.getJdbcTableType()));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rules/StreamsProjectRule.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rules/StreamsProjectRule.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rules/StreamsProjectRule.java
new file mode 100644
index 0000000..7508046
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rules/StreamsProjectRule.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.storm.sql.planner.streams.rules;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.storm.sql.planner.streams.rel.StreamsLogicalConvention;
+import org.apache.storm.sql.planner.streams.rel.StreamsProjectRel;
+
+public class StreamsProjectRule extends ConverterRule {
+    public static final StreamsProjectRule INSTANCE = new StreamsProjectRule();
+
+    private StreamsProjectRule() {
+        super(LogicalProject.class, Convention.NONE, StreamsLogicalConvention.INSTANCE,
+              "StreamsProjectRule");
+    }
+
+    @Override
+    public RelNode convert(RelNode rel) {
+        final Project project = (Project) rel;
+        final RelNode input = project.getInput();
+
+        return new StreamsProjectRel(project.getCluster(),
+                                     project.getTraitSet().replace(StreamsLogicalConvention.INSTANCE),
+                                     convert(input, input.getTraitSet().replace(StreamsLogicalConvention.INSTANCE)), project.getProjects(),
+                                     project.getRowType());
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rules/StreamsScanRule.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rules/StreamsScanRule.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rules/StreamsScanRule.java
new file mode 100644
index 0000000..536fcfe
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rules/StreamsScanRule.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.storm.sql.planner.streams.rules;
+
+import org.apache.calcite.adapter.enumerable.EnumerableConvention;
+import org.apache.calcite.adapter.enumerable.EnumerableTableScan;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.schema.Table;
+import org.apache.storm.sql.calcite.ParallelStreamableTable;
+import org.apache.storm.sql.calcite.ParallelTable;
+import org.apache.storm.sql.planner.streams.rel.StreamsLogicalConvention;
+import org.apache.storm.sql.planner.streams.rel.StreamsStreamScanRel;
+
+public class StreamsScanRule extends ConverterRule {
+    public static final StreamsScanRule INSTANCE = new StreamsScanRule();
+    public static final int DEFAULT_PARALLELISM_HINT = 1;
+
+    private StreamsScanRule() {
+        super(EnumerableTableScan.class, EnumerableConvention.INSTANCE, StreamsLogicalConvention.INSTANCE, "StreamsScanRule");
+    }
+
+    @Override
+    public RelNode convert(RelNode rel) {
+        final TableScan scan = (TableScan) rel;
+        int parallelismHint = DEFAULT_PARALLELISM_HINT;
+
+        final ParallelTable parallelTable = scan.getTable().unwrap(ParallelTable.class);
+        if (parallelTable != null && parallelTable.parallelismHint() != null) {
+            parallelismHint = parallelTable.parallelismHint();
+        }
+
+        final Table table = scan.getTable().unwrap(Table.class);
+        switch (table.getJdbcTableType()) {
+            case STREAM:
+                return new StreamsStreamScanRel(scan.getCluster(),
+                                                scan.getTraitSet().replace(StreamsLogicalConvention.INSTANCE),
+                                                scan.getTable(), parallelismHint);
+            default:
+                throw new IllegalArgumentException(String.format("Unsupported table type: %s", table.getJdbcTableType()));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/QueryPlanner.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/QueryPlanner.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/QueryPlanner.java
deleted file mode 100644
index 2238233..0000000
--- a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/QueryPlanner.java
+++ /dev/null
@@ -1,156 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one
- *  * or more contributor license agreements.  See the NOTICE file
- *  * distributed with this work for additional information
- *  * regarding copyright ownership.  The ASF licenses this file
- *  * to you under the Apache License, Version 2.0 (the
- *  * "License"); you may not use this file except in compliance
- *  * with the License.  You may obtain a copy of the License at
- *  *
- *  * http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package org.apache.storm.sql.planner.trident;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import org.apache.calcite.DataContext;
-import org.apache.calcite.adapter.java.JavaTypeFactory;
-import org.apache.calcite.jdbc.CalciteSchema;
-import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
-import org.apache.calcite.plan.Contexts;
-import org.apache.calcite.plan.ConventionTraitDef;
-import org.apache.calcite.plan.RelTraitDef;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.prepare.CalciteCatalogReader;
-import org.apache.calcite.rel.RelCollationTraitDef;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.type.RelDataTypeSystem;
-import org.apache.calcite.rex.RexBuilder;
-import org.apache.calcite.schema.SchemaPlus;
-import org.apache.calcite.sql.SqlNode;
-import org.apache.calcite.sql.SqlOperatorTable;
-import org.apache.calcite.sql.fun.SqlStdOperatorTable;
-import org.apache.calcite.sql.parser.SqlParseException;
-import org.apache.calcite.sql.util.ChainedSqlOperatorTable;
-import org.apache.calcite.tools.FrameworkConfig;
-import org.apache.calcite.tools.Frameworks;
-import org.apache.calcite.tools.Planner;
-import org.apache.calcite.tools.RelConversionException;
-import org.apache.calcite.tools.ValidationException;
-import org.apache.storm.sql.AbstractTridentProcessor;
-import org.apache.storm.sql.javac.CompilingClassLoader;
-import org.apache.storm.sql.planner.StormRelDataTypeSystem;
-import org.apache.storm.sql.planner.UnsupportedOperatorsVisitor;
-import org.apache.storm.sql.planner.trident.rel.TridentLogicalConvention;
-import org.apache.storm.sql.planner.trident.rel.TridentRel;
-import org.apache.storm.sql.runtime.ISqlTridentDataSource;
-import org.apache.storm.trident.Stream;
-import org.apache.storm.trident.TridentTopology;
-import org.apache.storm.trident.fluent.IAggregatableStream;
-
-public class QueryPlanner {
-
-    public static final int STORM_REL_CONVERSION_RULES = 1;
-
-    private final Planner planner;
-
-    private final JavaTypeFactory typeFactory = new JavaTypeFactoryImpl(
-        RelDataTypeSystem.DEFAULT);
-
-    public QueryPlanner(SchemaPlus schema) {
-        final List<RelTraitDef> traitDefs = new ArrayList<RelTraitDef>();
-
-        traitDefs.add(ConventionTraitDef.INSTANCE);
-        traitDefs.add(RelCollationTraitDef.INSTANCE);
-
-        List<SqlOperatorTable> sqlOperatorTables = new ArrayList<>();
-        sqlOperatorTables.add(SqlStdOperatorTable.instance());
-        sqlOperatorTables.add(new CalciteCatalogReader(CalciteSchema.from(schema),
-                                                       false,
-                                                       Collections.<String>emptyList(), typeFactory));
-
-        FrameworkConfig config = Frameworks.newConfigBuilder()
-                                           .defaultSchema(schema)
-                                           .operatorTable(new ChainedSqlOperatorTable(sqlOperatorTables))
-                                           .traitDefs(traitDefs)
-                                           .context(Contexts.EMPTY_CONTEXT)
-                                           .ruleSets(TridentStormRuleSets.getRuleSets())
-                                           .costFactory(null)
-                                           .typeSystem(StormRelDataTypeSystem.STORM_REL_DATATYPE_SYSTEM)
-                                           .build();
-        this.planner = Frameworks.getPlanner(config);
-    }
-
-    public AbstractTridentProcessor compile(Map<String, ISqlTridentDataSource> sources, String query) throws Exception {
-        TridentRel relNode = getPlan(query);
-
-        TridentPlanCreator tridentPlanCreator = new TridentPlanCreator(sources, new RexBuilder(typeFactory));
-        relNode.tridentPlan(tridentPlanCreator);
-
-        final TridentTopology topology = tridentPlanCreator.getTopology();
-        final IAggregatableStream lastStream = tridentPlanCreator.pop();
-        final DataContext dc = tridentPlanCreator.getDataContext();
-        final List<CompilingClassLoader> cls = tridentPlanCreator.getClassLoaders();
-
-        return new AbstractTridentProcessor() {
-            @Override
-            public TridentTopology build() {
-                return topology;
-            }
-
-            @Override
-            public Stream outputStream() {
-                return lastStream.toStream();
-            }
-
-            @Override
-            public DataContext getDataContext() {
-                return dc;
-            }
-
-            @Override
-            public List<CompilingClassLoader> getClassLoaders() {
-                return cls;
-            }
-        };
-    }
-
-    public TridentRel getPlan(String query) throws ValidationException, RelConversionException, SqlParseException {
-        return (TridentRel) validateAndConvert(planner.parse(query));
-    }
-
-    private RelNode validateAndConvert(SqlNode sqlNode) throws ValidationException, RelConversionException {
-        SqlNode validated = validateNode(sqlNode);
-        RelNode relNode = convertToRelNode(validated);
-        return convertToStormRel(relNode);
-    }
-
-    private RelNode convertToStormRel(RelNode relNode) throws RelConversionException {
-        RelTraitSet traitSet = relNode.getTraitSet();
-        traitSet = traitSet.simplify();
-
-        // PlannerImpl.transform() optimizes RelNode with ruleset
-        return planner.transform(STORM_REL_CONVERSION_RULES, traitSet.plus(TridentLogicalConvention.INSTANCE), relNode);
-    }
-
-    private RelNode convertToRelNode(SqlNode sqlNode) throws RelConversionException {
-        return planner.rel(sqlNode).rel;
-    }
-
-    private SqlNode validateNode(SqlNode sqlNode) throws ValidationException {
-        SqlNode validatedSqlNode = planner.validate(sqlNode);
-        validatedSqlNode.accept(new UnsupportedOperatorsVisitor());
-        return validatedSqlNode;
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/TridentPlanCreator.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/TridentPlanCreator.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/TridentPlanCreator.java
deleted file mode 100644
index 45277ca..0000000
--- a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/TridentPlanCreator.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.sql.planner.trident;
-
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.Deque;
-import java.util.List;
-import java.util.Map;
-import org.apache.calcite.DataContext;
-import org.apache.calcite.adapter.java.JavaTypeFactory;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rex.RexBuilder;
-import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.rex.RexProgram;
-import org.apache.storm.sql.compiler.RexNodeToJavaCodeCompiler;
-import org.apache.storm.sql.javac.CompilingClassLoader;
-import org.apache.storm.sql.runtime.ISqlTridentDataSource;
-import org.apache.storm.sql.runtime.calcite.DebuggableExecutableExpression;
-import org.apache.storm.sql.runtime.calcite.ExecutableExpression;
-import org.apache.storm.sql.runtime.calcite.StormDataContext;
-import org.apache.storm.trident.TridentTopology;
-import org.apache.storm.trident.fluent.IAggregatableStream;
-
-public class TridentPlanCreator {
-    private final Map<String, ISqlTridentDataSource> sources;
-    private final JavaTypeFactory typeFactory;
-    private final RexNodeToJavaCodeCompiler rexCompiler;
-    private final DataContext dataContext;
-    private final TridentTopology topology;
-
-    private final Deque<IAggregatableStream> streamStack = new ArrayDeque<>();
-    private final List<CompilingClassLoader> classLoaders = new ArrayList<>();
-
-    public TridentPlanCreator(Map<String, ISqlTridentDataSource> sources, RexBuilder rexBuilder) {
-        this.sources = sources;
-        this.rexCompiler = new RexNodeToJavaCodeCompiler(rexBuilder);
-        this.typeFactory = (JavaTypeFactory) rexBuilder.getTypeFactory();
-
-        this.topology = new TridentTopology();
-        this.dataContext = new StormDataContext();
-    }
-
-    public void addStream(IAggregatableStream stream) throws Exception {
-        push(stream);
-    }
-
-    public IAggregatableStream pop() {
-        return streamStack.pop();
-    }
-
-    public Map<String, ISqlTridentDataSource> getSources() {
-        return sources;
-    }
-
-    public DataContext getDataContext() {
-        return dataContext;
-    }
-
-    public JavaTypeFactory getTypeFactory() {
-        return typeFactory;
-    }
-
-    public TridentTopology getTopology() {
-        return topology;
-    }
-
-    public ExecutableExpression createScalarInstance(List<RexNode> nodes, RelDataType inputRowType, String className)
-        throws CompilingClassLoader.CompilerException, ClassNotFoundException, IllegalAccessException, InstantiationException {
-        String expr = rexCompiler.compile(nodes, inputRowType, className);
-        CompilingClassLoader classLoader = new CompilingClassLoader(
-            getLastClassLoader(), className, expr, null);
-        ExecutableExpression instance = (ExecutableExpression) classLoader.loadClass(className).newInstance();
-        addClassLoader(classLoader);
-        return new DebuggableExecutableExpression(instance, expr);
-    }
-
-    public ExecutableExpression createScalarInstance(RexProgram program, String className)
-        throws CompilingClassLoader.CompilerException, ClassNotFoundException, IllegalAccessException, InstantiationException {
-        String expr = rexCompiler.compile(program, className);
-        CompilingClassLoader classLoader = new CompilingClassLoader(
-            getLastClassLoader(), className, expr, null);
-        ExecutableExpression instance = (ExecutableExpression) classLoader.loadClass(className).newInstance();
-        addClassLoader(classLoader);
-        return new DebuggableExecutableExpression(instance, expr);
-    }
-
-    private void push(IAggregatableStream stream) {
-        streamStack.push(stream);
-    }
-
-    public void addClassLoader(CompilingClassLoader compilingClassLoader) {
-        this.classLoaders.add(compilingClassLoader);
-    }
-
-    public ClassLoader getLastClassLoader() {
-        if (this.classLoaders.size() > 0) {
-            return this.classLoaders.get(this.classLoaders.size() - 1);
-        } else {
-            return this.getClass().getClassLoader();
-        }
-    }
-
-    public List<CompilingClassLoader> getClassLoaders() {
-        return classLoaders;
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/TridentStormRuleSets.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/TridentStormRuleSets.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/TridentStormRuleSets.java
deleted file mode 100644
index 0c988b6..0000000
--- a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/TridentStormRuleSets.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one
- *  * or more contributor license agreements.  See the NOTICE file
- *  * distributed with this work for additional information
- *  * regarding copyright ownership.  The ASF licenses this file
- *  * to you under the Apache License, Version 2.0 (the
- *  * "License"); you may not use this file except in compliance
- *  * with the License.  You may obtain a copy of the License at
- *  *
- *  * http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package org.apache.storm.sql.planner.trident;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import java.util.Iterator;
-import org.apache.calcite.plan.RelOptRule;
-import org.apache.calcite.rel.rules.CalcMergeRule;
-import org.apache.calcite.rel.rules.FilterCalcMergeRule;
-import org.apache.calcite.rel.rules.FilterProjectTransposeRule;
-import org.apache.calcite.rel.rules.FilterToCalcRule;
-import org.apache.calcite.rel.rules.ProjectCalcMergeRule;
-import org.apache.calcite.rel.rules.ProjectFilterTransposeRule;
-import org.apache.calcite.rel.rules.ProjectRemoveRule;
-import org.apache.calcite.rel.rules.ProjectToCalcRule;
-import org.apache.calcite.rel.rules.PruneEmptyRules;
-import org.apache.calcite.rel.rules.ReduceExpressionsRule;
-import org.apache.calcite.rel.rules.SortRemoveRule;
-import org.apache.calcite.rel.rules.UnionEliminatorRule;
-import org.apache.calcite.rel.stream.StreamRules;
-import org.apache.calcite.tools.RuleSet;
-import org.apache.storm.sql.planner.trident.rules.TridentAggregateRule;
-import org.apache.storm.sql.planner.trident.rules.TridentCalcRule;
-import org.apache.storm.sql.planner.trident.rules.TridentFilterRule;
-import org.apache.storm.sql.planner.trident.rules.TridentJoinRule;
-import org.apache.storm.sql.planner.trident.rules.TridentModifyRule;
-import org.apache.storm.sql.planner.trident.rules.TridentProjectRule;
-import org.apache.storm.sql.planner.trident.rules.TridentScanRule;
-
-public class TridentStormRuleSets {
-    private static final ImmutableSet<RelOptRule> calciteToStormConversionRules =
-        ImmutableSet.<RelOptRule>builder().add(
-            SortRemoveRule.INSTANCE,
-
-            FilterToCalcRule.INSTANCE,
-            ProjectToCalcRule.INSTANCE,
-            FilterCalcMergeRule.INSTANCE,
-            ProjectCalcMergeRule.INSTANCE,
-            CalcMergeRule.INSTANCE,
-
-            PruneEmptyRules.FILTER_INSTANCE,
-            PruneEmptyRules.PROJECT_INSTANCE,
-            PruneEmptyRules.UNION_INSTANCE,
-
-            ProjectFilterTransposeRule.INSTANCE,
-            FilterProjectTransposeRule.INSTANCE,
-            ProjectRemoveRule.INSTANCE,
-
-            ReduceExpressionsRule.FILTER_INSTANCE,
-            ReduceExpressionsRule.PROJECT_INSTANCE,
-            ReduceExpressionsRule.CALC_INSTANCE,
-
-            // merge and push unions rules
-            UnionEliminatorRule.INSTANCE,
-
-            TridentScanRule.INSTANCE,
-            TridentFilterRule.INSTANCE,
-            TridentProjectRule.INSTANCE,
-            TridentAggregateRule.INSTANCE,
-            TridentJoinRule.INSTANCE,
-            TridentModifyRule.INSTANCE,
-            TridentCalcRule.INSTANCE
-        ).build();
-
-    public static RuleSet[] getRuleSets() {
-        return new RuleSet[]{
-            new StormRuleSet(StreamRules.RULES),
-            new StormRuleSet(ImmutableSet.<RelOptRule>builder().addAll(StreamRules.RULES).addAll(calciteToStormConversionRules).build())
-        };
-    }
-
-    private static class StormRuleSet implements RuleSet {
-        final ImmutableSet<RelOptRule> rules;
-
-        public StormRuleSet(ImmutableSet<RelOptRule> rules) {
-            this.rules = rules;
-        }
-
-        public StormRuleSet(ImmutableList<RelOptRule> rules) {
-            this.rules = ImmutableSet.<RelOptRule>builder()
-                .addAll(rules)
-                .build();
-        }
-
-        @Override
-        public Iterator<RelOptRule> iterator() {
-            return rules.iterator();
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentCalcRel.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentCalcRel.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentCalcRel.java
deleted file mode 100644
index 04acd34..0000000
--- a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentCalcRel.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.sql.planner.trident.rel;
-
-import com.google.common.collect.Lists;
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.Calc;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rex.RexLocalRef;
-import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.rex.RexProgram;
-import org.apache.storm.sql.planner.StormRelUtils;
-import org.apache.storm.sql.planner.rel.StormCalcRelBase;
-import org.apache.storm.sql.planner.trident.TridentPlanCreator;
-import org.apache.storm.sql.runtime.calcite.ExecutableExpression;
-import org.apache.storm.sql.runtime.trident.functions.EvaluationCalc;
-import org.apache.storm.trident.Stream;
-import org.apache.storm.tuple.Fields;
-
-public class TridentCalcRel extends StormCalcRelBase implements TridentRel {
-    public TridentCalcRel(RelOptCluster cluster, RelTraitSet traits, RelNode child, RexProgram program) {
-        super(cluster, traits, child, program);
-    }
-
-    @Override
-    public Calc copy(RelTraitSet traitSet, RelNode child, RexProgram program) {
-        return new TridentCalcRel(getCluster(), traitSet, child, program);
-    }
-
-    @Override
-    public void tridentPlan(TridentPlanCreator planCreator) throws Exception {
-        // SingleRel
-        RelNode input = getInput();
-        StormRelUtils.getStormRelInput(input).tridentPlan(planCreator);
-        Stream inputStream = planCreator.pop().toStream();
-
-        String stageName = StormRelUtils.getStageName(this);
-
-        RelDataType inputRowType = getInput(0).getRowType();
-
-        List<String> outputFieldNames = getRowType().getFieldNames();
-        int outputCount = outputFieldNames.size();
-
-        // filter
-        ExecutableExpression filterInstance = null;
-        RexLocalRef condition = program.getCondition();
-        if (condition != null) {
-            RexNode conditionNode = program.expandLocalRef(condition);
-            filterInstance = planCreator.createScalarInstance(Lists.newArrayList(conditionNode), inputRowType,
-                                                              StormRelUtils.getClassName(this));
-        }
-
-        // projection
-        ExecutableExpression projectionInstance = null;
-        List<RexLocalRef> projectList = program.getProjectList();
-        if (projectList != null && !projectList.isEmpty()) {
-            List<RexNode> expandedNodes = new ArrayList<>();
-            for (RexLocalRef project : projectList) {
-                expandedNodes.add(program.expandLocalRef(project));
-            }
-
-            projectionInstance = planCreator.createScalarInstance(expandedNodes, inputRowType,
-                                                                  StormRelUtils.getClassName(this));
-        }
-
-        if (projectionInstance == null && filterInstance == null) {
-            // it shouldn't be happen
-            throw new IllegalStateException("Either projection or condition, or both should be provided.");
-        }
-
-        final Stream finalStream = inputStream
-            .flatMap(new EvaluationCalc(filterInstance, projectionInstance, outputCount, planCreator.getDataContext()),
-                     new Fields(outputFieldNames))
-            .name(stageName);
-
-        planCreator.addStream(finalStream);
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentFilterRel.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentFilterRel.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentFilterRel.java
deleted file mode 100644
index ba3c5e2..0000000
--- a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentFilterRel.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.sql.planner.trident.rel;
-
-import java.util.List;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.Filter;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rex.RexNode;
-import org.apache.storm.sql.planner.StormRelUtils;
-import org.apache.storm.sql.planner.rel.StormFilterRelBase;
-import org.apache.storm.sql.planner.trident.TridentPlanCreator;
-import org.apache.storm.sql.runtime.calcite.ExecutableExpression;
-import org.apache.storm.sql.runtime.trident.functions.EvaluationFilter;
-import org.apache.storm.trident.Stream;
-import org.apache.storm.trident.fluent.IAggregatableStream;
-
-public class TridentFilterRel extends StormFilterRelBase implements TridentRel {
-    public TridentFilterRel(RelOptCluster cluster, RelTraitSet traits, RelNode child, RexNode condition) {
-        super(cluster, traits, child, condition);
-    }
-
-    @Override
-    public Filter copy(RelTraitSet traitSet, RelNode input, RexNode condition) {
-        return new TridentFilterRel(getCluster(), traitSet, input, condition);
-    }
-
-    @Override
-    public void tridentPlan(TridentPlanCreator planCreator) throws Exception {
-        // SingleRel
-        RelNode input = getInput();
-        StormRelUtils.getStormRelInput(input).tridentPlan(planCreator);
-        Stream inputStream = planCreator.pop().toStream();
-
-        String stageName = StormRelUtils.getStageName(this);
-
-        List<RexNode> childExps = getChildExps();
-        RelDataType inputRowType = getInput(0).getRowType();
-
-        String filterClassName = StormRelUtils.getClassName(this);
-        ExecutableExpression filterInstance = planCreator.createScalarInstance(childExps, inputRowType, filterClassName);
-
-        IAggregatableStream finalStream = inputStream.filter(new EvaluationFilter(filterInstance, planCreator.getDataContext()))
-                                                     .name(stageName);
-        planCreator.addStream(finalStream);
-    }
-}