You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2017/04/05 23:19:18 UTC

[15/23] storm git commit: STORM-2453 Move non-connectors into the top directory

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/TridentStormRuleSets.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/TridentStormRuleSets.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/TridentStormRuleSets.java
deleted file mode 100644
index e146069..0000000
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/TridentStormRuleSets.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one
- *  * or more contributor license agreements.  See the NOTICE file
- *  * distributed with this work for additional information
- *  * regarding copyright ownership.  The ASF licenses this file
- *  * to you under the Apache License, Version 2.0 (the
- *  * "License"); you may not use this file except in compliance
- *  * with the License.  You may obtain a copy of the License at
- *  *
- *  * http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-package org.apache.storm.sql.planner.trident;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import org.apache.calcite.plan.RelOptRule;
-import org.apache.calcite.rel.rules.CalcMergeRule;
-import org.apache.calcite.rel.rules.FilterCalcMergeRule;
-import org.apache.calcite.rel.rules.FilterProjectTransposeRule;
-import org.apache.calcite.rel.rules.FilterToCalcRule;
-import org.apache.calcite.rel.rules.ProjectCalcMergeRule;
-import org.apache.calcite.rel.rules.ProjectFilterTransposeRule;
-import org.apache.calcite.rel.rules.ProjectRemoveRule;
-import org.apache.calcite.rel.rules.ProjectToCalcRule;
-import org.apache.calcite.rel.rules.PruneEmptyRules;
-import org.apache.calcite.rel.rules.ReduceExpressionsRule;
-import org.apache.calcite.rel.rules.SortRemoveRule;
-import org.apache.calcite.rel.rules.UnionEliminatorRule;
-import org.apache.calcite.rel.stream.StreamRules;
-import org.apache.calcite.tools.RuleSet;
-import org.apache.storm.sql.planner.trident.rules.TridentCalcRule;
-import org.apache.storm.sql.planner.trident.rules.TridentFilterRule;
-import org.apache.storm.sql.planner.trident.rules.TridentScanRule;
-import org.apache.storm.sql.planner.trident.rules.TridentAggregateRule;
-import org.apache.storm.sql.planner.trident.rules.TridentJoinRule;
-import org.apache.storm.sql.planner.trident.rules.TridentModifyRule;
-import org.apache.storm.sql.planner.trident.rules.TridentProjectRule;
-
-import java.util.Iterator;
-
-public class TridentStormRuleSets {
-    private static final ImmutableSet<RelOptRule> calciteToStormConversionRules =
-            ImmutableSet.<RelOptRule>builder().add(
-                    SortRemoveRule.INSTANCE,
-
-                    FilterToCalcRule.INSTANCE,
-                    ProjectToCalcRule.INSTANCE,
-                    FilterCalcMergeRule.INSTANCE,
-                    ProjectCalcMergeRule.INSTANCE,
-                    CalcMergeRule.INSTANCE,
-
-                    PruneEmptyRules.FILTER_INSTANCE,
-                    PruneEmptyRules.PROJECT_INSTANCE,
-                    PruneEmptyRules.UNION_INSTANCE,
-
-                    ProjectFilterTransposeRule.INSTANCE,
-                    FilterProjectTransposeRule.INSTANCE,
-                    ProjectRemoveRule.INSTANCE,
-
-                    ReduceExpressionsRule.FILTER_INSTANCE,
-                    ReduceExpressionsRule.PROJECT_INSTANCE,
-                    ReduceExpressionsRule.CALC_INSTANCE,
-
-                    // merge and push unions rules
-                    UnionEliminatorRule.INSTANCE,
-
-                    TridentScanRule.INSTANCE,
-                    TridentFilterRule.INSTANCE,
-                    TridentProjectRule.INSTANCE,
-                    TridentAggregateRule.INSTANCE,
-                    TridentJoinRule.INSTANCE,
-                    TridentModifyRule.INSTANCE,
-                    TridentCalcRule.INSTANCE
-            ).build();
-
-    public static RuleSet[] getRuleSets() {
-        return new RuleSet[]{
-                new StormRuleSet(StreamRules.RULES),
-                new StormRuleSet(ImmutableSet.<RelOptRule>builder().addAll(StreamRules.RULES).addAll(calciteToStormConversionRules).build())
-        };
-    }
-
-    private static class StormRuleSet implements RuleSet {
-        final ImmutableSet<RelOptRule> rules;
-
-        public StormRuleSet(ImmutableSet<RelOptRule> rules) {
-            this.rules = rules;
-        }
-
-        public StormRuleSet(ImmutableList<RelOptRule> rules) {
-            this.rules = ImmutableSet.<RelOptRule>builder()
-                    .addAll(rules)
-                    .build();
-        }
-
-        @Override
-        public Iterator<RelOptRule> iterator() {
-            return rules.iterator();
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentCalcRel.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentCalcRel.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentCalcRel.java
deleted file mode 100644
index 482e841..0000000
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentCalcRel.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.planner.trident.rel;
-
-import com.google.common.collect.Lists;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.Calc;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rex.RexLocalRef;
-import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.rex.RexProgram;
-import org.apache.storm.sql.planner.StormRelUtils;
-import org.apache.storm.sql.planner.rel.StormCalcRelBase;
-import org.apache.storm.sql.planner.trident.TridentPlanCreator;
-import org.apache.storm.sql.runtime.calcite.ExecutableExpression;
-import org.apache.storm.sql.runtime.trident.functions.EvaluationCalc;
-import org.apache.storm.trident.Stream;
-import org.apache.storm.tuple.Fields;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public class TridentCalcRel extends StormCalcRelBase implements TridentRel {
-    public TridentCalcRel(RelOptCluster cluster, RelTraitSet traits, RelNode child, RexProgram program) {
-        super(cluster, traits, child, program);
-    }
-
-    @Override
-    public Calc copy(RelTraitSet traitSet, RelNode child, RexProgram program) {
-        return new TridentCalcRel(getCluster(), traitSet, child, program);
-    }
-
-    @Override
-    public void tridentPlan(TridentPlanCreator planCreator) throws Exception {
-        // SingleRel
-        RelNode input = getInput();
-        StormRelUtils.getStormRelInput(input).tridentPlan(planCreator);
-        Stream inputStream = planCreator.pop().toStream();
-
-        String stageName = StormRelUtils.getStageName(this);
-
-        RelDataType inputRowType = getInput(0).getRowType();
-
-        List<String> outputFieldNames = getRowType().getFieldNames();
-        int outputCount = outputFieldNames.size();
-
-        // filter
-        ExecutableExpression filterInstance = null;
-        RexLocalRef condition = program.getCondition();
-        if (condition != null) {
-            RexNode conditionNode = program.expandLocalRef(condition);
-            filterInstance = planCreator.createScalarInstance(Lists.newArrayList(conditionNode), inputRowType,
-                    StormRelUtils.getClassName(this));
-        }
-
-        // projection
-        ExecutableExpression projectionInstance = null;
-        List<RexLocalRef> projectList = program.getProjectList();
-        if (projectList != null && !projectList.isEmpty()) {
-            List<RexNode> expandedNodes = new ArrayList<>();
-            for (RexLocalRef project : projectList) {
-                expandedNodes.add(program.expandLocalRef(project));
-            }
-
-            projectionInstance = planCreator.createScalarInstance(expandedNodes, inputRowType,
-                    StormRelUtils.getClassName(this));
-        }
-
-        if (projectionInstance == null && filterInstance == null) {
-            // it shouldn't be happen
-            throw new IllegalStateException("Either projection or condition, or both should be provided.");
-        }
-
-        final Stream finalStream = inputStream
-                .flatMap(new EvaluationCalc(filterInstance, projectionInstance, outputCount, planCreator.getDataContext()), new Fields(outputFieldNames))
-                .name(stageName);
-
-        planCreator.addStream(finalStream);
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentFilterRel.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentFilterRel.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentFilterRel.java
deleted file mode 100644
index 1fe0927..0000000
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentFilterRel.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.planner.trident.rel;
-
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.Filter;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rex.RexNode;
-import org.apache.storm.sql.planner.StormRelUtils;
-import org.apache.storm.sql.planner.rel.StormFilterRelBase;
-import org.apache.storm.sql.planner.trident.TridentPlanCreator;
-import org.apache.storm.sql.runtime.calcite.ExecutableExpression;
-import org.apache.storm.sql.runtime.trident.functions.EvaluationFilter;
-import org.apache.storm.trident.Stream;
-import org.apache.storm.trident.fluent.IAggregatableStream;
-
-import java.util.List;
-
-public class TridentFilterRel extends StormFilterRelBase implements TridentRel {
-    public TridentFilterRel(RelOptCluster cluster, RelTraitSet traits, RelNode child, RexNode condition) {
-        super(cluster, traits, child, condition);
-    }
-
-    @Override
-    public Filter copy(RelTraitSet traitSet, RelNode input, RexNode condition) {
-        return new TridentFilterRel(getCluster(), traitSet, input, condition);
-    }
-
-    @Override
-    public void tridentPlan(TridentPlanCreator planCreator) throws Exception {
-        // SingleRel
-        RelNode input = getInput();
-        StormRelUtils.getStormRelInput(input).tridentPlan(planCreator);
-        Stream inputStream = planCreator.pop().toStream();
-
-        String stageName = StormRelUtils.getStageName(this);
-
-        List<RexNode> childExps = getChildExps();
-        RelDataType inputRowType = getInput(0).getRowType();
-
-        String filterClassName = StormRelUtils.getClassName(this);
-        ExecutableExpression filterInstance = planCreator.createScalarInstance(childExps, inputRowType, filterClassName);
-
-        IAggregatableStream finalStream = inputStream.filter(new EvaluationFilter(filterInstance, planCreator.getDataContext()))
-                .name(stageName);
-        planCreator.addStream(finalStream);
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentLogicalConvention.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentLogicalConvention.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentLogicalConvention.java
deleted file mode 100644
index d221498..0000000
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentLogicalConvention.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.planner.trident.rel;
-
-import org.apache.calcite.plan.Convention;
-import org.apache.calcite.plan.ConventionTraitDef;
-import org.apache.calcite.plan.RelOptPlanner;
-import org.apache.calcite.plan.RelTrait;
-import org.apache.calcite.plan.RelTraitDef;
-import org.apache.calcite.plan.RelTraitSet;
-
-public enum TridentLogicalConvention implements Convention {
-  INSTANCE;
-
-  @Override
-  public Class getInterface() {
-    return TridentRel.class;
-  }
-
-  @Override
-  public String getName() {
-    return "STORM_LOGICAL";
-  }
-
-  @Override
-  public RelTraitDef getTraitDef() {
-    return ConventionTraitDef.INSTANCE;
-  }
-
-  @Override
-  public boolean satisfies(RelTrait trait) {
-    return this == trait;
-  }
-
-  @Override
-  public void register(RelOptPlanner planner) {}
-
-  @Override
-  public String toString() {
-    return getName();
-  }
-
-  @Override
-  public boolean canConvertConvention(Convention toConvention) {
-    return false;
-  }
-
-  @Override
-  public boolean useAbstractConvertersForConversion(RelTraitSet fromTraits, RelTraitSet toTraits) {
-    return false;
-  }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentProjectRel.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentProjectRel.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentProjectRel.java
deleted file mode 100644
index 06be5d7..0000000
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentProjectRel.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.planner.trident.rel;
-
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.Project;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rex.RexNode;
-import org.apache.storm.sql.planner.StormRelUtils;
-import org.apache.storm.sql.planner.rel.StormProjectRelBase;
-import org.apache.storm.sql.planner.trident.TridentPlanCreator;
-import org.apache.storm.sql.runtime.calcite.ExecutableExpression;
-import org.apache.storm.sql.runtime.trident.functions.EvaluationFunction;
-import org.apache.storm.trident.Stream;
-import org.apache.storm.tuple.Fields;
-
-import java.util.List;
-
-public class TridentProjectRel extends StormProjectRelBase implements TridentRel {
-    public TridentProjectRel(RelOptCluster cluster, RelTraitSet traits, RelNode input, List<? extends RexNode> projects, RelDataType rowType) {
-        super(cluster, traits, input, projects, rowType);
-    }
-
-    @Override
-    public Project copy(RelTraitSet traitSet, RelNode input, List<RexNode> projects, RelDataType rowType) {
-        return new TridentProjectRel(getCluster(), traitSet, input, projects, rowType);
-    }
-
-    @Override
-    public void tridentPlan(TridentPlanCreator planCreator) throws Exception {
-        // SingleRel
-        RelNode input = getInput();
-        StormRelUtils.getStormRelInput(input).tridentPlan(planCreator);
-        Stream inputStream = planCreator.pop().toStream();
-
-        String stageName = StormRelUtils.getStageName(this);
-        String projectionClassName = StormRelUtils.getClassName(this);
-
-        List<String> outputFieldNames = getRowType().getFieldNames();
-        int outputCount = outputFieldNames.size();
-
-        List<RexNode> childExps = getChildExps();
-        RelDataType inputRowType = getInput(0).getRowType();
-
-        ExecutableExpression projectionInstance = planCreator.createScalarInstance(childExps, inputRowType, projectionClassName);
-        Stream finalStream = inputStream
-                .map(new EvaluationFunction(projectionInstance, outputCount, planCreator.getDataContext()), new Fields(outputFieldNames))
-                .name(stageName);
-
-        planCreator.addStream(finalStream);
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentRel.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentRel.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentRel.java
deleted file mode 100644
index fa92ec9..0000000
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentRel.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.planner.trident.rel;
-
-import org.apache.calcite.plan.Convention;
-import org.apache.storm.sql.planner.rel.StormRelNode;
-import org.apache.storm.sql.planner.trident.TridentPlanCreator;
-
-public interface TridentRel extends StormRelNode {
-    void tridentPlan(TridentPlanCreator planCreator) throws Exception;
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentStreamInsertRel.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentStreamInsertRel.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentStreamInsertRel.java
deleted file mode 100644
index e92c29b..0000000
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentStreamInsertRel.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.planner.trident.rel;
-
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelOptTable;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.prepare.Prepare;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rex.RexNode;
-import org.apache.storm.sql.planner.StormRelUtils;
-import org.apache.storm.sql.planner.rel.StormStreamInsertRelBase;
-import org.apache.storm.sql.planner.trident.TridentPlanCreator;
-import org.apache.storm.sql.runtime.ISqlTridentDataSource;
-import org.apache.storm.trident.Stream;
-import org.apache.storm.trident.fluent.IAggregatableStream;
-import org.apache.storm.tuple.Fields;
-
-import java.util.List;
-
-public class TridentStreamInsertRel extends StormStreamInsertRelBase implements TridentRel {
-    public TridentStreamInsertRel(RelOptCluster cluster, RelTraitSet traits, RelOptTable table, Prepare.CatalogReader catalogReader, RelNode child, Operation operation, List<String> updateColumnList, List<RexNode> sourceExpressionList, boolean flattened) {
-        super(cluster, traits, table, catalogReader, child, operation, updateColumnList, sourceExpressionList, flattened);
-    }
-
-    @Override
-    public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
-        return new TridentStreamInsertRel(getCluster(), traitSet, getTable(), getCatalogReader(),
-                sole(inputs), getOperation(), getUpdateColumnList(), getSourceExpressionList(), isFlattened());
-    }
-
-    @Override
-    public void tridentPlan(TridentPlanCreator planCreator) throws Exception {
-        // SingleRel
-        RelNode input = getInput();
-        StormRelUtils.getStormRelInput(input).tridentPlan(planCreator);
-        Stream inputStream = planCreator.pop().toStream();
-
-        String stageName = StormRelUtils.getStageName(this);
-
-        Preconditions.checkArgument(isInsert(), "Only INSERT statement is supported.");
-
-        List<String> inputFields = this.input.getRowType().getFieldNames();
-        List<String> outputFields = getRowType().getFieldNames();
-
-        // FIXME: this should be really different...
-        String tableName = Joiner.on('.').join(getTable().getQualifiedName());
-        ISqlTridentDataSource.SqlTridentConsumer consumer = planCreator.getSources().get(tableName).getConsumer();
-
-        // In fact this is normally the end of stream, but I'm still not sure so I open new streams based on State values
-        IAggregatableStream finalStream = inputStream
-                .partitionPersist(consumer.getStateFactory(), new Fields(inputFields), consumer.getStateUpdater(),
-                        new Fields(outputFields))
-                .newValuesStream().name(stageName);
-
-        planCreator.addStream(finalStream);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentStreamScanRel.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentStreamScanRel.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentStreamScanRel.java
deleted file mode 100644
index c563d73..0000000
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentStreamScanRel.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.planner.trident.rel;
-
-import com.google.common.base.Joiner;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelOptTable;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.storm.sql.planner.StormRelUtils;
-import org.apache.storm.sql.planner.rel.StormStreamScanRelBase;
-import org.apache.storm.sql.planner.trident.TridentPlanCreator;
-import org.apache.storm.sql.runtime.ISqlTridentDataSource;
-import org.apache.storm.trident.fluent.IAggregatableStream;
-
-import java.util.Map;
-
-public class TridentStreamScanRel extends StormStreamScanRelBase implements TridentRel {
-    private final int parallelismHint;
-
-    public TridentStreamScanRel(RelOptCluster cluster, RelTraitSet traitSet, RelOptTable table, int parallelismHint) {
-        super(cluster, traitSet, table);
-        this.parallelismHint = parallelismHint;
-    }
-
-    @Override
-    public void tridentPlan(TridentPlanCreator planCreator) throws Exception {
-        String sourceName = Joiner.on('.').join(getTable().getQualifiedName());
-
-        // FIXME: this should be really different...
-        Map<String, ISqlTridentDataSource> sources = planCreator.getSources();
-        if (!sources.containsKey(sourceName)) {
-            throw new RuntimeException("Cannot find table " + sourceName);
-        }
-
-        String stageName = StormRelUtils.getStageName(this);
-        IAggregatableStream finalStream = planCreator.getTopology().newStream(stageName, sources.get(sourceName).getProducer())
-                .parallelismHint(parallelismHint);
-        planCreator.addStream(finalStream);
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentAggregateRule.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentAggregateRule.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentAggregateRule.java
deleted file mode 100644
index ac35414..0000000
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentAggregateRule.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.sql.planner.trident.rules;
-
-import org.apache.calcite.plan.Convention;
-import org.apache.calcite.plan.RelOptRule;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.convert.ConverterRule;
-import org.apache.calcite.rel.logical.LogicalAggregate;
-import org.apache.storm.sql.planner.trident.rel.TridentLogicalConvention;
-
-public class TridentAggregateRule extends ConverterRule {
-    public static final RelOptRule INSTANCE = new TridentAggregateRule();
-
-    private TridentAggregateRule() {
-        super(LogicalAggregate.class, Convention.NONE, TridentLogicalConvention.INSTANCE, "TridentAggregateRule");
-    }
-
-    @Override
-    public RelNode convert(RelNode rel) {
-        throw new UnsupportedOperationException("Aggregate operation is not supported.");
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentCalcRule.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentCalcRule.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentCalcRule.java
deleted file mode 100644
index 25126ec..0000000
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentCalcRule.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.sql.planner.trident.rules;
-
-import org.apache.calcite.plan.Convention;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.convert.ConverterRule;
-import org.apache.calcite.rel.core.Calc;
-import org.apache.calcite.rel.logical.LogicalCalc;
-import org.apache.storm.sql.planner.trident.rel.TridentCalcRel;
-import org.apache.storm.sql.planner.trident.rel.TridentLogicalConvention;
-
-public class TridentCalcRule extends ConverterRule {
-  public static final TridentCalcRule INSTANCE = new TridentCalcRule();
-
-  private TridentCalcRule() {
-    super(LogicalCalc.class, Convention.NONE, TridentLogicalConvention.INSTANCE, "TridentCalcRule");
-  }
-
-  @Override
-  public RelNode convert(RelNode rel) {
-    final Calc calc = (Calc) rel;
-    final RelNode input = calc.getInput();
-
-    return new TridentCalcRel(calc.getCluster(), calc.getTraitSet().replace(TridentLogicalConvention.INSTANCE),
-            convert(input, input.getTraitSet().replace(TridentLogicalConvention.INSTANCE)),
-            calc.getProgram());
-  }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentFilterRule.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentFilterRule.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentFilterRule.java
deleted file mode 100644
index 7f9c41f..0000000
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentFilterRule.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.planner.trident.rules;
-
-import org.apache.calcite.plan.Convention;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.convert.ConverterRule;
-import org.apache.calcite.rel.core.Filter;
-import org.apache.calcite.rel.logical.LogicalFilter;
-import org.apache.storm.sql.planner.trident.rel.TridentFilterRel;
-import org.apache.storm.sql.planner.trident.rel.TridentLogicalConvention;
-
-public class TridentFilterRule extends ConverterRule {
-  public static TridentFilterRule INSTANCE = new TridentFilterRule();
-
-  private TridentFilterRule() {
-    super(LogicalFilter.class, Convention.NONE, TridentLogicalConvention.INSTANCE, "TridentFilterRule");
-  }
-
-  @Override
-  public RelNode convert(RelNode rel) {
-    final Filter filter = (Filter) rel;
-    final RelNode input = filter.getInput();
-
-    return new TridentFilterRel(filter.getCluster(),
-        filter.getTraitSet().replace(TridentLogicalConvention.INSTANCE),
-        convert(input, input.getTraitSet().replace(TridentLogicalConvention.INSTANCE)),
-        filter.getCondition());
-  }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentJoinRule.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentJoinRule.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentJoinRule.java
deleted file mode 100644
index 90f5083..0000000
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentJoinRule.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.planner.trident.rules;
-
-import org.apache.calcite.plan.Convention;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.convert.ConverterRule;
-import org.apache.calcite.rel.logical.LogicalJoin;
-import org.apache.storm.sql.planner.trident.rel.TridentLogicalConvention;
-
-public class TridentJoinRule extends ConverterRule {
-  public static final TridentJoinRule INSTANCE = new TridentJoinRule();
-
-  private TridentJoinRule() {
-    super(LogicalJoin.class, Convention.NONE, TridentLogicalConvention.INSTANCE, "TridentJoinRule");
-  }
-
-  @Override
-  public RelNode convert(RelNode rel) {
-    throw new UnsupportedOperationException("Join operation is not supported.");
-  }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentModifyRule.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentModifyRule.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentModifyRule.java
deleted file mode 100644
index 2155451..0000000
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentModifyRule.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.planner.trident.rules;
-
-import org.apache.calcite.plan.Convention;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelOptTable;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.prepare.Prepare;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.convert.ConverterRule;
-import org.apache.calcite.rel.core.TableModify;
-import org.apache.calcite.rel.logical.LogicalTableModify;
-import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.schema.Table;
-import org.apache.storm.sql.planner.trident.rel.TridentLogicalConvention;
-import org.apache.storm.sql.planner.trident.rel.TridentStreamInsertRel;
-
-import java.util.List;
-
-public class TridentModifyRule extends ConverterRule {
-  public static final TridentModifyRule INSTANCE = new TridentModifyRule();
-
-  private TridentModifyRule() {
-    super(LogicalTableModify.class, Convention.NONE, TridentLogicalConvention.INSTANCE, "TridentModifyRule");
-  }
-
-  @Override
-  public RelNode convert(RelNode rel) {
-    final TableModify tableModify = (TableModify) rel;
-    final RelNode input = tableModify.getInput();
-
-    final RelOptCluster cluster = tableModify.getCluster();
-    final RelTraitSet traitSet = tableModify.getTraitSet().replace(TridentLogicalConvention.INSTANCE);
-    final RelOptTable relOptTable = tableModify.getTable();
-    final Prepare.CatalogReader catalogReader = tableModify.getCatalogReader();
-    final RelNode convertedInput = convert(input, input.getTraitSet().replace(TridentLogicalConvention.INSTANCE));
-    final TableModify.Operation operation = tableModify.getOperation();
-    final List<String> updateColumnList = tableModify.getUpdateColumnList();
-    final List<RexNode> sourceExpressionList = tableModify.getSourceExpressionList();
-    final boolean flattened = tableModify.isFlattened();
-
-    final Table table = tableModify.getTable().unwrap(Table.class);
-
-    switch (table.getJdbcTableType()) {
-      case STREAM:
-        if (operation != TableModify.Operation.INSERT) {
-          throw new UnsupportedOperationException(String.format("Streams doesn't support %s modify operation", operation));
-        }
-        return new TridentStreamInsertRel(cluster, traitSet, relOptTable, catalogReader, convertedInput, operation,
-            updateColumnList, sourceExpressionList, flattened);
-      default:
-        throw new IllegalArgumentException(String.format("Unsupported table type: %s", table.getJdbcTableType()));
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentProjectRule.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentProjectRule.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentProjectRule.java
deleted file mode 100644
index 2922725..0000000
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentProjectRule.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.planner.trident.rules;
-
-import org.apache.calcite.plan.Convention;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.convert.ConverterRule;
-import org.apache.calcite.rel.core.Project;
-import org.apache.calcite.rel.logical.LogicalProject;
-import org.apache.storm.sql.planner.trident.rel.TridentLogicalConvention;
-import org.apache.storm.sql.planner.trident.rel.TridentProjectRel;
-
-public class TridentProjectRule extends ConverterRule {
-  public static final TridentProjectRule INSTANCE = new TridentProjectRule();
-
-  private TridentProjectRule() {
-    super(LogicalProject.class, Convention.NONE, TridentLogicalConvention.INSTANCE,
-        "TridentProjectRule");
-  }
-
-  @Override
-  public RelNode convert(RelNode rel) {
-    final Project project = (Project) rel;
-    final RelNode input = project.getInput();
-
-    return new TridentProjectRel(project.getCluster(),
-        project.getTraitSet().replace(TridentLogicalConvention.INSTANCE),
-        convert(input, input.getTraitSet().replace(TridentLogicalConvention.INSTANCE)), project.getProjects(), project.getRowType());
-  }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentScanRule.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentScanRule.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentScanRule.java
deleted file mode 100644
index abbd680..0000000
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentScanRule.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.planner.trident.rules;
-
-import org.apache.calcite.adapter.enumerable.EnumerableConvention;
-import org.apache.calcite.adapter.enumerable.EnumerableTableScan;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.convert.ConverterRule;
-import org.apache.calcite.rel.core.TableScan;
-import org.apache.calcite.schema.Table;
-import org.apache.storm.sql.calcite.ParallelStreamableTable;
-import org.apache.storm.sql.planner.trident.rel.TridentLogicalConvention;
-import org.apache.storm.sql.planner.trident.rel.TridentStreamScanRel;
-
-public class TridentScanRule extends ConverterRule {
-  public static final TridentScanRule INSTANCE = new TridentScanRule();
-  public static final int DEFAULT_PARALLELISM_HINT = 1;
-
-  private TridentScanRule() {
-    super(EnumerableTableScan.class, EnumerableConvention.INSTANCE, TridentLogicalConvention.INSTANCE, "TridentScanRule");
-  }
-
-  @Override
-  public RelNode convert(RelNode rel) {
-    final TableScan scan = (TableScan) rel;
-    int parallelismHint = DEFAULT_PARALLELISM_HINT;
-
-    final ParallelStreamableTable parallelTable = scan.getTable().unwrap(ParallelStreamableTable.class);
-    if (parallelTable != null && parallelTable.parallelismHint() != null) {
-      parallelismHint = parallelTable.parallelismHint();
-    }
-
-    final Table table = scan.getTable().unwrap(Table.class);
-    switch (table.getJdbcTableType()) {
-      case STREAM:
-        return new TridentStreamScanRel(scan.getCluster(),
-            scan.getTraitSet().replace(TridentLogicalConvention.INSTANCE),
-            scan.getTable(), parallelismHint);
-      default:
-        throw new IllegalArgumentException(String.format("Unsupported table type: %s", table.getJdbcTableType()));
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java
deleted file mode 100644
index 82dc184..0000000
--- a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java
+++ /dev/null
@@ -1,492 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql;
-
-import com.google.common.collect.ImmutableMap;
-import org.apache.calcite.tools.ValidationException;
-import org.apache.storm.sql.javac.CompilingClassLoader;
-import org.apache.storm.sql.runtime.ChannelHandler;
-import org.apache.storm.sql.runtime.DataSource;
-import org.apache.storm.sql.runtime.DataSourcesProvider;
-import org.apache.storm.sql.runtime.DataSourcesRegistry;
-import org.apache.storm.sql.runtime.FieldInfo;
-import org.apache.storm.sql.runtime.ISqlTridentDataSource;
-import org.apache.storm.tuple.Values;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-public class TestStormSql {
-  private static class MockDataSourceProvider implements DataSourcesProvider {
-    @Override
-    public String scheme() {
-      return "mock";
-    }
-
-    @Override
-    public DataSource construct(
-        URI uri, String inputFormatClass, String outputFormatClass,
-        List<FieldInfo> fields) {
-      return new TestUtils.MockDataSource();
-    }
-
-    @Override
-    public ISqlTridentDataSource constructTrident(URI uri, String inputFormatClass, String outputFormatClass,
-                                                  Properties properties, List<FieldInfo> fields) {
-      return new TestUtils.MockSqlTridentDataSource();
-    }
-  }
-
-  private static class MockNestedDataSourceProvider implements DataSourcesProvider {
-    @Override
-    public String scheme() {
-      return "mocknested";
-    }
-
-    @Override
-    public DataSource construct(
-            URI uri, String inputFormatClass, String outputFormatClass,
-            List<FieldInfo> fields) {
-      return new TestUtils.MockNestedDataSource();
-    }
-
-    @Override
-    public ISqlTridentDataSource constructTrident(URI uri, String inputFormatClass, String outputFormatClass,
-                                                  Properties properties, List<FieldInfo> fields) {
-      return new TestUtils.MockSqlTridentDataSource();
-    }
-  }
-
-  private static class MockGroupDataSourceProvider implements DataSourcesProvider {
-    @Override
-    public String scheme() {
-      return "mockgroup";
-    }
-
-    @Override
-    public DataSource construct(
-            URI uri, String inputFormatClass, String outputFormatClass,
-            List<FieldInfo> fields) {
-      return new TestUtils.MockGroupDataSource();
-    }
-
-    @Override
-    public ISqlTridentDataSource constructTrident(URI uri, String inputFormatClass, String outputFormatClass,
-                                                  Properties properties, List<FieldInfo> fields) {
-      return new TestUtils.MockSqlTridentGroupedDataSource();
-    }
-  }
-
-  private static class MockEmpDataSourceProvider implements DataSourcesProvider {
-    @Override
-    public String scheme() {
-      return "mockemp";
-    }
-
-    @Override
-    public DataSource construct(
-            URI uri, String inputFormatClass, String outputFormatClass,
-            List<FieldInfo> fields) {
-      return new TestUtils.MockEmpDataSource();
-    }
-
-    @Override
-    public ISqlTridentDataSource constructTrident(URI uri, String inputFormatClass, String outputFormatClass,
-                                                  Properties properties, List<FieldInfo> fields) {
-      return new TestUtils.MockSqlTridentJoinDataSourceEmp();
-    }
-  }
-
-  private static class MockDeptDataSourceProvider implements DataSourcesProvider {
-    @Override
-    public String scheme() {
-      return "mockdept";
-    }
-
-    @Override
-    public DataSource construct(
-            URI uri, String inputFormatClass, String outputFormatClass,
-            List<FieldInfo> fields) {
-      return new TestUtils.MockDeptDataSource();
-    }
-
-    @Override
-    public ISqlTridentDataSource constructTrident(URI uri, String inputFormatClass, String outputFormatClass,
-                                                  Properties properties, List<FieldInfo> fields) {
-      return new TestUtils.MockSqlTridentJoinDataSourceDept();
-    }
-  }
-
-
-  @BeforeClass
-  public static void setUp() {
-    DataSourcesRegistry.providerMap().put("mock", new MockDataSourceProvider());
-    DataSourcesRegistry.providerMap().put("mocknested", new MockNestedDataSourceProvider());
-    DataSourcesRegistry.providerMap().put("mockgroup", new MockGroupDataSourceProvider());
-    DataSourcesRegistry.providerMap().put("mockemp", new MockEmpDataSourceProvider());
-    DataSourcesRegistry.providerMap().put("mockdept", new MockDeptDataSourceProvider());
-  }
-
-  @AfterClass
-  public static void tearDown() {
-    DataSourcesRegistry.providerMap().remove("mock");
-    DataSourcesRegistry.providerMap().remove("mocknested");
-  }
-
-  @Test
-  public void testExternalDataSource() throws Exception {
-    List<String> stmt = new ArrayList<>();
-    stmt.add("CREATE EXTERNAL TABLE FOO (ID INT) LOCATION 'mock:///foo'");
-    stmt.add("SELECT STREAM ID + 1 FROM FOO WHERE ID > 2");
-    StormSql sql = StormSql.construct();
-    List<Values> values = new ArrayList<>();
-    ChannelHandler h = new TestUtils.CollectDataChannelHandler(values);
-    sql.execute(stmt, h);
-    Assert.assertEquals(2, values.size());
-    Assert.assertEquals(4, values.get(0).get(0));
-    Assert.assertEquals(5, values.get(1).get(0));
-  }
-
-  @Test
-  public void testExternalDataSourceNested() throws Exception {
-    List<String> stmt = new ArrayList<>();
-    stmt.add("CREATE EXTERNAL TABLE FOO (ID INT, MAPFIELD ANY, NESTEDMAPFIELD ANY, ARRAYFIELD ANY) LOCATION 'mocknested:///foo'");
-    stmt.add("SELECT STREAM ID, MAPFIELD['c'], NESTEDMAPFIELD, ARRAYFIELD " +
-                     "FROM FOO " +
-                     "WHERE CAST(MAPFIELD['b'] AS INTEGER) = 2 AND CAST(ARRAYFIELD[2] AS INTEGER) = 200");
-    StormSql sql = StormSql.construct();
-    List<Values> values = new ArrayList<>();
-    ChannelHandler h = new TestUtils.CollectDataChannelHandler(values);
-    sql.execute(stmt, h);
-    System.out.println(values);
-    Map<String, Integer> map = ImmutableMap.of("b", 2, "c", 4);
-    Map<String, Map<String, Integer>> nestedMap = ImmutableMap.of("a", map);
-    Assert.assertEquals(new Values(2, 4, nestedMap, Arrays.asList(100, 200, 300)), values.get(0));
-  }
-
-  @Test
-  public void testExternalNestedNonExistKeyAccess() throws Exception {
-    List<String> stmt = new ArrayList<>();
-    // this triggers java.lang.RuntimeException: Cannot convert null to int
-    stmt.add("CREATE EXTERNAL TABLE FOO (ID INT, MAPFIELD ANY, NESTEDMAPFIELD ANY, ARRAYFIELD ANY) LOCATION 'mocknested:///foo'");
-    stmt.add("SELECT STREAM ID, MAPFIELD, NESTEDMAPFIELD, ARRAYFIELD " +
-             "FROM FOO " +
-             "WHERE CAST(MAPFIELD['a'] AS INTEGER) = 2");
-    StormSql sql = StormSql.construct();
-    List<Values> values = new ArrayList<>();
-    ChannelHandler h = new TestUtils.CollectDataChannelHandler(values);
-    sql.execute(stmt, h);
-    Assert.assertEquals(0, values.size());
-  }
-
-  @Test
-  public void testExternalNestedNonExistKeyAccess2() throws Exception {
-    List<String> stmt = new ArrayList<>();
-    // this triggers java.lang.RuntimeException: Cannot convert null to int
-    stmt.add("CREATE EXTERNAL TABLE FOO (ID INT, MAPFIELD ANY, NESTEDMAPFIELD ANY, ARRAYFIELD ANY) LOCATION 'mocknested:///foo'");
-    stmt.add("SELECT STREAM ID, MAPFIELD, NESTEDMAPFIELD, ARRAYFIELD " +
-             "FROM FOO " +
-             "WHERE CAST(NESTEDMAPFIELD['b']['c'] AS INTEGER) = 4");
-    StormSql sql = StormSql.construct();
-    List<Values> values = new ArrayList<>();
-    ChannelHandler h = new TestUtils.CollectDataChannelHandler(values);
-    sql.execute(stmt, h);
-    Assert.assertEquals(0, values.size());
-  }
-
-  @Test
-  public void testExternalNestedInvalidAccessStringIndexOnArray() throws Exception {
-    List<String> stmt = new ArrayList<>();
-    stmt.add("CREATE EXTERNAL TABLE FOO (ID INT, MAPFIELD ANY, NESTEDMAPFIELD ANY, ARRAYFIELD ANY) LOCATION 'mocknested:///foo'");
-    stmt.add("SELECT STREAM ID, MAPFIELD, NESTEDMAPFIELD, ARRAYFIELD " +
-             "FROM FOO " +
-             "WHERE CAST(ARRAYFIELD['a'] AS INTEGER) = 200");
-    StormSql sql = StormSql.construct();
-    List<Values> values = new ArrayList<>();
-    ChannelHandler h = new TestUtils.CollectDataChannelHandler(values);
-    sql.execute(stmt, h);
-    Assert.assertEquals(0, values.size());
-  }
-
-  @Test
-  public void testExternalNestedArrayOutOfBoundAccess() throws Exception {
-    List<String> stmt = new ArrayList<>();
-    stmt.add("CREATE EXTERNAL TABLE FOO (ID INT, MAPFIELD ANY, NESTEDMAPFIELD ANY, ARRAYFIELD ANY) LOCATION 'mocknested:///foo'");
-    stmt.add("SELECT STREAM ID, MAPFIELD, NESTEDMAPFIELD, ARRAYFIELD " +
-             "FROM FOO " +
-             "WHERE CAST(ARRAYFIELD[10] AS INTEGER) = 200");
-    StormSql sql = StormSql.construct();
-    List<Values> values = new ArrayList<>();
-    ChannelHandler h = new TestUtils.CollectDataChannelHandler(values);
-    sql.execute(stmt, h);
-    Assert.assertEquals(0, values.size());
-  }
-
-  @Test(expected = ValidationException.class)
-  public void testExternalUdfType() throws Exception {
-    List<String> stmt = new ArrayList<>();
-    stmt.add("CREATE EXTERNAL TABLE FOO (ID INT, NAME VARCHAR) LOCATION 'mock:///foo'");
-    stmt.add("CREATE FUNCTION MYPLUS AS 'org.apache.storm.sql.TestUtils$MyPlus'");
-    stmt.add("SELECT STREAM MYPLUS(NAME, 1) FROM FOO WHERE ID = 0");
-    StormSql sql = StormSql.construct();
-    List<Values> values = new ArrayList<>();
-    ChannelHandler h = new TestUtils.CollectDataChannelHandler(values);
-    sql.execute(stmt, h);
-    System.out.println(values);
-
-  }
-
-  @Test(expected = CompilingClassLoader.CompilerException.class)
-  public void testExternalUdfType2() throws Exception {
-    List<String> stmt = new ArrayList<>();
-    // generated code will be not compilable since return type of MYPLUS and type of 'x' are different
-    stmt.add("CREATE EXTERNAL TABLE FOO (ID INT, NAME VARCHAR) LOCATION 'mock:///foo'");
-    stmt.add("CREATE FUNCTION MYPLUS AS 'org.apache.storm.sql.TestUtils$MyPlus'");
-    stmt.add("SELECT STREAM ID FROM FOO WHERE MYPLUS(ID, 1) = 'x'");
-    StormSql sql = StormSql.construct();
-    List<Values> values = new ArrayList<>();
-    ChannelHandler h = new TestUtils.CollectDataChannelHandler(values);
-    sql.execute(stmt, h);
-    Assert.assertEquals(0, values.size());
-  }
-
-  @Test
-  public void testExternalUdf() throws Exception {
-    List<String> stmt = new ArrayList<>();
-    stmt.add("CREATE EXTERNAL TABLE FOO (ID INT) LOCATION 'mock:///foo'");
-    stmt.add("CREATE FUNCTION MYPLUS AS 'org.apache.storm.sql.TestUtils$MyPlus'");
-    stmt.add("SELECT STREAM MYPLUS(ID, 1) FROM FOO WHERE ID > 2");
-    StormSql sql = StormSql.construct();
-    List<Values> values = new ArrayList<>();
-    ChannelHandler h = new TestUtils.CollectDataChannelHandler(values);
-    sql.execute(stmt, h);
-    Assert.assertEquals(2, values.size());
-    Assert.assertEquals(4, values.get(0).get(0));
-    Assert.assertEquals(5, values.get(1).get(0));
-  }
-
-  @Test(expected = UnsupportedOperationException.class)
-  public void testExternalUdfUsingJar() throws Exception {
-    List<String> stmt = new ArrayList<>();
-    stmt.add("CREATE EXTERNAL TABLE FOO (ID INT) LOCATION 'mock:///foo'");
-    stmt.add("CREATE FUNCTION MYPLUS AS 'org.apache.storm.sql.TestUtils$MyPlus' USING JAR 'foo'");
-    stmt.add("SELECT STREAM MYPLUS(ID, 1) FROM FOO WHERE ID > 2");
-    StormSql sql = StormSql.construct();
-    List<Values> values = new ArrayList<>();
-    ChannelHandler h = new TestUtils.CollectDataChannelHandler(values);
-    sql.execute(stmt, h);
-  }
-
-  @Test
-  public void testGroupbyBuiltin() throws Exception {
-    List<String> stmt = new ArrayList<>();
-    stmt.add("CREATE EXTERNAL TABLE FOO (ID INT PRIMARY KEY, SALARY INT, PCT DOUBLE, NAME VARCHAR) LOCATION 'mockgroup:///foo'");
-    stmt.add("SELECT STREAM ID, COUNT(*), SUM(SALARY), AVG(SALARY) FROM FOO GROUP BY (ID)");
-    StormSql sql = StormSql.construct();
-    List<Values> values = new ArrayList<>();
-    ChannelHandler h = new TestUtils.CollectDataChannelHandler(values);
-    sql.execute(stmt, h);
-    Assert.assertEquals(4, values.size());
-    Assert.assertEquals(3, values.get(0).get(2));
-    Assert.assertEquals(12, values.get(1).get(2));
-    Assert.assertEquals(21, values.get(2).get(2));
-    Assert.assertEquals(9, values.get(3).get(2));
-  }
-
-  @Test
-  public void testGroupbyBuiltinWithFilter() throws Exception {
-    List<String> stmt = new ArrayList<>();
-    stmt.add("CREATE EXTERNAL TABLE FOO (ID INT PRIMARY KEY, SALARY INT, PCT DOUBLE, NAME VARCHAR) LOCATION 'mockgroup:///foo'");
-    stmt.add("SELECT STREAM ID, COUNT(*), SUM(SALARY), AVG(PCT) FROM FOO WHERE ID = 1 GROUP BY (ID)");
-    StormSql sql = StormSql.construct();
-    List<Values> values = new ArrayList<>();
-    ChannelHandler h = new TestUtils.CollectDataChannelHandler(values);
-    sql.execute(stmt, h);
-    Assert.assertEquals(1, values.size());
-    Assert.assertEquals(1, values.get(0).get(0));
-    Assert.assertEquals(3L, values.get(0).get(1));
-    Assert.assertEquals(12, values.get(0).get(2));
-    Assert.assertEquals(2.5, values.get(0).get(3));
-  }
-
-  @Test
-  public void testGroupbyBuiltinAndUDF() throws Exception {
-    List<String> stmt = new ArrayList<>();
-    stmt.add("CREATE EXTERNAL TABLE FOO (ID INT PRIMARY KEY, SALARY INT, PCT DOUBLE, NAME VARCHAR) LOCATION 'mockgroup:///foo'");
-    stmt.add("CREATE FUNCTION MYCONCAT AS 'org.apache.storm.sql.TestUtils$MyConcat'");
-    stmt.add("CREATE FUNCTION TOPN AS 'org.apache.storm.sql.TestUtils$TopN'");
-    stmt.add("SELECT STREAM ID, SUM(SALARY), MYCONCAT(NAME), TOPN(2, SALARY) FROM FOO GROUP BY (ID)");
-    StormSql sql = StormSql.construct();
-    List<Values> values = new ArrayList<>();
-    ChannelHandler h = new TestUtils.CollectDataChannelHandler(values);
-    sql.execute(stmt, h);
-    Assert.assertEquals(4, values.size());
-    Assert.assertEquals(3, values.get(0).get(1));
-    Assert.assertEquals("xxx", values.get(0).get(2));
-    Assert.assertEquals(Arrays.asList(2, 1), values.get(0).get(3));
-    Assert.assertEquals(12, values.get(1).get(1));
-    Assert.assertEquals("xxx", values.get(1).get(2));
-    Assert.assertEquals(Arrays.asList(5, 4), values.get(1).get(3));
-    Assert.assertEquals(21, values.get(2).get(1));
-    Assert.assertEquals("xxx", values.get(2).get(2));
-    Assert.assertEquals(Arrays.asList(8, 7), values.get(2).get(3));
-    Assert.assertEquals(9, values.get(3).get(1));
-    Assert.assertEquals("x", values.get(3).get(2));
-    Assert.assertEquals(Arrays.asList(9), values.get(3).get(3));
-  }
-
-  @Test
-  public void testAggFnNonSqlReturnType() throws Exception {
-    List<String> stmt = new ArrayList<>();
-    stmt.add("CREATE EXTERNAL TABLE FOO (ID INT PRIMARY KEY, SALARY INT, PCT DOUBLE, NAME VARCHAR) LOCATION 'mockgroup:///foo'");
-    stmt.add("CREATE FUNCTION TOPN AS 'org.apache.storm.sql.TestUtils$TopN'");
-    stmt.add("SELECT STREAM ID, SUM(SALARY), TOPN(1, SALARY) FROM FOO WHERE ID >= 0 GROUP BY (ID) HAVING MAX(SALARY) > 0");
-    StormSql sql = StormSql.construct();
-    List<Values> values = new ArrayList<>();
-    ChannelHandler h = new TestUtils.CollectDataChannelHandler(values);
-    sql.execute(stmt, h);
-    Assert.assertEquals(4, values.size());
-    Assert.assertEquals(Collections.singletonList(2), values.get(0).get(2));
-    Assert.assertEquals(Collections.singletonList(5), values.get(1).get(2));
-    Assert.assertEquals(Collections.singletonList(8), values.get(2).get(2));
-    Assert.assertEquals(Collections.singletonList(9), values.get(3).get(2));
-  }
-
-  @Test
-  public void testGroupbySameAggregateOnDifferentColumns() throws Exception {
-    List<String> stmt = new ArrayList<>();
-    stmt.add("CREATE EXTERNAL TABLE FOO (ID INT PRIMARY KEY, SALARY INT, PCT DOUBLE, NAME VARCHAR) LOCATION 'mockgroup:///foo'");
-    stmt.add("SELECT STREAM ID, COUNT(*), AVG(SALARY), AVG(PCT) FROM FOO WHERE ID = 1 GROUP BY (ID)");
-    StormSql sql = StormSql.construct();
-    List<Values> values = new ArrayList<>();
-    ChannelHandler h = new TestUtils.CollectDataChannelHandler(values);
-    sql.execute(stmt, h);
-    Assert.assertEquals(1, values.size());
-    Assert.assertEquals(1, values.get(0).get(0));
-    Assert.assertEquals(3L, values.get(0).get(1));
-    Assert.assertEquals(4, values.get(0).get(2));
-    Assert.assertEquals(2.5, values.get(0).get(3));
-  }
-
-  @Test(expected = UnsupportedOperationException.class)
-  public void testGroupbyBuiltinNotimplemented() throws Exception {
-    List<String> stmt = new ArrayList<>();
-    stmt.add("CREATE EXTERNAL TABLE FOO (ID INT PRIMARY KEY, SALARY INT, PCT DOUBLE, NAME VARCHAR) LOCATION 'mockgroup:///foo'");
-    stmt.add("SELECT STREAM ID, COUNT(*), STDDEV_POP(SALARY) FROM FOO GROUP BY (ID)");
-    StormSql sql = StormSql.construct();
-    List<Values> values = new ArrayList<>();
-    ChannelHandler h = new TestUtils.CollectDataChannelHandler(values);
-    sql.execute(stmt, h);
-  }
-
-  @Test
-  public void testMinMax() throws Exception {
-    List<String> stmt = new ArrayList<>();
-    stmt.add("CREATE EXTERNAL TABLE FOO (ID INT PRIMARY KEY, SALARY INT, PCT DOUBLE, NAME VARCHAR) LOCATION 'mockgroup:///foo'");
-    stmt.add("SELECT STREAM ID, COUNT(*), MIN(SALARY), MAX(PCT) FROM FOO GROUP BY (ID)");
-    StormSql sql = StormSql.construct();
-    List<Values> values = new ArrayList<>();
-    ChannelHandler h = new TestUtils.CollectDataChannelHandler(values);
-    sql.execute(stmt, h);
-    Assert.assertEquals(4, values.size());
-    Assert.assertEquals(0, values.get(0).get(2));
-    Assert.assertEquals(3, values.get(1).get(2));
-    Assert.assertEquals(6, values.get(2).get(2));
-    Assert.assertEquals(9, values.get(3).get(2));
-
-    Assert.assertEquals(1.5, values.get(0).get(3));
-    Assert.assertEquals(3.0, values.get(1).get(3));
-    Assert.assertEquals(4.5, values.get(2).get(3));
-    Assert.assertEquals(5.0, values.get(3).get(3));
-  }
-  @Test
-  public void testFilterGroupbyHaving() throws Exception {
-    List<String> stmt = new ArrayList<>();
-    stmt.add("CREATE EXTERNAL TABLE FOO (ID INT PRIMARY KEY, SALARY INT, PCT DOUBLE, NAME VARCHAR) LOCATION 'mockgroup:///foo'");
-    stmt.add("SELECT STREAM ID, MIN(SALARY) FROM FOO where ID > 0 GROUP BY (ID) HAVING ID > 2 AND MAX(SALARY) > 5");
-    StormSql sql = StormSql.construct();
-    List<Values> values = new ArrayList<>();
-    ChannelHandler h = new TestUtils.CollectDataChannelHandler(values);
-    sql.execute(stmt, h);
-    Assert.assertEquals(1, values.size());
-    Assert.assertEquals(3, values.get(0).get(0));
-    Assert.assertEquals(9, values.get(0).get(1));
-  }
-
-  @Test
-  public void testGroupByMultipleFields() throws Exception {
-    List<String> stmt = new ArrayList<>();
-    stmt.add("CREATE EXTERNAL TABLE FOO (DEPTID INT PRIMARY KEY, SALARY INT, PCT DOUBLE, NAME VARCHAR, EMPID INT) LOCATION 'mockgroup:///foo'");
-    stmt.add("SELECT STREAM DEPTID, EMPID, COUNT(*), MIN(SALARY), MAX(PCT) FROM FOO GROUP BY DEPTID, EMPID");
-    StormSql sql = StormSql.construct();
-    List<Values> values = new ArrayList<>();
-    ChannelHandler h = new TestUtils.CollectDataChannelHandler(values);
-    sql.execute(stmt, h);
-    Assert.assertEquals(7, values.size());
-    Assert.assertEquals(0, values.get(0).get(0));
-    Assert.assertEquals(0, values.get(0).get(1));
-    Assert.assertEquals(2L, values.get(0).get(2));
-  }
-
-  @Test
-  public void testjoin() throws Exception {
-    List<String> stmt = new ArrayList<>();
-    stmt.add("CREATE EXTERNAL TABLE EMP (EMPID INT PRIMARY KEY, EMPNAME VARCHAR, DEPTID INT) LOCATION 'mockemp:///foo'");
-    stmt.add("CREATE EXTERNAL TABLE DEPT (DEPTID INT PRIMARY KEY, DEPTNAME VARCHAR) LOCATION 'mockdept:///foo'");
-    stmt.add("SELECT STREAM EMPID, EMPNAME, DEPTNAME FROM EMP AS e JOIN DEPT AS d ON e.DEPTID = d.DEPTID WHERE e.empid > 0");
-    StormSql sql = StormSql.construct();
-    List<Values> values = new ArrayList<>();
-    ChannelHandler h = new TestUtils.CollectDataChannelHandler(values);
-    sql.execute(stmt, h);
-    System.out.println(values);
-    Assert.assertEquals(3, values.size());
-    Assert.assertEquals("emp1", values.get(0).get(1));
-    Assert.assertEquals("dept1", values.get(0).get(2));
-    Assert.assertEquals("emp2", values.get(1).get(1));
-    Assert.assertEquals("dept1", values.get(1).get(2));
-    Assert.assertEquals("emp3", values.get(2).get(1));
-    Assert.assertEquals("dept2", values.get(2).get(2));
-  }
-
-  @Test
-  public void testjoinAndGroupby() throws Exception {
-    List<String> stmt = new ArrayList<>();
-    stmt.add("CREATE EXTERNAL TABLE EMP (EMPID INT PRIMARY KEY, EMPNAME VARCHAR, DEPTID INT) LOCATION 'mockemp:///foo'");
-    stmt.add("CREATE EXTERNAL TABLE DEPT (DEPTID INT PRIMARY KEY, DEPTNAME VARCHAR) LOCATION 'mockdept:///foo'");
-    stmt.add("SELECT STREAM d.DEPTID, count(EMPID) FROM EMP AS e JOIN DEPT AS d ON e.DEPTID = d.DEPTID WHERE e.empid > 0" +
-                     "GROUP BY d.DEPTID");
-    StormSql sql = StormSql.construct();
-    List<Values> values = new ArrayList<>();
-    ChannelHandler h = new TestUtils.CollectDataChannelHandler(values);
-    sql.execute(stmt, h);
-    Assert.assertEquals(2, values.size());
-    Assert.assertEquals(1, values.get(0).get(0));
-    Assert.assertEquals(2L, values.get(0).get(1));
-    Assert.assertEquals(2, values.get(1).get(0));
-    Assert.assertEquals(1L, values.get(1).get(1));
-  }
-}