Posted to commits@storm.apache.org by ka...@apache.org on 2018/07/16 22:00:46 UTC

[4/7] storm git commit: STORM-2406 [Storm SQL] Change underlying API to Streams API
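
[Context for reviewers: this part of the commit deletes the Trident-based planner classes and ports the storm-sql-core tests to the Streams API. A minimal sketch of the API shift, assuming a generic IRichSpout instance named "spout" (the spout and variable names are illustrative, not taken from this commit):

    // Trident (old): streams hang off a TridentTopology, and the
    // StormTopology is only materialized by an extra build() call.
    TridentTopology tridentTopology = new TridentTopology();
    tridentTopology.newStream("spout1", spout);
    StormTopology oldStyle = tridentTopology.build();

    // Streams API (new): StreamBuilder.newStream(...) returns an
    // org.apache.storm.streams.Stream, and build() yields the
    // StormTopology directly -- which is why the test changes below
    // submit `topo` instead of `topo.build()`, and why
    // AbstractTridentProcessor becomes AbstractStreamsProcessor
    // returning a StormTopology from build().
    StreamBuilder builder = new StreamBuilder();
    builder.newStream(spout);
    StormTopology newStyle = builder.build();
]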

http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentLogicalConvention.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentLogicalConvention.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentLogicalConvention.java
deleted file mode 100644
index 28256e8..0000000
--- a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentLogicalConvention.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.sql.planner.trident.rel;
-
-import org.apache.calcite.plan.Convention;
-import org.apache.calcite.plan.ConventionTraitDef;
-import org.apache.calcite.plan.RelOptPlanner;
-import org.apache.calcite.plan.RelTrait;
-import org.apache.calcite.plan.RelTraitDef;
-import org.apache.calcite.plan.RelTraitSet;
-
-public enum TridentLogicalConvention implements Convention {
-    INSTANCE;
-
-    @Override
-    public Class getInterface() {
-        return TridentRel.class;
-    }
-
-    @Override
-    public String getName() {
-        return "STORM_LOGICAL";
-    }
-
-    @Override
-    public RelTraitDef getTraitDef() {
-        return ConventionTraitDef.INSTANCE;
-    }
-
-    @Override
-    public boolean satisfies(RelTrait trait) {
-        return this == trait;
-    }
-
-    @Override
-    public void register(RelOptPlanner planner) {}
-
-    @Override
-    public String toString() {
-        return getName();
-    }
-
-    @Override
-    public boolean canConvertConvention(Convention toConvention) {
-        return false;
-    }
-
-    @Override
-    public boolean useAbstractConvertersForConversion(RelTraitSet fromTraits, RelTraitSet toTraits) {
-        return false;
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentProjectRel.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentProjectRel.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentProjectRel.java
deleted file mode 100644
index 1eef647..0000000
--- a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentProjectRel.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.sql.planner.trident.rel;
-
-import java.util.List;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.Project;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rex.RexNode;
-import org.apache.storm.sql.planner.StormRelUtils;
-import org.apache.storm.sql.planner.rel.StormProjectRelBase;
-import org.apache.storm.sql.planner.trident.TridentPlanCreator;
-import org.apache.storm.sql.runtime.calcite.ExecutableExpression;
-import org.apache.storm.sql.runtime.trident.functions.EvaluationFunction;
-import org.apache.storm.trident.Stream;
-import org.apache.storm.tuple.Fields;
-
-public class TridentProjectRel extends StormProjectRelBase implements TridentRel {
-    public TridentProjectRel(RelOptCluster cluster, RelTraitSet traits, RelNode input, List<? extends RexNode> projects,
-                             RelDataType rowType) {
-        super(cluster, traits, input, projects, rowType);
-    }
-
-    @Override
-    public Project copy(RelTraitSet traitSet, RelNode input, List<RexNode> projects, RelDataType rowType) {
-        return new TridentProjectRel(getCluster(), traitSet, input, projects, rowType);
-    }
-
-    @Override
-    public void tridentPlan(TridentPlanCreator planCreator) throws Exception {
-        // SingleRel
-        RelNode input = getInput();
-        StormRelUtils.getStormRelInput(input).tridentPlan(planCreator);
-        Stream inputStream = planCreator.pop().toStream();
-
-        String stageName = StormRelUtils.getStageName(this);
-        String projectionClassName = StormRelUtils.getClassName(this);
-
-        List<String> outputFieldNames = getRowType().getFieldNames();
-        int outputCount = outputFieldNames.size();
-
-        List<RexNode> childExps = getChildExps();
-        RelDataType inputRowType = getInput(0).getRowType();
-
-        ExecutableExpression projectionInstance = planCreator.createScalarInstance(childExps, inputRowType, projectionClassName);
-        Stream finalStream = inputStream
-            .map(new EvaluationFunction(projectionInstance, outputCount, planCreator.getDataContext()), new Fields(outputFieldNames))
-            .name(stageName);
-
-        planCreator.addStream(finalStream);
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentRel.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentRel.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentRel.java
deleted file mode 100644
index 6e90997..0000000
--- a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentRel.java
+++ /dev/null
@@ -1,20 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.sql.planner.trident.rel;
-
-import org.apache.storm.sql.planner.rel.StormRelNode;
-import org.apache.storm.sql.planner.trident.TridentPlanCreator;
-
-public interface TridentRel extends StormRelNode {
-    void tridentPlan(TridentPlanCreator planCreator) throws Exception;
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentStreamInsertRel.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentStreamInsertRel.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentStreamInsertRel.java
deleted file mode 100644
index 28e43bd..0000000
--- a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentStreamInsertRel.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.sql.planner.trident.rel;
-
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import java.util.List;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelOptTable;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.prepare.Prepare;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rex.RexNode;
-import org.apache.storm.sql.planner.StormRelUtils;
-import org.apache.storm.sql.planner.rel.StormStreamInsertRelBase;
-import org.apache.storm.sql.planner.trident.TridentPlanCreator;
-import org.apache.storm.sql.runtime.ISqlTridentDataSource;
-import org.apache.storm.trident.Stream;
-import org.apache.storm.trident.fluent.IAggregatableStream;
-import org.apache.storm.tuple.Fields;
-
-public class TridentStreamInsertRel extends StormStreamInsertRelBase implements TridentRel {
-    public TridentStreamInsertRel(RelOptCluster cluster, RelTraitSet traits, RelOptTable table, Prepare.CatalogReader catalogReader,
-                                  RelNode child, Operation operation, List<String> updateColumnList, List<RexNode> sourceExpressionList,
-                                  boolean flattened) {
-        super(cluster, traits, table, catalogReader, child, operation, updateColumnList, sourceExpressionList, flattened);
-    }
-
-    @Override
-    public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
-        return new TridentStreamInsertRel(getCluster(), traitSet, getTable(), getCatalogReader(),
-                                          sole(inputs), getOperation(), getUpdateColumnList(), getSourceExpressionList(), isFlattened());
-    }
-
-    @Override
-    public void tridentPlan(TridentPlanCreator planCreator) throws Exception {
-        // SingleRel
-        RelNode input = getInput();
-        StormRelUtils.getStormRelInput(input).tridentPlan(planCreator);
-        Stream inputStream = planCreator.pop().toStream();
-
-        String stageName = StormRelUtils.getStageName(this);
-
-        Preconditions.checkArgument(isInsert(), "Only INSERT statement is supported.");
-
-        List<String> inputFields = this.input.getRowType().getFieldNames();
-        List<String> outputFields = getRowType().getFieldNames();
-
-        // FIXME: this should be really different...
-        String tableName = Joiner.on('.').join(getTable().getQualifiedName());
-        ISqlTridentDataSource.SqlTridentConsumer consumer = planCreator.getSources().get(tableName).getConsumer();
-
-        // In fact this is normally the end of stream, but I'm still not sure so I open new streams based on State values
-        IAggregatableStream finalStream = inputStream
-            .partitionPersist(consumer.getStateFactory(), new Fields(inputFields), consumer.getStateUpdater(),
-                              new Fields(outputFields))
-            .newValuesStream().name(stageName);
-
-        planCreator.addStream(finalStream);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentStreamScanRel.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentStreamScanRel.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentStreamScanRel.java
deleted file mode 100644
index 706cdf5..0000000
--- a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentStreamScanRel.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.sql.planner.trident.rel;
-
-import com.google.common.base.Joiner;
-import java.util.Map;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelOptTable;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.storm.sql.planner.StormRelUtils;
-import org.apache.storm.sql.planner.rel.StormStreamScanRelBase;
-import org.apache.storm.sql.planner.trident.TridentPlanCreator;
-import org.apache.storm.sql.runtime.ISqlTridentDataSource;
-import org.apache.storm.trident.fluent.IAggregatableStream;
-
-public class TridentStreamScanRel extends StormStreamScanRelBase implements TridentRel {
-    private final int parallelismHint;
-
-    public TridentStreamScanRel(RelOptCluster cluster, RelTraitSet traitSet, RelOptTable table, int parallelismHint) {
-        super(cluster, traitSet, table);
-        this.parallelismHint = parallelismHint;
-    }
-
-    @Override
-    public void tridentPlan(TridentPlanCreator planCreator) throws Exception {
-        String sourceName = Joiner.on('.').join(getTable().getQualifiedName());
-
-        // FIXME: this should be really different...
-        Map<String, ISqlTridentDataSource> sources = planCreator.getSources();
-        if (!sources.containsKey(sourceName)) {
-            throw new RuntimeException("Cannot find table " + sourceName);
-        }
-
-        String stageName = StormRelUtils.getStageName(this);
-        IAggregatableStream finalStream = planCreator.getTopology().newStream(stageName, sources.get(sourceName).getProducer())
-                                                     .parallelismHint(parallelismHint);
-        planCreator.addStream(finalStream);
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentAggregateRule.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentAggregateRule.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentAggregateRule.java
deleted file mode 100644
index 316a727..0000000
--- a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentAggregateRule.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.sql.planner.trident.rules;
-
-import org.apache.calcite.plan.Convention;
-import org.apache.calcite.plan.RelOptRule;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.convert.ConverterRule;
-import org.apache.calcite.rel.logical.LogicalAggregate;
-import org.apache.storm.sql.planner.trident.rel.TridentLogicalConvention;
-
-public class TridentAggregateRule extends ConverterRule {
-    public static final RelOptRule INSTANCE = new TridentAggregateRule();
-
-    private TridentAggregateRule() {
-        super(LogicalAggregate.class, Convention.NONE, TridentLogicalConvention.INSTANCE, "TridentAggregateRule");
-    }
-
-    @Override
-    public RelNode convert(RelNode rel) {
-        throw new UnsupportedOperationException("Aggregate operation is not supported.");
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentCalcRule.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentCalcRule.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentCalcRule.java
deleted file mode 100644
index b3709b7..0000000
--- a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentCalcRule.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.sql.planner.trident.rules;
-
-import org.apache.calcite.plan.Convention;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.convert.ConverterRule;
-import org.apache.calcite.rel.core.Calc;
-import org.apache.calcite.rel.logical.LogicalCalc;
-import org.apache.storm.sql.planner.trident.rel.TridentCalcRel;
-import org.apache.storm.sql.planner.trident.rel.TridentLogicalConvention;
-
-public class TridentCalcRule extends ConverterRule {
-    public static final TridentCalcRule INSTANCE = new TridentCalcRule();
-
-    private TridentCalcRule() {
-        super(LogicalCalc.class, Convention.NONE, TridentLogicalConvention.INSTANCE, "TridentCalcRule");
-    }
-
-    @Override
-    public RelNode convert(RelNode rel) {
-        final Calc calc = (Calc) rel;
-        final RelNode input = calc.getInput();
-
-        return new TridentCalcRel(calc.getCluster(), calc.getTraitSet().replace(TridentLogicalConvention.INSTANCE),
-                                  convert(input, input.getTraitSet().replace(TridentLogicalConvention.INSTANCE)),
-                                  calc.getProgram());
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentFilterRule.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentFilterRule.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentFilterRule.java
deleted file mode 100644
index 7678301..0000000
--- a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentFilterRule.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.sql.planner.trident.rules;
-
-import org.apache.calcite.plan.Convention;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.convert.ConverterRule;
-import org.apache.calcite.rel.core.Filter;
-import org.apache.calcite.rel.logical.LogicalFilter;
-import org.apache.storm.sql.planner.trident.rel.TridentFilterRel;
-import org.apache.storm.sql.planner.trident.rel.TridentLogicalConvention;
-
-public class TridentFilterRule extends ConverterRule {
-    public static TridentFilterRule INSTANCE = new TridentFilterRule();
-
-    private TridentFilterRule() {
-        super(LogicalFilter.class, Convention.NONE, TridentLogicalConvention.INSTANCE, "TridentFilterRule");
-    }
-
-    @Override
-    public RelNode convert(RelNode rel) {
-        final Filter filter = (Filter) rel;
-        final RelNode input = filter.getInput();
-
-        return new TridentFilterRel(filter.getCluster(),
-                                    filter.getTraitSet().replace(TridentLogicalConvention.INSTANCE),
-                                    convert(input, input.getTraitSet().replace(TridentLogicalConvention.INSTANCE)),
-                                    filter.getCondition());
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentJoinRule.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentJoinRule.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentJoinRule.java
deleted file mode 100644
index 305f315..0000000
--- a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentJoinRule.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.sql.planner.trident.rules;
-
-import org.apache.calcite.plan.Convention;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.convert.ConverterRule;
-import org.apache.calcite.rel.logical.LogicalJoin;
-import org.apache.storm.sql.planner.trident.rel.TridentLogicalConvention;
-
-public class TridentJoinRule extends ConverterRule {
-    public static final TridentJoinRule INSTANCE = new TridentJoinRule();
-
-    private TridentJoinRule() {
-        super(LogicalJoin.class, Convention.NONE, TridentLogicalConvention.INSTANCE, "TridentJoinRule");
-    }
-
-    @Override
-    public RelNode convert(RelNode rel) {
-        throw new UnsupportedOperationException("Join operation is not supported.");
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentModifyRule.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentModifyRule.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentModifyRule.java
deleted file mode 100644
index 6848a5e..0000000
--- a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentModifyRule.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.sql.planner.trident.rules;
-
-import java.util.List;
-import org.apache.calcite.plan.Convention;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelOptTable;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.prepare.Prepare;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.convert.ConverterRule;
-import org.apache.calcite.rel.core.TableModify;
-import org.apache.calcite.rel.logical.LogicalTableModify;
-import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.schema.Table;
-import org.apache.storm.sql.planner.trident.rel.TridentLogicalConvention;
-import org.apache.storm.sql.planner.trident.rel.TridentStreamInsertRel;
-
-public class TridentModifyRule extends ConverterRule {
-    public static final TridentModifyRule INSTANCE = new TridentModifyRule();
-
-    private TridentModifyRule() {
-        super(LogicalTableModify.class, Convention.NONE, TridentLogicalConvention.INSTANCE, "TridentModifyRule");
-    }
-
-    @Override
-    public RelNode convert(RelNode rel) {
-        final TableModify tableModify = (TableModify) rel;
-        final RelNode input = tableModify.getInput();
-
-        final RelOptCluster cluster = tableModify.getCluster();
-        final RelTraitSet traitSet = tableModify.getTraitSet().replace(TridentLogicalConvention.INSTANCE);
-        final RelOptTable relOptTable = tableModify.getTable();
-        final Prepare.CatalogReader catalogReader = tableModify.getCatalogReader();
-        final RelNode convertedInput = convert(input, input.getTraitSet().replace(TridentLogicalConvention.INSTANCE));
-        final TableModify.Operation operation = tableModify.getOperation();
-        final List<String> updateColumnList = tableModify.getUpdateColumnList();
-        final List<RexNode> sourceExpressionList = tableModify.getSourceExpressionList();
-        final boolean flattened = tableModify.isFlattened();
-
-        final Table table = tableModify.getTable().unwrap(Table.class);
-
-        switch (table.getJdbcTableType()) {
-            case STREAM:
-                if (operation != TableModify.Operation.INSERT) {
-                    throw new UnsupportedOperationException(String.format("Streams doesn't support %s modify operation", operation));
-                }
-                return new TridentStreamInsertRel(cluster, traitSet, relOptTable, catalogReader, convertedInput, operation,
-                                                  updateColumnList, sourceExpressionList, flattened);
-            default:
-                throw new IllegalArgumentException(String.format("Unsupported table type: %s", table.getJdbcTableType()));
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentProjectRule.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentProjectRule.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentProjectRule.java
deleted file mode 100644
index 4e415e0..0000000
--- a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentProjectRule.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.sql.planner.trident.rules;
-
-import org.apache.calcite.plan.Convention;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.convert.ConverterRule;
-import org.apache.calcite.rel.core.Project;
-import org.apache.calcite.rel.logical.LogicalProject;
-import org.apache.storm.sql.planner.trident.rel.TridentLogicalConvention;
-import org.apache.storm.sql.planner.trident.rel.TridentProjectRel;
-
-public class TridentProjectRule extends ConverterRule {
-    public static final TridentProjectRule INSTANCE = new TridentProjectRule();
-
-    private TridentProjectRule() {
-        super(LogicalProject.class, Convention.NONE, TridentLogicalConvention.INSTANCE,
-              "TridentProjectRule");
-    }
-
-    @Override
-    public RelNode convert(RelNode rel) {
-        final Project project = (Project) rel;
-        final RelNode input = project.getInput();
-
-        return new TridentProjectRel(project.getCluster(),
-                                     project.getTraitSet().replace(TridentLogicalConvention.INSTANCE),
-                                     convert(input, input.getTraitSet().replace(TridentLogicalConvention.INSTANCE)), project.getProjects(),
-                                     project.getRowType());
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentScanRule.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentScanRule.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentScanRule.java
deleted file mode 100644
index a6ff965..0000000
--- a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentScanRule.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.sql.planner.trident.rules;
-
-import org.apache.calcite.adapter.enumerable.EnumerableConvention;
-import org.apache.calcite.adapter.enumerable.EnumerableTableScan;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.convert.ConverterRule;
-import org.apache.calcite.rel.core.TableScan;
-import org.apache.calcite.schema.Table;
-import org.apache.storm.sql.calcite.ParallelStreamableTable;
-import org.apache.storm.sql.planner.trident.rel.TridentLogicalConvention;
-import org.apache.storm.sql.planner.trident.rel.TridentStreamScanRel;
-
-public class TridentScanRule extends ConverterRule {
-    public static final TridentScanRule INSTANCE = new TridentScanRule();
-    public static final int DEFAULT_PARALLELISM_HINT = 1;
-
-    private TridentScanRule() {
-        super(EnumerableTableScan.class, EnumerableConvention.INSTANCE, TridentLogicalConvention.INSTANCE, "TridentScanRule");
-    }
-
-    @Override
-    public RelNode convert(RelNode rel) {
-        final TableScan scan = (TableScan) rel;
-        int parallelismHint = DEFAULT_PARALLELISM_HINT;
-
-        final ParallelStreamableTable parallelTable = scan.getTable().unwrap(ParallelStreamableTable.class);
-        if (parallelTable != null && parallelTable.parallelismHint() != null) {
-            parallelismHint = parallelTable.parallelismHint();
-        }
-
-        final Table table = scan.getTable().unwrap(Table.class);
-        switch (table.getJdbcTableType()) {
-            case STREAM:
-                return new TridentStreamScanRel(scan.getCluster(),
-                                                scan.getTraitSet().replace(TridentLogicalConvention.INSTANCE),
-                                                scan.getTable(), parallelismHint);
-            default:
-                throw new IllegalArgumentException(String.format("Unsupported table type: %s", table.getJdbcTableType()));
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-core/src/test/org/apache/storm/sql/SqlTestUtil.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/test/org/apache/storm/sql/SqlTestUtil.java b/sql/storm-sql-core/src/test/org/apache/storm/sql/SqlTestUtil.java
index 4d90cba..8cc713a 100644
--- a/sql/storm-sql-core/src/test/org/apache/storm/sql/SqlTestUtil.java
+++ b/sql/storm-sql-core/src/test/org/apache/storm/sql/SqlTestUtil.java
@@ -19,34 +19,29 @@
 
 package org.apache.storm.sql;
 
+import java.util.List;
 import java.util.concurrent.Callable;
 import org.apache.storm.Config;
 import org.apache.storm.LocalCluster;
+import org.apache.storm.generated.StormTopology;
 import org.apache.storm.sql.javac.CompilingClassLoader;
-import org.apache.storm.trident.TridentTopology;
 import org.apache.storm.utils.Utils;
 
-import static org.apache.storm.sql.TestUtils.MockState.getCollectedValues;
-
 public final class SqlTestUtil {
 
-    public static void runTridentTopology(LocalCluster cluster, final int expectedValueSize, AbstractTridentProcessor proc,
-                                          TridentTopology topo) throws Exception {
+    public static void runStormTopology(LocalCluster cluster, final List<?> watchedList, final int expectedValueSize,
+                                        AbstractStreamsProcessor proc, StormTopology topo) throws Exception {
         final Config conf = new Config();
         conf.setMaxSpoutPending(20);
+        conf.setDebug(true);
 
         if (proc.getClassLoaders() != null && proc.getClassLoaders().size() > 0) {
             CompilingClassLoader lastClassloader = proc.getClassLoaders().get(proc.getClassLoaders().size() - 1);
             Utils.setClassLoaderForJavaDeSerialize(lastClassloader);
         }
 
-        try (LocalCluster.LocalTopology stormTopo = cluster.submitTopology("storm-sql", conf, topo.build())) {
-            waitForCompletion(1000 * 1000, new Callable<Boolean>() {
-                @Override
-                public Boolean call() throws Exception {
-                    return getCollectedValues().size() < expectedValueSize;
-                }
-            });
+        try (LocalCluster.LocalTopology stormTopo = cluster.submitTopology("storm-sql", conf, topo)) {
+            waitForCompletion(1000 * 1000, () -> watchedList.size() < expectedValueSize);
         } finally {
             while (cluster.getClusterInfo().get_topologies_size() > 0) {
                 Thread.sleep(10);

http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-core/src/test/org/apache/storm/sql/StormSqlLocalClusterImpl.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/test/org/apache/storm/sql/StormSqlLocalClusterImpl.java b/sql/storm-sql-core/src/test/org/apache/storm/sql/StormSqlLocalClusterImpl.java
index c7b0743..450e4f4 100644
--- a/sql/storm-sql-core/src/test/org/apache/storm/sql/StormSqlLocalClusterImpl.java
+++ b/sql/storm-sql-core/src/test/org/apache/storm/sql/StormSqlLocalClusterImpl.java
@@ -23,11 +23,11 @@ import java.util.function.Predicate;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.storm.Config;
 import org.apache.storm.LocalCluster;
+import org.apache.storm.generated.StormTopology;
 import org.apache.storm.sql.javac.CompilingClassLoader;
 import org.apache.storm.sql.parser.SqlCreateFunction;
 import org.apache.storm.sql.parser.SqlCreateTable;
 import org.apache.storm.sql.parser.StormParser;
-import org.apache.storm.trident.TridentTopology;
 import org.apache.storm.utils.Utils;
 
 public class StormSqlLocalClusterImpl {
@@ -57,15 +57,15 @@ public class StormSqlLocalClusterImpl {
             } else if (node instanceof SqlCreateFunction) {
                 sqlContext.interpretCreateFunction((SqlCreateFunction) node);
             } else {
-                AbstractTridentProcessor processor = sqlContext.compileSql(sql);
-                TridentTopology topo = processor.build();
+                AbstractStreamsProcessor processor = sqlContext.compileSql(sql);
+                StormTopology topo = processor.build();
 
                 if (processor.getClassLoaders() != null && processor.getClassLoaders().size() > 0) {
                     CompilingClassLoader lastClassloader = processor.getClassLoaders().get(processor.getClassLoaders().size() - 1);
                     Utils.setClassLoaderForJavaDeSerialize(lastClassloader);
                 }
 
-                try (LocalCluster.LocalTopology stormTopo = localCluster.submitTopology("storm-sql", conf, topo.build())) {
+                try (LocalCluster.LocalTopology stormTopo = localCluster.submitTopology("storm-sql", conf, topo)) {
                     waitForCompletion(waitTimeoutMs, waitCondition);
                 } finally {
                     while (localCluster.getClusterInfo().get_topologies_size() > 0) {

http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java b/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java
index 468a5dc..5a30764 100644
--- a/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java
+++ b/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java
@@ -12,34 +12,42 @@
 
 package org.apache.storm.sql;
 
-import com.google.common.collect.ImmutableMap;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+
+import com.google.common.collect.ImmutableMap;
 import org.apache.calcite.tools.ValidationException;
 import org.apache.storm.LocalCluster;
 import org.apache.storm.sql.javac.CompilingClassLoader;
 import org.apache.storm.sql.runtime.DataSourcesProvider;
 import org.apache.storm.sql.runtime.DataSourcesRegistry;
 import org.apache.storm.sql.runtime.FieldInfo;
-import org.apache.storm.sql.runtime.ISqlTridentDataSource;
+import org.apache.storm.sql.runtime.ISqlStreamsDataSource;
+import org.apache.storm.streams.Pair;
 import org.apache.storm.tuple.Values;
 import org.junit.AfterClass;
 import org.junit.Assert;
-import org.junit.Before;
 import org.junit.BeforeClass;
+import org.junit.Rule;
 import org.junit.Test;
-
-import static org.apache.storm.sql.TestUtils.MockState.getCollectedValues;
+import org.junit.rules.ExternalResource;
 
 public class TestStormSql {
 
     public static final int WAIT_TIMEOUT_MS = 1000 * 1000;
     public static final int WAIT_TIMEOUT_MS_NO_RECORDS_EXPECTED = 1000 * 10;
     public static final int WAIT_TIMEOUT_MS_ERROR_EXPECTED = 1000;
+
+    @Rule
+    public ExternalResource mockBoltValues = TestUtils.mockBoltValueResource;
+
+    @Rule
+    public ExternalResource mockInsertBoltValues = TestUtils.mockInsertBoltValueResource;
+
     private static LocalCluster cluster;
 
     @BeforeClass
@@ -67,43 +75,39 @@ public class TestStormSql {
         }
     }
 
-    @Before
-    public void setUp() {
-        getCollectedValues().clear();
-    }
-
     @Test
     public void testExternalDataSource() throws Exception {
         List<String> stmt = new ArrayList<>();
-        stmt.add("CREATE EXTERNAL TABLE FOO (ID INT) LOCATION 'mock:///foo'");
-        stmt.add("CREATE EXTERNAL TABLE BAR (ID INT) LOCATION 'mock:///foo'");
+        stmt.add("CREATE EXTERNAL TABLE FOO (ID INT PRIMARY KEY) LOCATION 'mock:///foo'");
+        stmt.add("CREATE EXTERNAL TABLE BAR (ID INT PRIMARY KEY) LOCATION 'mock:///foo'");
         stmt.add("INSERT INTO BAR SELECT STREAM ID + 1 FROM FOO WHERE ID > 2");
         StormSqlLocalClusterImpl impl = new StormSqlLocalClusterImpl();
 
-        List<List<Object>> values = getCollectedValues();
+        List<Pair<Object, Values>> values = TestUtils.MockInsertBolt.getCollectedValues();
         impl.runLocal(cluster, stmt, (__) -> values.size() >= 2, WAIT_TIMEOUT_MS);
 
         Assert.assertEquals(2, values.size());
-        Assert.assertEquals(4, values.get(0).get(0));
-        Assert.assertEquals(5, values.get(1).get(0));
+        Assert.assertEquals(4, values.get(0).getFirst());
+        Assert.assertEquals(5, values.get(1).getFirst());
     }
 
     @Test
     public void testExternalDataSourceNested() throws Exception {
         List<String> stmt = new ArrayList<>();
-        stmt.add("CREATE EXTERNAL TABLE FOO (ID INT, MAPFIELD ANY, NESTEDMAPFIELD ANY, ARRAYFIELD ANY) LOCATION 'mocknested:///foo'");
-        stmt.add("CREATE EXTERNAL TABLE BAR (ID INT, MAPFIELD ANY, NESTEDMAPFIELD ANY, ARRAYFIELD ANY) LOCATION 'mocknested:///foo'");
+        stmt.add("CREATE EXTERNAL TABLE FOO (ID INT PRIMARY KEY, MAPFIELD ANY, NESTEDMAPFIELD ANY, ARRAYFIELD ANY) LOCATION 'mocknested:///foo'");
+        stmt.add("CREATE EXTERNAL TABLE BAR (ID INT PRIMARY KEY, MAPFIELD ANY, NESTEDMAPFIELD ANY, ARRAYFIELD ANY) LOCATION 'mocknested:///foo'");
         stmt.add("INSERT INTO BAR SELECT STREAM ID, MAPFIELD['c'], NESTEDMAPFIELD, ARRAYFIELD " +
                  "FROM FOO " +
                  "WHERE CAST(MAPFIELD['b'] AS INTEGER) = 2 AND CAST(ARRAYFIELD[2] AS INTEGER) = 200");
         StormSqlLocalClusterImpl impl = new StormSqlLocalClusterImpl();
 
-        List<List<Object>> values = getCollectedValues();
+        List<Pair<Object, Values>> values = TestUtils.MockInsertBolt.getCollectedValues();
         impl.runLocal(cluster, stmt, (__) -> values.size() >= 1, WAIT_TIMEOUT_MS);
 
         Map<String, Integer> map = ImmutableMap.of("b", 2, "c", 4);
         Map<String, Map<String, Integer>> nestedMap = ImmutableMap.of("a", map);
-        Assert.assertEquals(new Values(2, 4, nestedMap, Arrays.asList(100, 200, 300)), values.get(0));
+        Assert.assertEquals(2, values.get(0).getFirst());
+        Assert.assertEquals(new Values(2, 4, nestedMap, Arrays.asList(100, 200, 300)), values.get(0).getSecond());
     }
 
     @Test
@@ -117,7 +121,7 @@ public class TestStormSql {
                  "WHERE CAST(MAPFIELD['a'] AS INTEGER) = 2");
         StormSqlLocalClusterImpl impl = new StormSqlLocalClusterImpl();
 
-        List<List<Object>> values = getCollectedValues();
+        List<Pair<Object, Values>> values = TestUtils.MockInsertBolt.getCollectedValues();
         impl.runLocal(cluster, stmt, (__) -> true, WAIT_TIMEOUT_MS_NO_RECORDS_EXPECTED);
 
         Assert.assertEquals(0, values.size());
@@ -134,7 +138,7 @@ public class TestStormSql {
                  "WHERE CAST(NESTEDMAPFIELD['b']['c'] AS INTEGER) = 4");
         StormSqlLocalClusterImpl impl = new StormSqlLocalClusterImpl();
 
-        List<List<Object>> values = getCollectedValues();
+        List<Pair<Object, Values>> values = TestUtils.MockInsertBolt.getCollectedValues();
         impl.runLocal(cluster, stmt, (__) -> true, WAIT_TIMEOUT_MS_NO_RECORDS_EXPECTED);
 
         Assert.assertEquals(0, values.size());
@@ -150,7 +154,7 @@ public class TestStormSql {
                  "WHERE CAST(ARRAYFIELD['a'] AS INTEGER) = 200");
         StormSqlLocalClusterImpl impl = new StormSqlLocalClusterImpl();
 
-        List<List<Object>> values = getCollectedValues();
+        List<Pair<Object, Values>> values = TestUtils.MockInsertBolt.getCollectedValues();
         impl.runLocal(cluster, stmt, (__) -> true, WAIT_TIMEOUT_MS_NO_RECORDS_EXPECTED);
 
         Assert.assertEquals(0, values.size());
@@ -166,7 +170,7 @@ public class TestStormSql {
                  "WHERE CAST(ARRAYFIELD[10] AS INTEGER) = 200");
         StormSqlLocalClusterImpl impl = new StormSqlLocalClusterImpl();
 
-        List<List<Object>> values = getCollectedValues();
+        List<Pair<Object, Values>> values = TestUtils.MockInsertBolt.getCollectedValues();
         impl.runLocal(cluster, stmt, (__) -> true, WAIT_TIMEOUT_MS_NO_RECORDS_EXPECTED);
 
         Assert.assertEquals(0, values.size());
@@ -204,25 +208,25 @@ public class TestStormSql {
     @Test
     public void testExternalUdf() throws Exception {
         List<String> stmt = new ArrayList<>();
-        stmt.add("CREATE EXTERNAL TABLE FOO (ID INT) LOCATION 'mock:///foo'");
-        stmt.add("CREATE EXTERNAL TABLE BAR (ID INT) LOCATION 'mock:///foo'");
+        stmt.add("CREATE EXTERNAL TABLE FOO (ID INT PRIMARY KEY) LOCATION 'mock:///foo'");
+        stmt.add("CREATE EXTERNAL TABLE BAR (ID INT PRIMARY KEY) LOCATION 'mock:///foo'");
         stmt.add("CREATE FUNCTION MYPLUS AS 'org.apache.storm.sql.TestUtils$MyPlus'");
         stmt.add("INSERT INTO BAR SELECT STREAM MYPLUS(ID, 1) FROM FOO WHERE ID > 2");
         StormSqlLocalClusterImpl impl = new StormSqlLocalClusterImpl();
 
-        List<List<Object>> values = getCollectedValues();
+        List<Pair<Object, Values>> values = TestUtils.MockInsertBolt.getCollectedValues();
         impl.runLocal(cluster, stmt, (__) -> values.size() >= 2, WAIT_TIMEOUT_MS);
 
         Assert.assertEquals(2, values.size());
-        Assert.assertEquals(4, values.get(0).get(0));
-        Assert.assertEquals(5, values.get(1).get(0));
+        Assert.assertEquals(4, values.get(0).getFirst());
+        Assert.assertEquals(5, values.get(1).getFirst());
     }
 
     @Test(expected = UnsupportedOperationException.class)
     public void testExternalUdfUsingJar() throws Exception {
         List<String> stmt = new ArrayList<>();
-        stmt.add("CREATE EXTERNAL TABLE FOO (ID INT) LOCATION 'mock:///foo'");
-        stmt.add("CREATE EXTERNAL TABLE BAR (ID INT) LOCATION 'mock:///foo'");
+        stmt.add("CREATE EXTERNAL TABLE FOO (ID INT PRIMARY KEY) LOCATION 'mock:///foo'");
+        stmt.add("CREATE EXTERNAL TABLE BAR (ID INT PRIMARY KEY) LOCATION 'mock:///foo'");
         stmt.add("CREATE FUNCTION MYPLUS AS 'org.apache.storm.sql.TestUtils$MyPlus' USING JAR 'foo'");
         stmt.add("INSERT INTO BAR SELECT STREAM MYPLUS(ID, 1) FROM FOO WHERE ID > 2");
         StormSqlLocalClusterImpl impl = new StormSqlLocalClusterImpl();
@@ -239,9 +243,9 @@ public class TestStormSql {
         }
 
         @Override
-        public ISqlTridentDataSource constructTrident(URI uri, String inputFormatClass, String outputFormatClass,
+        public ISqlStreamsDataSource constructStreams(URI uri, String inputFormatClass, String outputFormatClass,
                                                       Properties properties, List<FieldInfo> fields) {
-            return new TestUtils.MockSqlExprDataSource();
+            return new TestUtils.MockSqlStreamsInsertDataSource();
         }
     }
 
@@ -252,9 +256,9 @@ public class TestStormSql {
         }
 
         @Override
-        public ISqlTridentDataSource constructTrident(URI uri, String inputFormatClass, String outputFormatClass,
+        public ISqlStreamsDataSource constructStreams(URI uri, String inputFormatClass, String outputFormatClass,
                                                       Properties properties, List<FieldInfo> fields) {
-            return new TestUtils.MockSqlTridentNestedDataSource();
+            return new TestUtils.MockSqlStreamsInsertNestedDataSource();
         }
     }
 
@@ -265,9 +269,9 @@ public class TestStormSql {
         }
 
         @Override
-        public ISqlTridentDataSource constructTrident(URI uri, String inputFormatClass, String outputFormatClass,
+        public ISqlStreamsDataSource constructStreams(URI uri, String inputFormatClass, String outputFormatClass,
                                                       Properties properties, List<FieldInfo> fields) {
-            return new TestUtils.MockSqlTridentGroupedDataSource();
+            return new TestUtils.MockSqlStreamsInsertGroupedDataSource();
         }
     }
 
@@ -278,9 +282,9 @@ public class TestStormSql {
         }
 
         @Override
-        public ISqlTridentDataSource constructTrident(URI uri, String inputFormatClass, String outputFormatClass,
+        public ISqlStreamsDataSource constructStreams(URI uri, String inputFormatClass, String outputFormatClass,
                                                       Properties properties, List<FieldInfo> fields) {
-            return new TestUtils.MockSqlTridentJoinDataSourceEmp();
+            return new TestUtils.MockSqlStreamsInsertJoinDataSourceEmp();
         }
     }
 
@@ -291,9 +295,9 @@ public class TestStormSql {
         }
 
         @Override
-        public ISqlTridentDataSource constructTrident(URI uri, String inputFormatClass, String outputFormatClass,
+        public ISqlStreamsDataSource constructStreams(URI uri, String inputFormatClass, String outputFormatClass,
                                                       Properties properties, List<FieldInfo> fields) {
-            return new TestUtils.MockSqlTridentJoinDataSourceDept();
+            return new TestUtils.MockSqlStreamsInsertJoinDataSourceDept();
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/streams/TestCompilerUtils.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/streams/TestCompilerUtils.java b/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/streams/TestCompilerUtils.java
new file mode 100644
index 0000000..b7fb5b3
--- /dev/null
+++ b/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/streams/TestCompilerUtils.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.storm.sql.compiler.backends.streams;
+
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.StreamableTable;
+import org.apache.calcite.schema.Table;
+import org.apache.calcite.schema.impl.AggregateFunctionImpl;
+import org.apache.calcite.schema.impl.ScalarFunctionImpl;
+import org.apache.calcite.sql.SqlExplainLevel;
+import org.apache.calcite.sql.parser.SqlParseException;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.sql.validate.SqlMonotonicity;
+import org.apache.calcite.tools.Frameworks;
+import org.apache.calcite.tools.RelConversionException;
+import org.apache.calcite.tools.ValidationException;
+import org.apache.storm.sql.compiler.CompilerUtil;
+import org.apache.storm.sql.parser.ColumnConstraint;
+import org.apache.storm.sql.planner.StormRelUtils;
+import org.apache.storm.sql.planner.streams.QueryPlanner;
+import org.apache.storm.sql.planner.streams.rel.StreamsRel;
+
+public class TestCompilerUtils {
+
+    public static CalciteState sqlOverDummyTable(String sql)
+        throws RelConversionException, ValidationException, SqlParseException {
+        SchemaPlus schema = Frameworks.createRootSchema(true);
+        JavaTypeFactory typeFactory = new JavaTypeFactoryImpl
+            (RelDataTypeSystem.DEFAULT);
+        StreamableTable streamableTable = new CompilerUtil.TableBuilderInfo(typeFactory)
+            .field("ID", SqlTypeName.INTEGER, new ColumnConstraint.PrimaryKey(SqlMonotonicity.MONOTONIC, SqlParserPos.ZERO))
+            .field("NAME", typeFactory.createType(String.class))
+            .field("ADDR", typeFactory.createType(String.class))
+            .build();
+        Table table = streamableTable.stream();
+        schema.add("FOO", table);
+        schema.add("BAR", table);
+        schema.add("MYPLUS", ScalarFunctionImpl.create(MyPlus.class, "eval"));
+
+        QueryPlanner queryPlanner = new QueryPlanner(schema);
+        StreamsRel tree = queryPlanner.getPlan(sql);
+        System.out.println(StormRelUtils.explain(tree, SqlExplainLevel.ALL_ATTRIBUTES));
+        return new CalciteState(schema, tree);
+    }
+
+    public static CalciteState sqlOverDummyGroupByTable(String sql)
+        throws RelConversionException, ValidationException, SqlParseException {
+        SchemaPlus schema = Frameworks.createRootSchema(true);
+        JavaTypeFactory typeFactory = new JavaTypeFactoryImpl
+            (RelDataTypeSystem.DEFAULT);
+        StreamableTable streamableTable = new CompilerUtil.TableBuilderInfo(typeFactory)
+            .field("ID", SqlTypeName.INTEGER, new ColumnConstraint.PrimaryKey(SqlMonotonicity.MONOTONIC, SqlParserPos.ZERO))
+            .field("GRPID", SqlTypeName.INTEGER)
+            .field("NAME", typeFactory.createType(String.class))
+            .field("ADDR", typeFactory.createType(String.class))
+            .field("AGE", SqlTypeName.INTEGER)
+            .field("SCORE", SqlTypeName.INTEGER)
+            .build();
+        Table table = streamableTable.stream();
+        schema.add("FOO", table);
+        schema.add("BAR", table);
+        schema.add("MYSTATICSUM", AggregateFunctionImpl.create(MyStaticSumFunction.class));
+        schema.add("MYSUM", AggregateFunctionImpl.create(MySumFunction.class));
+
+        QueryPlanner queryPlanner = new QueryPlanner(schema);
+        StreamsRel tree = queryPlanner.getPlan(sql);
+        System.out.println(StormRelUtils.explain(tree, SqlExplainLevel.ALL_ATTRIBUTES));
+        return new CalciteState(schema, tree);
+    }
+
+    public static CalciteState sqlOverNestedTable(String sql)
+        throws RelConversionException, ValidationException, SqlParseException {
+        SchemaPlus schema = Frameworks.createRootSchema(true);
+        JavaTypeFactory typeFactory = new JavaTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
+
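+        // FOO/BAR are built with nullable nested types:
+        //   MAPFIELD:       MAP<VARCHAR, INTEGER>
+        //   NESTEDMAPFIELD: MAP<VARCHAR, MAP<VARCHAR, INTEGER>>
+        //   ARRAYFIELD:     INTEGER ARRAY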
+        StreamableTable streamableTable = new CompilerUtil.TableBuilderInfo(typeFactory)
+            .field("ID", SqlTypeName.INTEGER, new ColumnConstraint.PrimaryKey(SqlMonotonicity.MONOTONIC, SqlParserPos.ZERO))
+            .field("MAPFIELD",
+                   typeFactory.createTypeWithNullability(
+                       typeFactory.createMapType(
+                           typeFactory.createTypeWithNullability(
+                               typeFactory.createSqlType(SqlTypeName.VARCHAR), true),
+                           typeFactory.createTypeWithNullability(
+                               typeFactory.createSqlType(SqlTypeName.INTEGER), true))
+                       , true))
+            .field("NESTEDMAPFIELD",
+                   typeFactory.createTypeWithNullability(
+                       typeFactory.createMapType(
+                           typeFactory.createTypeWithNullability(
+                               typeFactory.createSqlType(SqlTypeName.VARCHAR), true),
+                           typeFactory.createTypeWithNullability(
+                               typeFactory.createMapType(
+                                   typeFactory.createTypeWithNullability(
+                                       typeFactory.createSqlType(SqlTypeName.VARCHAR), true),
+                                   typeFactory.createTypeWithNullability(
+                                       typeFactory.createSqlType(SqlTypeName.INTEGER), true))
+                               , true))
+                       , true))
+            .field("ARRAYFIELD", typeFactory.createTypeWithNullability(
+                typeFactory.createArrayType(
+                    typeFactory.createTypeWithNullability(
+                        typeFactory.createSqlType(SqlTypeName.INTEGER), true), -1L)
+                , true))
+            .build();
+        Table table = streamableTable.stream();
+        schema.add("FOO", table);
+        schema.add("BAR", table);
+        schema.add("MYPLUS", ScalarFunctionImpl.create(MyPlus.class, "eval"));
+
+        QueryPlanner queryPlanner = new QueryPlanner(schema);
+        StreamsRel tree = queryPlanner.getPlan(sql);
+        System.out.println(StormRelUtils.explain(tree, SqlExplainLevel.ALL_ATTRIBUTES));
+        return new CalciteState(schema, tree);
+    }
+
+    public static CalciteState sqlOverSimpleEquiJoinTables(String sql)
+        throws RelConversionException, ValidationException, SqlParseException {
+        SchemaPlus schema = Frameworks.createRootSchema(true);
+        JavaTypeFactory typeFactory = new JavaTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
+
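+        // Two stream tables for equi-join tests: EMP(EMPID PK, EMPNAME, DEPTID) and
+        // DEPT(DEPTID PK, DEPTNAME), joined on DEPTID.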
+        StreamableTable streamableTable = new CompilerUtil.TableBuilderInfo(typeFactory)
+            .field("EMPID", SqlTypeName.INTEGER, new ColumnConstraint.PrimaryKey(SqlMonotonicity.MONOTONIC, SqlParserPos.ZERO))
+            .field("EMPNAME", SqlTypeName.VARCHAR)
+            .field("DEPTID", SqlTypeName.INTEGER)
+            .build();
+
+        StreamableTable streamableTable2 = new CompilerUtil.TableBuilderInfo(typeFactory)
+            .field("DEPTID", SqlTypeName.INTEGER, new ColumnConstraint.PrimaryKey(SqlMonotonicity.MONOTONIC, SqlParserPos.ZERO))
+            .field("DEPTNAME", SqlTypeName.VARCHAR)
+            .build();
+
+        Table table = streamableTable.stream();
+        Table table2 = streamableTable2.stream();
+        schema.add("EMP", table);
+        schema.add("DEPT", table2);
+
+        QueryPlanner queryPlanner = new QueryPlanner(schema);
+        StreamsRel tree = queryPlanner.getPlan(sql);
+        System.out.println(StormRelUtils.explain(tree, SqlExplainLevel.ALL_ATTRIBUTES));
+        return new CalciteState(schema, tree);
+    }
+
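+    /**
+     * Scalar UDF, invoked from SQL as MYPLUS(x, y); registered via
+     * ScalarFunctionImpl.create(MyPlus.class, "eval") in the builders above.
+     */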
+    public static class MyPlus {
+        public static Integer eval(Integer x, Integer y) {
+            return x + y;
+        }
+    }
+
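+    /**
+     * Aggregate UDF using Calcite's static init/add convention; registered as MYSTATICSUM
+     * via AggregateFunctionImpl.create(MyStaticSumFunction.class).
+     */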
+    public static class MyStaticSumFunction {
+        public static long init() {
+            return 0L;
+        }
+
+        public static long add(long accumulator, int v) {
+            return accumulator + v;
+        }
+    }
+
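+    /**
+     * Aggregate UDF using instance methods (init/add/result); Calcite instantiates it via
+     * the public no-arg constructor. Registered as MYSUM.
+     */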
+    public static class MySumFunction {
+        public MySumFunction() {
+        }
+
+        public long init() {
+            return 0L;
+        }
+
+        public long add(long accumulator, int v) {
+            return accumulator + v;
+        }
+
+        public long result(long accumulator) {
+            return accumulator;
+        }
+    }
+
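+    /** Pairs the root schema with the planned relational tree so tests can inspect both. */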
+    public static class CalciteState {
+        final SchemaPlus schema;
+        final RelNode tree;
+
+        private CalciteState(SchemaPlus schema, RelNode tree) {
+            this.schema = schema;
+            this.tree = tree;
+        }
+
+        public SchemaPlus schema() {
+            return schema;
+        }
+
+        public RelNode tree() {
+            return tree;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/streams/TestPlanCompiler.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/streams/TestPlanCompiler.java b/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/streams/TestPlanCompiler.java
new file mode 100644
index 0000000..68203d8
--- /dev/null
+++ b/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/streams/TestPlanCompiler.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.compiler.backends.streams;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.calcite.DataContext;
+import org.apache.calcite.avatica.util.DateTimeUtils;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.sql.SqlTestUtil;
+import org.apache.storm.sql.TestUtils;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.sql.AbstractStreamsProcessor;
+import org.apache.storm.sql.planner.streams.QueryPlanner;
+import org.apache.storm.sql.runtime.ISqlStreamsDataSource;
+import org.apache.storm.streams.Pair;
+import org.apache.storm.tuple.Values;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExternalResource;
+
+import java.time.ZoneOffset;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
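+/**
+ * End-to-end tests for the Streams backend: each test plans SQL with {@link QueryPlanner},
+ * compiles it into an {@link AbstractStreamsProcessor}, and runs the resulting topology on
+ * a shared {@link LocalCluster}.
+ */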
+public class TestPlanCompiler {
+    private static LocalCluster cluster;
+
+    @Rule
+    public ExternalResource mockBoltValues = TestUtils.mockBoltValueResource;
+
+    @Rule
+    public ExternalResource mockInsertBoltValues = TestUtils.mockInsertBoltValueResource;
+
+    @BeforeClass
+    public static void staticSetup() throws Exception {
+        cluster = new LocalCluster();
+    }
+
+    @AfterClass
+    public static void staticCleanup() {
+        if (cluster != null) {
+            cluster.shutdown();
+            cluster = null;
+        }
+    }
+
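+    // Tests capture output either by attaching TestUtils.MockBolt to the processor's output
+    // stream (SELECT queries) or through TestUtils.MockInsertBolt (INSERT INTO queries), and
+    // assert against the values collected while the topology runs.
+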
+    @Test
+    public void testCompile() throws Exception {
+        final int EXPECTED_VALUE_SIZE = 2;
+        String sql = "SELECT ID FROM FOO WHERE ID > 2";
+        TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverDummyTable(sql);
+        final Map<String, ISqlStreamsDataSource> data = new HashMap<>();
+        data.put("FOO", new TestUtils.MockSqlStreamsDataSource());
+
+        QueryPlanner planner = new QueryPlanner(state.schema());
+        AbstractStreamsProcessor proc = planner.compile(data, sql);
+        // inject output bolt
+        proc.outputStream().to(new TestUtils.MockBolt());
+        final StormTopology topo = proc.build();
+
+        SqlTestUtil.runStormTopology(cluster, TestUtils.MockBolt.getCollectedValues(), EXPECTED_VALUE_SIZE, proc, topo);
+        Assert.assertArrayEquals(new Values[] { new Values(3), new Values(4)}, TestUtils.MockBolt.getCollectedValues().toArray());
+    }
+
+    @Test
+    public void testInsert() throws Exception {
+        final int EXPECTED_VALUE_SIZE = 1;
+        String sql = "INSERT INTO BAR SELECT ID, NAME, ADDR FROM FOO WHERE ID > 3";
+        TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverDummyTable(sql);
+        final Map<String, ISqlStreamsDataSource> data = new HashMap<>();
+        data.put("FOO", new TestUtils.MockSqlStreamsDataSource());
+        data.put("BAR", new TestUtils.MockSqlStreamsOutputDataSource());
+
+        QueryPlanner planner = new QueryPlanner(state.schema());
+        AbstractStreamsProcessor proc = planner.compile(data, sql);
+        final StormTopology topo = proc.build();
+
+        SqlTestUtil.runStormTopology(cluster, TestUtils.MockInsertBolt.getCollectedValues(), EXPECTED_VALUE_SIZE, proc, topo);
+        Assert.assertArrayEquals(new Pair[] { Pair.of(4, new Values(4, "abcde", "y")) }, TestUtils.MockInsertBolt.getCollectedValues().toArray());
+    }
+
+    @Test
+    public void testUdf() throws Exception {
+        int EXPECTED_VALUE_SIZE = 1;
+        String sql = "SELECT MYPLUS(ID, 3)" +
+                "FROM FOO " +
+                "WHERE ID = 2";
+        TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverDummyTable(sql);
+        Map<String, ISqlStreamsDataSource> data = new HashMap<>();
+        data.put("FOO", new TestUtils.MockSqlStreamsDataSource());
+
+        QueryPlanner planner = new QueryPlanner(state.schema());
+        AbstractStreamsProcessor proc = planner.compile(data, sql);
+        // inject output bolt
+        proc.outputStream().to(new TestUtils.MockBolt());
+        final StormTopology topo = proc.build();
+
+        SqlTestUtil.runStormTopology(cluster, TestUtils.MockBolt.getCollectedValues(), EXPECTED_VALUE_SIZE, proc, topo);
+        Assert.assertArrayEquals(new Values[] { new Values(5) }, TestUtils.MockBolt.getCollectedValues().toArray());
+    }
+
+    @Test
+    public void testNested() throws Exception {
+        int EXPECTED_VALUE_SIZE = 1;
+        String sql = "SELECT ID, MAPFIELD['c'], NESTEDMAPFIELD, ARRAYFIELD " +
+                "FROM FOO " +
+                "WHERE NESTEDMAPFIELD['a']['b'] = 2 AND ARRAYFIELD[2] = 200";
+        TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverNestedTable(sql);
+
+        final Map<String, ISqlStreamsDataSource> data = new HashMap<>();
+        data.put("FOO", new TestUtils.MockSqlStreamsNestedDataSource());
+
+        QueryPlanner planner = new QueryPlanner(state.schema());
+        AbstractStreamsProcessor proc = planner.compile(data, sql);
+        // inject output bolt
+        proc.outputStream().to(new TestUtils.MockBolt());
+        final StormTopology topo = proc.build();
+
+        SqlTestUtil.runStormTopology(cluster, TestUtils.MockBolt.getCollectedValues(), EXPECTED_VALUE_SIZE, proc, topo);
+
+        Map<String, Integer> map = ImmutableMap.of("b", 2, "c", 4);
+        Map<String, Map<String, Integer>> nestedMap = ImmutableMap.of("a", map);
+        Assert.assertArrayEquals(new Values[]{new Values(2, 4, nestedMap, Arrays.asList(100, 200, 300))},
+                TestUtils.MockBolt.getCollectedValues().toArray());
+    }
+
+    /**
+     * All the binary literal tests are done here because Avatica converts the result to byte[],
+     * whereas the Streams runtime provides the result as ByteString, so the two implementations
+     * have different result semantics.
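+     *
+     * <p>For example (a sketch using Avatica's ByteString):
+     * <pre>
+     *     ByteString bs = ByteString.of("45F0AB", 16);
+     *     bs.toString();  // "45f0ab" -- lower-case hex, as asserted below
+     *     bs.getBytes();  // the byte[] form Avatica would return instead
+     * </pre>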
+     */
+    @Test
+    public void testBinaryStringFunctions() throws Exception {
+        int EXPECTED_VALUE_SIZE = 1;
+        String sql = "SELECT x'45F0AB' || x'45F0AB', " +
+                "POSITION(x'F0' IN x'453423F0ABBC'), " +
+                "OVERLAY(x'453423F0ABBC45' PLACING x'4534' FROM 3), " +
+                "SUBSTRING(x'453423F0ABBC' FROM 3), " +
+                "SUBSTRING(x'453423F0ABBC453423F0ABBC' FROM 3 FOR 4) " +
+                "FROM FOO " +
+                "WHERE ID > 0 AND ID < 2";
+
+        TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverDummyTable(sql);
+        final Map<String, ISqlStreamsDataSource> data = new HashMap<>();
+        data.put("FOO", new TestUtils.MockSqlStreamsDataSource());
+
+        QueryPlanner planner = new QueryPlanner(state.schema());
+        AbstractStreamsProcessor proc = planner.compile(data, sql);
+        // inject output bolt
+        proc.outputStream().to(new TestUtils.MockBolt());
+        final StormTopology topo = proc.build();
+
+        SqlTestUtil.runStormTopology(cluster, TestUtils.MockBolt.getCollectedValues(), EXPECTED_VALUE_SIZE, proc, topo);
+
+        Values v = TestUtils.MockBolt.getCollectedValues().get(0);
+
+        assertEquals("45f0ab45f0ab", v.get(0).toString());
+        assertEquals(4, v.get(1));
+        assertEquals("45344534abbc45", v.get(2).toString());
+        assertEquals("23f0abbc", v.get(3).toString());
+        assertEquals("23f0abbc", v.get(4).toString());
+    }
+
+    /**
+     * All the date/time/timestamp related tests are done here because Avatica converts the
+     * results of date functions to java.sql classes, whereas the Streams runtime provides
+     * primitive int/long values, so the two implementations have different result semantics.
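+     *
+     * <p>For example, the Streams runtime yields (matching the assertion below):
+     * <pre>
+     *     DATE '1970-05-15'               -> 134            (int: days since epoch)
+     *     TIME '00:00:00'                 -> 0              (int: milliseconds of day)
+     *     TIMESTAMP '2016-01-01 00:00:00' -> 1451606400000L (long: epoch milliseconds)
+     * </pre>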
+     */
+    @Test
+    public void testDateKeywordsAndFunctions() throws Exception {
+        int EXPECTED_VALUE_SIZE = 1;
+        String sql = "SELECT " +
+                "LOCALTIME, CURRENT_TIME, LOCALTIMESTAMP, CURRENT_TIMESTAMP, CURRENT_DATE, " +
+                "DATE '1970-05-15' AS datefield, TIME '00:00:00' AS timefield, TIMESTAMP '2016-01-01 00:00:00' as timestampfield, " +
+                "EXTRACT(MONTH FROM TIMESTAMP '2010-01-23 12:34:56')," +
+                "FLOOR(DATE '2016-01-23' TO MONTH)," +
+                "CEIL(TIME '12:34:56' TO MINUTE)," +
+                "{fn CURDATE()} = CURRENT_DATE, {fn CURTIME()} = LOCALTIME, {fn NOW()} = LOCALTIMESTAMP," +
+                "{fn QUARTER(DATE '2016-10-07')}, {fn TIMESTAMPADD(MINUTE, 15, TIMESTAMP '2016-10-07 00:00:00')}," +
+                "{fn TIMESTAMPDIFF(SECOND, TIMESTAMP '2016-10-06 00:00:00', TIMESTAMP '2016-10-07 00:00:00')}," +
+                "INTERVAL '1-5' YEAR TO MONTH AS intervalfield, " +
+                "(DATE '1970-01-01', DATE '1970-01-15') AS anchoredinterval_field "   +
+                "FROM FOO " +
+                "WHERE ID > 0 AND ID < 2";
+        TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverDummyTable(sql);
+
+        final Map<String, ISqlStreamsDataSource> data = new HashMap<>();
+        data.put("FOO", new TestUtils.MockSqlStreamsDataSource());
+
+        QueryPlanner planner = new QueryPlanner(state.schema());
+        AbstractStreamsProcessor proc = planner.compile(data, sql);
+        // inject output bolt
+        proc.outputStream().to(new TestUtils.MockBolt());
+        final DataContext dataContext = proc.getDataContext();
+        final StormTopology topo = proc.build();
+
+        SqlTestUtil.runStormTopology(cluster, TestUtils.MockBolt.getCollectedValues(), EXPECTED_VALUE_SIZE, proc, topo);
+
+        long utcTimestamp = (long) dataContext.get(DataContext.Variable.UTC_TIMESTAMP.camelName);
+        long currentTimestamp = (long) dataContext.get(DataContext.Variable.CURRENT_TIMESTAMP.camelName);
+        long localTimestamp = (long) dataContext.get(DataContext.Variable.LOCAL_TIMESTAMP.camelName);
+
+        System.out.println(TestUtils.MockBolt.getCollectedValues());
+
+        java.sql.Timestamp timestamp = new java.sql.Timestamp(utcTimestamp);
+        int dateInt = (int) timestamp.toLocalDateTime().atOffset(ZoneOffset.UTC).toLocalDate().toEpochDay();
+        int localTimeInt = (int) (localTimestamp % DateTimeUtils.MILLIS_PER_DAY);
+        int currentTimeInt = (int) (currentTimestamp % DateTimeUtils.MILLIS_PER_DAY);
+
+        Assert.assertArrayEquals(new Values[]{new Values(localTimeInt, currentTimeInt, localTimestamp, currentTimestamp, dateInt,
+                        134, 0, 1451606400000L, 1L, 0L, 45300000, true, true, true, 4L, 1475799300000L, 86400, 17, 0, 14)},
+                TestUtils.MockBolt.getCollectedValues().toArray());
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestCompilerUtils.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestCompilerUtils.java b/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestCompilerUtils.java
deleted file mode 100644
index 0f20d4d..0000000
--- a/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestCompilerUtils.java
+++ /dev/null
@@ -1,207 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.sql.compiler.backends.trident;
-
-import org.apache.calcite.adapter.java.JavaTypeFactory;
-import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.type.RelDataTypeSystem;
-import org.apache.calcite.schema.SchemaPlus;
-import org.apache.calcite.schema.StreamableTable;
-import org.apache.calcite.schema.Table;
-import org.apache.calcite.schema.impl.AggregateFunctionImpl;
-import org.apache.calcite.schema.impl.ScalarFunctionImpl;
-import org.apache.calcite.sql.SqlExplainLevel;
-import org.apache.calcite.sql.parser.SqlParseException;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.apache.calcite.tools.Frameworks;
-import org.apache.calcite.tools.RelConversionException;
-import org.apache.calcite.tools.ValidationException;
-import org.apache.storm.sql.compiler.CompilerUtil;
-import org.apache.storm.sql.planner.StormRelUtils;
-import org.apache.storm.sql.planner.trident.QueryPlanner;
-import org.apache.storm.sql.planner.trident.rel.TridentRel;
-
-public class TestCompilerUtils {
-
-    public static CalciteState sqlOverDummyTable(String sql)
-        throws RelConversionException, ValidationException, SqlParseException {
-        SchemaPlus schema = Frameworks.createRootSchema(true);
-        JavaTypeFactory typeFactory = new JavaTypeFactoryImpl
-            (RelDataTypeSystem.DEFAULT);
-        StreamableTable streamableTable = new CompilerUtil.TableBuilderInfo(typeFactory)
-            .field("ID", SqlTypeName.INTEGER)
-            .field("NAME", typeFactory.createType(String.class))
-            .field("ADDR", typeFactory.createType(String.class))
-            .build();
-        Table table = streamableTable.stream();
-        schema.add("FOO", table);
-        schema.add("BAR", table);
-        schema.add("MYPLUS", ScalarFunctionImpl.create(MyPlus.class, "eval"));
-
-        QueryPlanner queryPlanner = new QueryPlanner(schema);
-        TridentRel tree = queryPlanner.getPlan(sql);
-        System.out.println(StormRelUtils.explain(tree, SqlExplainLevel.ALL_ATTRIBUTES));
-        return new CalciteState(schema, tree);
-    }
-
-    public static CalciteState sqlOverDummyGroupByTable(String sql)
-        throws RelConversionException, ValidationException, SqlParseException {
-        SchemaPlus schema = Frameworks.createRootSchema(true);
-        JavaTypeFactory typeFactory = new JavaTypeFactoryImpl
-            (RelDataTypeSystem.DEFAULT);
-        StreamableTable streamableTable = new CompilerUtil.TableBuilderInfo(typeFactory)
-            .field("ID", SqlTypeName.INTEGER)
-            .field("GRPID", SqlTypeName.INTEGER)
-            .field("NAME", typeFactory.createType(String.class))
-            .field("ADDR", typeFactory.createType(String.class))
-            .field("AGE", SqlTypeName.INTEGER)
-            .field("SCORE", SqlTypeName.INTEGER)
-            .build();
-        Table table = streamableTable.stream();
-        schema.add("FOO", table);
-        schema.add("BAR", table);
-        schema.add("MYSTATICSUM", AggregateFunctionImpl.create(MyStaticSumFunction.class));
-        schema.add("MYSUM", AggregateFunctionImpl.create(MySumFunction.class));
-
-        QueryPlanner queryPlanner = new QueryPlanner(schema);
-        TridentRel tree = queryPlanner.getPlan(sql);
-        System.out.println(StormRelUtils.explain(tree, SqlExplainLevel.ALL_ATTRIBUTES));
-        return new CalciteState(schema, tree);
-    }
-
-    public static CalciteState sqlOverNestedTable(String sql)
-        throws RelConversionException, ValidationException, SqlParseException {
-        SchemaPlus schema = Frameworks.createRootSchema(true);
-        JavaTypeFactory typeFactory = new JavaTypeFactoryImpl
-            (RelDataTypeSystem.DEFAULT);
-
-        StreamableTable streamableTable = new CompilerUtil.TableBuilderInfo(typeFactory)
-            .field("ID", SqlTypeName.INTEGER)
-            .field("MAPFIELD",
-                   typeFactory.createTypeWithNullability(
-                       typeFactory.createMapType(
-                           typeFactory.createTypeWithNullability(
-                               typeFactory.createSqlType(SqlTypeName.VARCHAR), true),
-                           typeFactory.createTypeWithNullability(
-                               typeFactory.createSqlType(SqlTypeName.INTEGER), true))
-                       , true))
-            .field("NESTEDMAPFIELD",
-                   typeFactory.createTypeWithNullability(
-                       typeFactory.createMapType(
-                           typeFactory.createTypeWithNullability(
-                               typeFactory.createSqlType(SqlTypeName.VARCHAR), true),
-                           typeFactory.createTypeWithNullability(
-                               typeFactory.createMapType(
-                                   typeFactory.createTypeWithNullability(
-                                       typeFactory.createSqlType(SqlTypeName.VARCHAR), true),
-                                   typeFactory.createTypeWithNullability(
-                                       typeFactory.createSqlType(SqlTypeName.INTEGER), true))
-                               , true))
-                       , true))
-            .field("ARRAYFIELD", typeFactory.createTypeWithNullability(
-                typeFactory.createArrayType(
-                    typeFactory.createTypeWithNullability(
-                        typeFactory.createSqlType(SqlTypeName.INTEGER), true), -1L)
-                , true))
-            .build();
-        Table table = streamableTable.stream();
-        schema.add("FOO", table);
-        schema.add("BAR", table);
-        schema.add("MYPLUS", ScalarFunctionImpl.create(MyPlus.class, "eval"));
-
-        QueryPlanner queryPlanner = new QueryPlanner(schema);
-        TridentRel tree = queryPlanner.getPlan(sql);
-        System.out.println(StormRelUtils.explain(tree, SqlExplainLevel.ALL_ATTRIBUTES));
-        return new CalciteState(schema, tree);
-    }
-
-    public static CalciteState sqlOverSimpleEquiJoinTables(String sql)
-        throws RelConversionException, ValidationException, SqlParseException {
-        SchemaPlus schema = Frameworks.createRootSchema(true);
-        JavaTypeFactory typeFactory = new JavaTypeFactoryImpl
-            (RelDataTypeSystem.DEFAULT);
-
-        StreamableTable streamableTable = new CompilerUtil.TableBuilderInfo(typeFactory)
-            .field("EMPID", SqlTypeName.INTEGER)
-            .field("EMPNAME", SqlTypeName.VARCHAR)
-            .field("DEPTID", SqlTypeName.INTEGER)
-            .build();
-        Table table = streamableTable.stream();
-
-        StreamableTable streamableTable2 = new CompilerUtil.TableBuilderInfo(typeFactory)
-            .field("DEPTID", SqlTypeName.INTEGER)
-            .field("DEPTNAME", SqlTypeName.VARCHAR)
-            .build();
-        Table table2 = streamableTable2.stream();
-
-        schema.add("EMP", table);
-        schema.add("DEPT", table2);
-
-        QueryPlanner queryPlanner = new QueryPlanner(schema);
-        TridentRel tree = queryPlanner.getPlan(sql);
-        System.out.println(StormRelUtils.explain(tree, SqlExplainLevel.ALL_ATTRIBUTES));
-        return new CalciteState(schema, tree);
-    }
-
-    public static class MyPlus {
-        public static Integer eval(Integer x, Integer y) {
-            return x + y;
-        }
-    }
-
-    public static class MyStaticSumFunction {
-        public static long init() {
-            return 0L;
-        }
-
-        public static long add(long accumulator, int v) {
-            return accumulator + v;
-        }
-    }
-
-    public static class MySumFunction {
-        public MySumFunction() {
-        }
-
-        public long init() {
-            return 0L;
-        }
-
-        public long add(long accumulator, int v) {
-            return accumulator + v;
-        }
-
-        public long result(long accumulator) {
-            return accumulator;
-        }
-    }
-
-    public static class CalciteState {
-        final SchemaPlus schema;
-        final RelNode tree;
-
-        private CalciteState(SchemaPlus schema, RelNode tree) {
-            this.schema = schema;
-            this.tree = tree;
-        }
-
-        public SchemaPlus schema() {
-            return schema;
-        }
-
-        public RelNode tree() {
-            return tree;
-        }
-    }
-}