You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2017/04/05 23:19:18 UTC
[15/23] storm git commit: STORM-2453 Move non-connectors into the top
directory
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/TridentStormRuleSets.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/TridentStormRuleSets.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/TridentStormRuleSets.java
deleted file mode 100644
index e146069..0000000
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/TridentStormRuleSets.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- *
- * * Licensed to the Apache Software Foundation (ASF) under one
- * * or more contributor license agreements. See the NOTICE file
- * * distributed with this work for additional information
- * * regarding copyright ownership. The ASF licenses this file
- * * to you under the Apache License, Version 2.0 (the
- * * "License"); you may not use this file except in compliance
- * * with the License. You may obtain a copy of the License at
- * *
- * * http://www.apache.org/licenses/LICENSE-2.0
- * *
- * * Unless required by applicable law or agreed to in writing, software
- * * distributed under the License is distributed on an "AS IS" BASIS,
- * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * * See the License for the specific language governing permissions and
- * * limitations under the License.
- *
- */
-package org.apache.storm.sql.planner.trident;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import org.apache.calcite.plan.RelOptRule;
-import org.apache.calcite.rel.rules.CalcMergeRule;
-import org.apache.calcite.rel.rules.FilterCalcMergeRule;
-import org.apache.calcite.rel.rules.FilterProjectTransposeRule;
-import org.apache.calcite.rel.rules.FilterToCalcRule;
-import org.apache.calcite.rel.rules.ProjectCalcMergeRule;
-import org.apache.calcite.rel.rules.ProjectFilterTransposeRule;
-import org.apache.calcite.rel.rules.ProjectRemoveRule;
-import org.apache.calcite.rel.rules.ProjectToCalcRule;
-import org.apache.calcite.rel.rules.PruneEmptyRules;
-import org.apache.calcite.rel.rules.ReduceExpressionsRule;
-import org.apache.calcite.rel.rules.SortRemoveRule;
-import org.apache.calcite.rel.rules.UnionEliminatorRule;
-import org.apache.calcite.rel.stream.StreamRules;
-import org.apache.calcite.tools.RuleSet;
-import org.apache.storm.sql.planner.trident.rules.TridentCalcRule;
-import org.apache.storm.sql.planner.trident.rules.TridentFilterRule;
-import org.apache.storm.sql.planner.trident.rules.TridentScanRule;
-import org.apache.storm.sql.planner.trident.rules.TridentAggregateRule;
-import org.apache.storm.sql.planner.trident.rules.TridentJoinRule;
-import org.apache.storm.sql.planner.trident.rules.TridentModifyRule;
-import org.apache.storm.sql.planner.trident.rules.TridentProjectRule;
-
-import java.util.Iterator;
-
-public class TridentStormRuleSets {
- private static final ImmutableSet<RelOptRule> calciteToStormConversionRules =
- ImmutableSet.<RelOptRule>builder().add(
- SortRemoveRule.INSTANCE,
-
- FilterToCalcRule.INSTANCE,
- ProjectToCalcRule.INSTANCE,
- FilterCalcMergeRule.INSTANCE,
- ProjectCalcMergeRule.INSTANCE,
- CalcMergeRule.INSTANCE,
-
- PruneEmptyRules.FILTER_INSTANCE,
- PruneEmptyRules.PROJECT_INSTANCE,
- PruneEmptyRules.UNION_INSTANCE,
-
- ProjectFilterTransposeRule.INSTANCE,
- FilterProjectTransposeRule.INSTANCE,
- ProjectRemoveRule.INSTANCE,
-
- ReduceExpressionsRule.FILTER_INSTANCE,
- ReduceExpressionsRule.PROJECT_INSTANCE,
- ReduceExpressionsRule.CALC_INSTANCE,
-
- // merge and push unions rules
- UnionEliminatorRule.INSTANCE,
-
- TridentScanRule.INSTANCE,
- TridentFilterRule.INSTANCE,
- TridentProjectRule.INSTANCE,
- TridentAggregateRule.INSTANCE,
- TridentJoinRule.INSTANCE,
- TridentModifyRule.INSTANCE,
- TridentCalcRule.INSTANCE
- ).build();
-
- public static RuleSet[] getRuleSets() {
- return new RuleSet[]{
- new StormRuleSet(StreamRules.RULES),
- new StormRuleSet(ImmutableSet.<RelOptRule>builder().addAll(StreamRules.RULES).addAll(calciteToStormConversionRules).build())
- };
- }
-
- private static class StormRuleSet implements RuleSet {
- final ImmutableSet<RelOptRule> rules;
-
- public StormRuleSet(ImmutableSet<RelOptRule> rules) {
- this.rules = rules;
- }
-
- public StormRuleSet(ImmutableList<RelOptRule> rules) {
- this.rules = ImmutableSet.<RelOptRule>builder()
- .addAll(rules)
- .build();
- }
-
- @Override
- public Iterator<RelOptRule> iterator() {
- return rules.iterator();
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentCalcRel.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentCalcRel.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentCalcRel.java
deleted file mode 100644
index 482e841..0000000
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentCalcRel.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.planner.trident.rel;
-
-import com.google.common.collect.Lists;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.Calc;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rex.RexLocalRef;
-import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.rex.RexProgram;
-import org.apache.storm.sql.planner.StormRelUtils;
-import org.apache.storm.sql.planner.rel.StormCalcRelBase;
-import org.apache.storm.sql.planner.trident.TridentPlanCreator;
-import org.apache.storm.sql.runtime.calcite.ExecutableExpression;
-import org.apache.storm.sql.runtime.trident.functions.EvaluationCalc;
-import org.apache.storm.trident.Stream;
-import org.apache.storm.tuple.Fields;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public class TridentCalcRel extends StormCalcRelBase implements TridentRel {
- public TridentCalcRel(RelOptCluster cluster, RelTraitSet traits, RelNode child, RexProgram program) {
- super(cluster, traits, child, program);
- }
-
- @Override
- public Calc copy(RelTraitSet traitSet, RelNode child, RexProgram program) {
- return new TridentCalcRel(getCluster(), traitSet, child, program);
- }
-
- @Override
- public void tridentPlan(TridentPlanCreator planCreator) throws Exception {
- // SingleRel
- RelNode input = getInput();
- StormRelUtils.getStormRelInput(input).tridentPlan(planCreator);
- Stream inputStream = planCreator.pop().toStream();
-
- String stageName = StormRelUtils.getStageName(this);
-
- RelDataType inputRowType = getInput(0).getRowType();
-
- List<String> outputFieldNames = getRowType().getFieldNames();
- int outputCount = outputFieldNames.size();
-
- // filter
- ExecutableExpression filterInstance = null;
- RexLocalRef condition = program.getCondition();
- if (condition != null) {
- RexNode conditionNode = program.expandLocalRef(condition);
- filterInstance = planCreator.createScalarInstance(Lists.newArrayList(conditionNode), inputRowType,
- StormRelUtils.getClassName(this));
- }
-
- // projection
- ExecutableExpression projectionInstance = null;
- List<RexLocalRef> projectList = program.getProjectList();
- if (projectList != null && !projectList.isEmpty()) {
- List<RexNode> expandedNodes = new ArrayList<>();
- for (RexLocalRef project : projectList) {
- expandedNodes.add(program.expandLocalRef(project));
- }
-
- projectionInstance = planCreator.createScalarInstance(expandedNodes, inputRowType,
- StormRelUtils.getClassName(this));
- }
-
- if (projectionInstance == null && filterInstance == null) {
- // it shouldn't be happen
- throw new IllegalStateException("Either projection or condition, or both should be provided.");
- }
-
- final Stream finalStream = inputStream
- .flatMap(new EvaluationCalc(filterInstance, projectionInstance, outputCount, planCreator.getDataContext()), new Fields(outputFieldNames))
- .name(stageName);
-
- planCreator.addStream(finalStream);
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentFilterRel.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentFilterRel.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentFilterRel.java
deleted file mode 100644
index 1fe0927..0000000
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentFilterRel.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.planner.trident.rel;
-
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.Filter;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rex.RexNode;
-import org.apache.storm.sql.planner.StormRelUtils;
-import org.apache.storm.sql.planner.rel.StormFilterRelBase;
-import org.apache.storm.sql.planner.trident.TridentPlanCreator;
-import org.apache.storm.sql.runtime.calcite.ExecutableExpression;
-import org.apache.storm.sql.runtime.trident.functions.EvaluationFilter;
-import org.apache.storm.trident.Stream;
-import org.apache.storm.trident.fluent.IAggregatableStream;
-
-import java.util.List;
-
-/**
- * Trident relational node for a filter: compiles the node's expressions and applies
- * them as a Trident {@code filter} stage.
- */
-public class TridentFilterRel extends StormFilterRelBase implements TridentRel {
-    public TridentFilterRel(RelOptCluster cluster, RelTraitSet traits, RelNode child, RexNode condition) {
-        super(cluster, traits, child, condition);
-    }
-
-    @Override
-    public Filter copy(RelTraitSet traitSet, RelNode input, RexNode condition) {
-        return new TridentFilterRel(getCluster(), traitSet, input, condition);
-    }
-
-    /**
-     * Plans the input, then appends a filter stage named after this node.
-     *
-     * @param planCreator plan creator that supplies the input stream and receives the result
-     * @throws Exception if planning the input or compiling the expression fails
-     */
-    @Override
-    public void tridentPlan(TridentPlanCreator planCreator) throws Exception {
-        // Single-input node: plan the child first, then take the stream it produced.
-        StormRelUtils.getStormRelInput(getInput()).tridentPlan(planCreator);
-        final Stream upstream = planCreator.pop().toStream();
-
-        final String stageName = StormRelUtils.getStageName(this);
-
-        final ExecutableExpression predicate = planCreator.createScalarInstance(
-            getChildExps(), getInput(0).getRowType(), StormRelUtils.getClassName(this));
-
-        final EvaluationFilter evaluation = new EvaluationFilter(predicate, planCreator.getDataContext());
-        final IAggregatableStream filtered = upstream.filter(evaluation).name(stageName);
-        planCreator.addStream(filtered);
-    }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentLogicalConvention.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentLogicalConvention.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentLogicalConvention.java
deleted file mode 100644
index d221498..0000000
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentLogicalConvention.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.planner.trident.rel;
-
-import org.apache.calcite.plan.Convention;
-import org.apache.calcite.plan.ConventionTraitDef;
-import org.apache.calcite.plan.RelOptPlanner;
-import org.apache.calcite.plan.RelTrait;
-import org.apache.calcite.plan.RelTraitDef;
-import org.apache.calcite.plan.RelTraitSet;
-
-/**
- * Convention whose interface is {@link TridentRel}; exposed as a single enum instance.
- */
-public enum TridentLogicalConvention implements Convention {
-    INSTANCE;
-
-    @Override
-    public String getName() {
-        return "STORM_LOGICAL";
-    }
-
-    @Override
-    public String toString() {
-        // Printed form is the convention name.
-        return getName();
-    }
-
-    @Override
-    public Class getInterface() {
-        return TridentRel.class;
-    }
-
-    @Override
-    public RelTraitDef getTraitDef() {
-        return ConventionTraitDef.INSTANCE;
-    }
-
-    @Override
-    public boolean satisfies(RelTrait trait) {
-        // Only this very instance satisfies the convention.
-        return trait == this;
-    }
-
-    @Override
-    public void register(RelOptPlanner planner) {
-        // Nothing to register with the planner.
-    }
-
-    @Override
-    public boolean canConvertConvention(Convention toConvention) {
-        return false;
-    }
-
-    @Override
-    public boolean useAbstractConvertersForConversion(RelTraitSet fromTraits, RelTraitSet toTraits) {
-        return false;
-    }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentProjectRel.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentProjectRel.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentProjectRel.java
deleted file mode 100644
index 06be5d7..0000000
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentProjectRel.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.planner.trident.rel;
-
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.Project;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rex.RexNode;
-import org.apache.storm.sql.planner.StormRelUtils;
-import org.apache.storm.sql.planner.rel.StormProjectRelBase;
-import org.apache.storm.sql.planner.trident.TridentPlanCreator;
-import org.apache.storm.sql.runtime.calcite.ExecutableExpression;
-import org.apache.storm.sql.runtime.trident.functions.EvaluationFunction;
-import org.apache.storm.trident.Stream;
-import org.apache.storm.tuple.Fields;
-
-import java.util.List;
-
-public class TridentProjectRel extends StormProjectRelBase implements TridentRel {
- public TridentProjectRel(RelOptCluster cluster, RelTraitSet traits, RelNode input, List<? extends RexNode> projects, RelDataType rowType) {
- super(cluster, traits, input, projects, rowType);
- }
-
- @Override
- public Project copy(RelTraitSet traitSet, RelNode input, List<RexNode> projects, RelDataType rowType) {
- return new TridentProjectRel(getCluster(), traitSet, input, projects, rowType);
- }
-
- @Override
- public void tridentPlan(TridentPlanCreator planCreator) throws Exception {
- // SingleRel
- RelNode input = getInput();
- StormRelUtils.getStormRelInput(input).tridentPlan(planCreator);
- Stream inputStream = planCreator.pop().toStream();
-
- String stageName = StormRelUtils.getStageName(this);
- String projectionClassName = StormRelUtils.getClassName(this);
-
- List<String> outputFieldNames = getRowType().getFieldNames();
- int outputCount = outputFieldNames.size();
-
- List<RexNode> childExps = getChildExps();
- RelDataType inputRowType = getInput(0).getRowType();
-
- ExecutableExpression projectionInstance = planCreator.createScalarInstance(childExps, inputRowType, projectionClassName);
- Stream finalStream = inputStream
- .map(new EvaluationFunction(projectionInstance, outputCount, planCreator.getDataContext()), new Fields(outputFieldNames))
- .name(stageName);
-
- planCreator.addStream(finalStream);
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentRel.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentRel.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentRel.java
deleted file mode 100644
index fa92ec9..0000000
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentRel.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.planner.trident.rel;
-
-import org.apache.calcite.plan.Convention;
-import org.apache.storm.sql.planner.rel.StormRelNode;
-import org.apache.storm.sql.planner.trident.TridentPlanCreator;
-
-/**
- * A {@link StormRelNode} that can add its part of a query to a Trident plan.
- */
-public interface TridentRel extends StormRelNode {
- /**
- * Adds this node's contribution to the plan held by the given plan creator.
- *
- * @param planCreator the plan creator to build against
- * @throws Exception if the node cannot be planned
- */
- void tridentPlan(TridentPlanCreator planCreator) throws Exception;
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentStreamInsertRel.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentStreamInsertRel.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentStreamInsertRel.java
deleted file mode 100644
index e92c29b..0000000
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentStreamInsertRel.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.planner.trident.rel;
-
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelOptTable;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.prepare.Prepare;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rex.RexNode;
-import org.apache.storm.sql.planner.StormRelUtils;
-import org.apache.storm.sql.planner.rel.StormStreamInsertRelBase;
-import org.apache.storm.sql.planner.trident.TridentPlanCreator;
-import org.apache.storm.sql.runtime.ISqlTridentDataSource;
-import org.apache.storm.trident.Stream;
-import org.apache.storm.trident.fluent.IAggregatableStream;
-import org.apache.storm.tuple.Fields;
-
-import java.util.List;
-
-public class TridentStreamInsertRel extends StormStreamInsertRelBase implements TridentRel {
- public TridentStreamInsertRel(RelOptCluster cluster, RelTraitSet traits, RelOptTable table, Prepare.CatalogReader catalogReader, RelNode child, Operation operation, List<String> updateColumnList, List<RexNode> sourceExpressionList, boolean flattened) {
- super(cluster, traits, table, catalogReader, child, operation, updateColumnList, sourceExpressionList, flattened);
- }
-
- @Override
- public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
- return new TridentStreamInsertRel(getCluster(), traitSet, getTable(), getCatalogReader(),
- sole(inputs), getOperation(), getUpdateColumnList(), getSourceExpressionList(), isFlattened());
- }
-
- @Override
- public void tridentPlan(TridentPlanCreator planCreator) throws Exception {
- // SingleRel
- RelNode input = getInput();
- StormRelUtils.getStormRelInput(input).tridentPlan(planCreator);
- Stream inputStream = planCreator.pop().toStream();
-
- String stageName = StormRelUtils.getStageName(this);
-
- Preconditions.checkArgument(isInsert(), "Only INSERT statement is supported.");
-
- List<String> inputFields = this.input.getRowType().getFieldNames();
- List<String> outputFields = getRowType().getFieldNames();
-
- // FIXME: this should be really different...
- String tableName = Joiner.on('.').join(getTable().getQualifiedName());
- ISqlTridentDataSource.SqlTridentConsumer consumer = planCreator.getSources().get(tableName).getConsumer();
-
- // In fact this is normally the end of stream, but I'm still not sure so I open new streams based on State values
- IAggregatableStream finalStream = inputStream
- .partitionPersist(consumer.getStateFactory(), new Fields(inputFields), consumer.getStateUpdater(),
- new Fields(outputFields))
- .newValuesStream().name(stageName);
-
- planCreator.addStream(finalStream);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentStreamScanRel.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentStreamScanRel.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentStreamScanRel.java
deleted file mode 100644
index c563d73..0000000
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentStreamScanRel.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.planner.trident.rel;
-
-import com.google.common.base.Joiner;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelOptTable;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.storm.sql.planner.StormRelUtils;
-import org.apache.storm.sql.planner.rel.StormStreamScanRelBase;
-import org.apache.storm.sql.planner.trident.TridentPlanCreator;
-import org.apache.storm.sql.runtime.ISqlTridentDataSource;
-import org.apache.storm.trident.fluent.IAggregatableStream;
-
-import java.util.Map;
-
-public class TridentStreamScanRel extends StormStreamScanRelBase implements TridentRel {
- private final int parallelismHint;
-
- public TridentStreamScanRel(RelOptCluster cluster, RelTraitSet traitSet, RelOptTable table, int parallelismHint) {
- super(cluster, traitSet, table);
- this.parallelismHint = parallelismHint;
- }
-
- @Override
- public void tridentPlan(TridentPlanCreator planCreator) throws Exception {
- String sourceName = Joiner.on('.').join(getTable().getQualifiedName());
-
- // FIXME: this should be really different...
- Map<String, ISqlTridentDataSource> sources = planCreator.getSources();
- if (!sources.containsKey(sourceName)) {
- throw new RuntimeException("Cannot find table " + sourceName);
- }
-
- String stageName = StormRelUtils.getStageName(this);
- IAggregatableStream finalStream = planCreator.getTopology().newStream(stageName, sources.get(sourceName).getProducer())
- .parallelismHint(parallelismHint);
- planCreator.addStream(finalStream);
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentAggregateRule.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentAggregateRule.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentAggregateRule.java
deleted file mode 100644
index ac35414..0000000
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentAggregateRule.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.sql.planner.trident.rules;
-
-import org.apache.calcite.plan.Convention;
-import org.apache.calcite.plan.RelOptRule;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.convert.ConverterRule;
-import org.apache.calcite.rel.logical.LogicalAggregate;
-import org.apache.storm.sql.planner.trident.rel.TridentLogicalConvention;
-
-/**
- * Converter rule registered for {@link LogicalAggregate}; conversion always fails because
- * aggregation is not supported.
- */
-public class TridentAggregateRule extends ConverterRule {
-    public static final RelOptRule INSTANCE = new TridentAggregateRule();
-
-    private TridentAggregateRule() {
-        super(
-            LogicalAggregate.class,
-            Convention.NONE,
-            TridentLogicalConvention.INSTANCE,
-            "TridentAggregateRule");
-    }
-
-    /**
-     * Always rejects the conversion.
-     *
-     * @throws UnsupportedOperationException on every call
-     */
-    @Override
-    public RelNode convert(RelNode rel) {
-        throw new UnsupportedOperationException("Aggregate operation is not supported.");
-    }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentCalcRule.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentCalcRule.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentCalcRule.java
deleted file mode 100644
index 25126ec..0000000
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentCalcRule.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.sql.planner.trident.rules;
-
-import org.apache.calcite.plan.Convention;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.convert.ConverterRule;
-import org.apache.calcite.rel.core.Calc;
-import org.apache.calcite.rel.logical.LogicalCalc;
-import org.apache.storm.sql.planner.trident.rel.TridentCalcRel;
-import org.apache.storm.sql.planner.trident.rel.TridentLogicalConvention;
-
-/**
- * Converter rule turning a {@link LogicalCalc} into a {@link TridentCalcRel}.
- */
-public class TridentCalcRule extends ConverterRule {
-    public static final TridentCalcRule INSTANCE = new TridentCalcRule();
-
-    private TridentCalcRule() {
-        super(
-            LogicalCalc.class,
-            Convention.NONE,
-            TridentLogicalConvention.INSTANCE,
-            "TridentCalcRule");
-    }
-
-    /**
-     * Builds a {@link TridentCalcRel} with the same program, moving both the node and its
-     * input to {@link TridentLogicalConvention#INSTANCE}.
-     */
-    @Override
-    public RelNode convert(RelNode rel) {
-        final Calc logicalCalc = (Calc) rel;
-        final RelNode child = logicalCalc.getInput();
-
-        final RelOptCluster cluster = logicalCalc.getCluster();
-        final RelTraitSet tridentTraits = logicalCalc.getTraitSet().replace(TridentLogicalConvention.INSTANCE);
-        final RelNode convertedChild =
-            convert(child, child.getTraitSet().replace(TridentLogicalConvention.INSTANCE));
-
-        return new TridentCalcRel(cluster, tridentTraits, convertedChild, logicalCalc.getProgram());
-    }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentFilterRule.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentFilterRule.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentFilterRule.java
deleted file mode 100644
index 7f9c41f..0000000
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentFilterRule.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.planner.trident.rules;
-
-import org.apache.calcite.plan.Convention;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.convert.ConverterRule;
-import org.apache.calcite.rel.core.Filter;
-import org.apache.calcite.rel.logical.LogicalFilter;
-import org.apache.storm.sql.planner.trident.rel.TridentFilterRel;
-import org.apache.storm.sql.planner.trident.rel.TridentLogicalConvention;
-
-/**
- * Converter rule that turns a {@link LogicalFilter} with {@link Convention#NONE} into a
- * {@link TridentFilterRel} in the {@link TridentLogicalConvention}.
- */
-public class TridentFilterRule extends ConverterRule {
-    /**
-     * Shared instance of the rule. Declared {@code final} so the singleton cannot be
-     * reassigned, matching the other Trident converter rules.
-     */
-    public static final TridentFilterRule INSTANCE = new TridentFilterRule();
-
-    private TridentFilterRule() {
-        super(LogicalFilter.class, Convention.NONE, TridentLogicalConvention.INSTANCE, "TridentFilterRule");
-    }
-
-    /**
-     * Creates the Trident counterpart of the given filter node.
-     *
-     * @param rel the node to convert; it is cast to {@link Filter}
-     * @return a new {@link TridentFilterRel} that reuses the cluster and condition of {@code rel},
-     *         with both its own trait set and its input switched to the Trident convention
-     */
-    @Override
-    public RelNode convert(RelNode rel) {
-        final Filter filter = (Filter) rel;
-        final RelNode input = filter.getInput();
-
-        return new TridentFilterRel(filter.getCluster(),
-            filter.getTraitSet().replace(TridentLogicalConvention.INSTANCE),
-            convert(input, input.getTraitSet().replace(TridentLogicalConvention.INSTANCE)),
-            filter.getCondition());
-    }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentJoinRule.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentJoinRule.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentJoinRule.java
deleted file mode 100644
index 90f5083..0000000
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentJoinRule.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.planner.trident.rules;
-
-import org.apache.calcite.plan.Convention;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.convert.ConverterRule;
-import org.apache.calcite.rel.logical.LogicalJoin;
-import org.apache.storm.sql.planner.trident.rel.TridentLogicalConvention;
-
-/**
- * Converter rule registered for {@link LogicalJoin} nodes with {@link Convention#NONE}.
- * It never produces a node: {@link #convert(RelNode)} always throws.
- */
-public class TridentJoinRule extends ConverterRule {
-    /** Shared instance of the rule; the constructor is private. */
-    public static final TridentJoinRule INSTANCE = new TridentJoinRule();
-
-    private TridentJoinRule() {
-        super(LogicalJoin.class,
-              Convention.NONE,
-              TridentLogicalConvention.INSTANCE,
-              "TridentJoinRule");
-    }
-
-    /**
-     * Always rejects the conversion.
-     *
-     * @param rel the join node offered for conversion (not inspected)
-     * @return never returns normally
-     * @throws UnsupportedOperationException on every call
-     */
-    @Override
-    public RelNode convert(RelNode rel) {
-        final String reason = "Join operation is not supported.";
-        throw new UnsupportedOperationException(reason);
-    }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentModifyRule.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentModifyRule.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentModifyRule.java
deleted file mode 100644
index 2155451..0000000
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentModifyRule.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.planner.trident.rules;
-
-import org.apache.calcite.plan.Convention;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelOptTable;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.prepare.Prepare;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.convert.ConverterRule;
-import org.apache.calcite.rel.core.TableModify;
-import org.apache.calcite.rel.logical.LogicalTableModify;
-import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.schema.Table;
-import org.apache.storm.sql.planner.trident.rel.TridentLogicalConvention;
-import org.apache.storm.sql.planner.trident.rel.TridentStreamInsertRel;
-
-import java.util.List;
-
-/**
- * Converter rule that turns a {@link LogicalTableModify} with {@link Convention#NONE} into a
- * {@link TridentStreamInsertRel} in the {@link TridentLogicalConvention}.
- * Only {@code INSERT} operations on tables whose JDBC table type is {@code STREAM} are accepted.
- */
-public class TridentModifyRule extends ConverterRule {
-    /** Shared instance of the rule; the constructor is private. */
-    public static final TridentModifyRule INSTANCE = new TridentModifyRule();
-
-    private TridentModifyRule() {
-        super(LogicalTableModify.class, Convention.NONE, TridentLogicalConvention.INSTANCE, "TridentModifyRule");
-    }
-
-    /**
-     * Validates the target table and delegates to the stream-insert conversion.
-     *
-     * @param rel the node to convert; it is cast to {@link TableModify}
-     * @return the Trident node replacing {@code rel}
-     * @throws IllegalArgumentException if the target table cannot be unwrapped as a {@link Table},
-     *         or if its JDBC table type is anything other than {@code STREAM}
-     * @throws UnsupportedOperationException if the operation on a stream is not {@code INSERT}
-     */
-    @Override
-    public RelNode convert(RelNode rel) {
-        final TableModify tableModify = (TableModify) rel;
-        final RelOptTable relOptTable = tableModify.getTable();
-
-        // unwrap() may yield null; fail with a descriptive message instead of a bare NPE in the switch.
-        final Table table = relOptTable.unwrap(Table.class);
-        if (table == null) {
-            throw new IllegalArgumentException(String.format("Table %s cannot be unwrapped as %s",
-                relOptTable.getQualifiedName(), Table.class.getName()));
-        }
-
-        switch (table.getJdbcTableType()) {
-            case STREAM:
-                return convertStreamModify(tableModify, relOptTable);
-            default:
-                throw new IllegalArgumentException(String.format("Unsupported table type: %s", table.getJdbcTableType()));
-        }
-    }
-
-    /** Builds the stream insert node; rejects every operation other than INSERT before touching the input. */
-    private RelNode convertStreamModify(TableModify tableModify, RelOptTable relOptTable) {
-        final TableModify.Operation operation = tableModify.getOperation();
-        if (operation != TableModify.Operation.INSERT) {
-            throw new UnsupportedOperationException(String.format("Streams doesn't support %s modify operation", operation));
-        }
-
-        final RelNode input = tableModify.getInput();
-        final RelOptCluster cluster = tableModify.getCluster();
-        final RelTraitSet traitSet = tableModify.getTraitSet().replace(TridentLogicalConvention.INSTANCE);
-        final Prepare.CatalogReader catalogReader = tableModify.getCatalogReader();
-        final RelNode convertedInput = convert(input, input.getTraitSet().replace(TridentLogicalConvention.INSTANCE));
-        final List<String> updateColumnList = tableModify.getUpdateColumnList();
-        final List<RexNode> sourceExpressionList = tableModify.getSourceExpressionList();
-        final boolean flattened = tableModify.isFlattened();
-
-        return new TridentStreamInsertRel(cluster, traitSet, relOptTable, catalogReader, convertedInput, operation,
-            updateColumnList, sourceExpressionList, flattened);
-    }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentProjectRule.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentProjectRule.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentProjectRule.java
deleted file mode 100644
index 2922725..0000000
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentProjectRule.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.planner.trident.rules;
-
-import org.apache.calcite.plan.Convention;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.convert.ConverterRule;
-import org.apache.calcite.rel.core.Project;
-import org.apache.calcite.rel.logical.LogicalProject;
-import org.apache.storm.sql.planner.trident.rel.TridentLogicalConvention;
-import org.apache.storm.sql.planner.trident.rel.TridentProjectRel;
-
-/**
- * Converter rule that turns a {@link LogicalProject} with {@link Convention#NONE} into a
- * {@link TridentProjectRel} in the {@link TridentLogicalConvention}.
- */
-public class TridentProjectRule extends ConverterRule {
-    /** Shared instance of the rule; the constructor is private. */
-    public static final TridentProjectRule INSTANCE = new TridentProjectRule();
-
-    private TridentProjectRule() {
-        super(LogicalProject.class, Convention.NONE, TridentLogicalConvention.INSTANCE, "TridentProjectRule");
-    }
-
-    /**
-     * Creates the Trident counterpart of the given project node.
-     *
-     * @param rel the node to convert; it is cast to {@link Project}
-     * @return a new {@link TridentProjectRel} that reuses the cluster, projections and row type of
-     *         {@code rel}, with both its own trait set and its input switched to the Trident convention
-     */
-    @Override
-    public RelNode convert(RelNode rel) {
-        final Project source = (Project) rel;
-        final RelNode child = source.getInput();
-        // The input has to be requested in the Trident convention as well.
-        final RelNode tridentChild =
-            convert(child, child.getTraitSet().replace(TridentLogicalConvention.INSTANCE));
-
-        return new TridentProjectRel(
-            source.getCluster(),
-            source.getTraitSet().replace(TridentLogicalConvention.INSTANCE),
-            tridentChild,
-            source.getProjects(),
-            source.getRowType());
-    }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentScanRule.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentScanRule.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentScanRule.java
deleted file mode 100644
index abbd680..0000000
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentScanRule.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql.planner.trident.rules;
-
-import org.apache.calcite.adapter.enumerable.EnumerableConvention;
-import org.apache.calcite.adapter.enumerable.EnumerableTableScan;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.convert.ConverterRule;
-import org.apache.calcite.rel.core.TableScan;
-import org.apache.calcite.schema.Table;
-import org.apache.storm.sql.calcite.ParallelStreamableTable;
-import org.apache.storm.sql.planner.trident.rel.TridentLogicalConvention;
-import org.apache.storm.sql.planner.trident.rel.TridentStreamScanRel;
-
-/**
- * Converter rule that turns an {@link EnumerableTableScan} in the {@link EnumerableConvention}
- * into a {@link TridentStreamScanRel} in the {@link TridentLogicalConvention}.
- * Only tables whose JDBC table type is {@code STREAM} are accepted.
- */
-public class TridentScanRule extends ConverterRule {
-    /** Shared instance of the rule; the constructor is private. */
-    public static final TridentScanRule INSTANCE = new TridentScanRule();
-    /** Parallelism hint used when the scanned table does not supply one. */
-    public static final int DEFAULT_PARALLELISM_HINT = 1;
-
-    private TridentScanRule() {
-        super(EnumerableTableScan.class, EnumerableConvention.INSTANCE, TridentLogicalConvention.INSTANCE, "TridentScanRule");
-    }
-
-    /**
-     * Creates the Trident stream scan replacing the given table scan.
-     *
-     * @param rel the node to convert; it is cast to {@link TableScan}
-     * @return a new {@link TridentStreamScanRel} over the same table, carrying the resolved parallelism hint
-     * @throws IllegalArgumentException if the scanned table cannot be unwrapped as a {@link Table},
-     *         or if its JDBC table type is anything other than {@code STREAM}
-     */
-    @Override
-    public RelNode convert(RelNode rel) {
-        final TableScan scan = (TableScan) rel;
-
-        // unwrap() may yield null; fail with a descriptive message instead of a bare NPE in the switch.
-        final Table table = scan.getTable().unwrap(Table.class);
-        if (table == null) {
-            throw new IllegalArgumentException(String.format("Table %s cannot be unwrapped as %s",
-                scan.getTable().getQualifiedName(), Table.class.getName()));
-        }
-
-        switch (table.getJdbcTableType()) {
-            case STREAM:
-                return new TridentStreamScanRel(scan.getCluster(),
-                    scan.getTraitSet().replace(TridentLogicalConvention.INSTANCE),
-                    scan.getTable(), parallelismHintOf(scan));
-            default:
-                throw new IllegalArgumentException(String.format("Unsupported table type: %s", table.getJdbcTableType()));
-        }
-    }
-
-    /**
-     * Returns the hint supplied by the table when it is a {@link ParallelStreamableTable} with a
-     * non-null hint, otherwise {@link #DEFAULT_PARALLELISM_HINT}.
-     */
-    private static int parallelismHintOf(TableScan scan) {
-        final ParallelStreamableTable parallelTable = scan.getTable().unwrap(ParallelStreamableTable.class);
-        if (parallelTable == null) {
-            return DEFAULT_PARALLELISM_HINT;
-        }
-        // Read the hint once so the null check and the unboxing see the same value.
-        final Integer hint = parallelTable.parallelismHint();
-        return hint != null ? hint : DEFAULT_PARALLELISM_HINT;
-    }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java
deleted file mode 100644
index 82dc184..0000000
--- a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java
+++ /dev/null
@@ -1,492 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.sql;
-
-import com.google.common.collect.ImmutableMap;
-import org.apache.calcite.tools.ValidationException;
-import org.apache.storm.sql.javac.CompilingClassLoader;
-import org.apache.storm.sql.runtime.ChannelHandler;
-import org.apache.storm.sql.runtime.DataSource;
-import org.apache.storm.sql.runtime.DataSourcesProvider;
-import org.apache.storm.sql.runtime.DataSourcesRegistry;
-import org.apache.storm.sql.runtime.FieldInfo;
-import org.apache.storm.sql.runtime.ISqlTridentDataSource;
-import org.apache.storm.tuple.Values;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-public class TestStormSql {
- /**
-  * Provider for the {@code mock} scheme. Every argument of the factory methods is ignored;
-  * a fresh {@code TestUtils} mock is returned on each call.
-  */
- private static class MockDataSourceProvider implements DataSourcesProvider {
-     @Override
-     public String scheme() {
-         return "mock";
-     }
-
-     /** @return a new {@link TestUtils.MockDataSource} */
-     @Override
-     public DataSource construct(URI uri,
-                                 String inputFormatClass,
-                                 String outputFormatClass,
-                                 List<FieldInfo> fields) {
-         return new TestUtils.MockDataSource();
-     }
-
-     /** @return a new {@link TestUtils.MockSqlTridentDataSource} */
-     @Override
-     public ISqlTridentDataSource constructTrident(URI uri,
-                                                   String inputFormatClass,
-                                                   String outputFormatClass,
-                                                   Properties properties,
-                                                   List<FieldInfo> fields) {
-         return new TestUtils.MockSqlTridentDataSource();
-     }
- }
-
- /**
-  * Provider for the {@code mocknested} scheme. Every argument of the factory methods is ignored;
-  * a fresh {@code TestUtils} mock is returned on each call.
-  */
- private static class MockNestedDataSourceProvider implements DataSourcesProvider {
-     @Override
-     public String scheme() {
-         return "mocknested";
-     }
-
-     /** @return a new {@link TestUtils.MockNestedDataSource} */
-     @Override
-     public DataSource construct(URI uri,
-                                 String inputFormatClass,
-                                 String outputFormatClass,
-                                 List<FieldInfo> fields) {
-         return new TestUtils.MockNestedDataSource();
-     }
-
-     /** @return a new {@link TestUtils.MockSqlTridentDataSource} */
-     @Override
-     public ISqlTridentDataSource constructTrident(URI uri,
-                                                   String inputFormatClass,
-                                                   String outputFormatClass,
-                                                   Properties properties,
-                                                   List<FieldInfo> fields) {
-         return new TestUtils.MockSqlTridentDataSource();
-     }
- }
-
- /**
-  * Provider for the {@code mockgroup} scheme. Every argument of the factory methods is ignored;
-  * a fresh {@code TestUtils} mock is returned on each call.
-  */
- private static class MockGroupDataSourceProvider implements DataSourcesProvider {
-     @Override
-     public String scheme() {
-         return "mockgroup";
-     }
-
-     /** @return a new {@link TestUtils.MockGroupDataSource} */
-     @Override
-     public DataSource construct(URI uri,
-                                 String inputFormatClass,
-                                 String outputFormatClass,
-                                 List<FieldInfo> fields) {
-         return new TestUtils.MockGroupDataSource();
-     }
-
-     /** @return a new {@link TestUtils.MockSqlTridentGroupedDataSource} */
-     @Override
-     public ISqlTridentDataSource constructTrident(URI uri,
-                                                   String inputFormatClass,
-                                                   String outputFormatClass,
-                                                   Properties properties,
-                                                   List<FieldInfo> fields) {
-         return new TestUtils.MockSqlTridentGroupedDataSource();
-     }
- }
-
- /**
-  * Provider for the {@code mockemp} scheme. Every argument of the factory methods is ignored;
-  * a fresh {@code TestUtils} mock is returned on each call.
-  */
- private static class MockEmpDataSourceProvider implements DataSourcesProvider {
-     @Override
-     public String scheme() {
-         return "mockemp";
-     }
-
-     /** @return a new {@link TestUtils.MockEmpDataSource} */
-     @Override
-     public DataSource construct(URI uri,
-                                 String inputFormatClass,
-                                 String outputFormatClass,
-                                 List<FieldInfo> fields) {
-         return new TestUtils.MockEmpDataSource();
-     }
-
-     /** @return a new {@link TestUtils.MockSqlTridentJoinDataSourceEmp} */
-     @Override
-     public ISqlTridentDataSource constructTrident(URI uri,
-                                                   String inputFormatClass,
-                                                   String outputFormatClass,
-                                                   Properties properties,
-                                                   List<FieldInfo> fields) {
-         return new TestUtils.MockSqlTridentJoinDataSourceEmp();
-     }
- }
-
- /**
-  * Provider for the {@code mockdept} scheme. Every argument of the factory methods is ignored;
-  * a fresh {@code TestUtils} mock is returned on each call.
-  */
- private static class MockDeptDataSourceProvider implements DataSourcesProvider {
-     @Override
-     public String scheme() {
-         return "mockdept";
-     }
-
-     /** @return a new {@link TestUtils.MockDeptDataSource} */
-     @Override
-     public DataSource construct(URI uri,
-                                 String inputFormatClass,
-                                 String outputFormatClass,
-                                 List<FieldInfo> fields) {
-         return new TestUtils.MockDeptDataSource();
-     }
-
-     /** @return a new {@link TestUtils.MockSqlTridentJoinDataSourceDept} */
-     @Override
-     public ISqlTridentDataSource constructTrident(URI uri,
-                                                   String inputFormatClass,
-                                                   String outputFormatClass,
-                                                   Properties properties,
-                                                   List<FieldInfo> fields) {
-         return new TestUtils.MockSqlTridentJoinDataSourceDept();
-     }
- }
-
-
- /**
-  * Registers the five mock providers in {@link DataSourcesRegistry#providerMap()}, each under
-  * the scheme name it reports ("mock", "mocknested", "mockgroup", "mockemp", "mockdept").
-  */
- @BeforeClass
- public static void setUp() {
-     final DataSourcesProvider[] mockProviders = {
-         new MockDataSourceProvider(),
-         new MockNestedDataSourceProvider(),
-         new MockGroupDataSourceProvider(),
-         new MockEmpDataSourceProvider(),
-         new MockDeptDataSourceProvider(),
-     };
-     for (DataSourcesProvider provider : mockProviders) {
-         // scheme() yields exactly the key each provider is meant to be registered under.
-         DataSourcesRegistry.providerMap().put(provider.scheme(), provider);
-     }
- }
-
- /**
-  * Removes every mock provider this class registered in {@code setUp()}, so none of them
-  * stays behind in the static {@link DataSourcesRegistry#providerMap()} after the class finishes.
-  */
- @AfterClass
- public static void tearDown() {
-     // All five schemes are listed; previously only "mock" and "mocknested" were removed.
-     for (String scheme : Arrays.asList("mock", "mocknested", "mockgroup", "mockemp", "mockdept")) {
-         DataSourcesRegistry.providerMap().remove(scheme);
-     }
- }
-
- /** Filters and projects over the {@code mock} source and expects the values 4 and 5, in that order. */
- @Test
- public void testExternalDataSource() throws Exception {
-     final List<String> statements = new ArrayList<>();
-     statements.add("CREATE EXTERNAL TABLE FOO (ID INT) LOCATION 'mock:///foo'");
-     statements.add("SELECT STREAM ID + 1 FROM FOO WHERE ID > 2");
-
-     final StormSql stormSql = StormSql.construct();
-     final List<Values> collected = new ArrayList<>();
-     final ChannelHandler collector = new TestUtils.CollectDataChannelHandler(collected);
-     stormSql.execute(statements, collector);
-
-     final int[] expectedFirstColumn = {4, 5};
-     Assert.assertEquals(expectedFirstColumn.length, collected.size());
-     for (int row = 0; row < expectedFirstColumn.length; row++) {
-         Assert.assertEquals(expectedFirstColumn[row], collected.get(row).get(0));
-     }
- }
-
- /**
-  * Queries map and array columns of the {@code mocknested} source and checks the first
-  * emitted row against the expected id, map value, nested map and array.
-  */
- @Test
- public void testExternalDataSourceNested() throws Exception {
-     List<String> stmt = new ArrayList<>();
-     stmt.add("CREATE EXTERNAL TABLE FOO (ID INT, MAPFIELD ANY, NESTEDMAPFIELD ANY, ARRAYFIELD ANY) LOCATION 'mocknested:///foo'");
-     stmt.add("SELECT STREAM ID, MAPFIELD['c'], NESTEDMAPFIELD, ARRAYFIELD " +
-         "FROM FOO " +
-         "WHERE CAST(MAPFIELD['b'] AS INTEGER) = 2 AND CAST(ARRAYFIELD[2] AS INTEGER) = 200");
-     StormSql sql = StormSql.construct();
-     List<Values> values = new ArrayList<>();
-     ChannelHandler h = new TestUtils.CollectDataChannelHandler(values);
-     sql.execute(stmt, h);
-     // Leftover debug print of the collected values was removed; a failing assertion reports the mismatch.
-     Map<String, Integer> map = ImmutableMap.of("b", 2, "c", 4);
-     Map<String, Map<String, Integer>> nestedMap = ImmutableMap.of("a", map);
-     Assert.assertEquals(new Values(2, 4, nestedMap, Arrays.asList(100, 200, 300)), values.get(0));
- }
-
- /** Filters on a map key ({@code 'a'}) of the {@code mocknested} source and expects no rows. */
- @Test
- public void testExternalNestedNonExistKeyAccess() throws Exception {
-     // NOTE(review): an earlier comment here said this query triggers
-     // "java.lang.RuntimeException: Cannot convert null to int", yet the test expects a clean
-     // run that emits nothing - confirm which of the two is current.
-     final String createTable =
-         "CREATE EXTERNAL TABLE FOO (ID INT, MAPFIELD ANY, NESTEDMAPFIELD ANY, ARRAYFIELD ANY) LOCATION 'mocknested:///foo'";
-     final String query = "SELECT STREAM ID, MAPFIELD, NESTEDMAPFIELD, ARRAYFIELD " +
-         "FROM FOO " +
-         "WHERE CAST(MAPFIELD['a'] AS INTEGER) = 2";
-
-     final List<String> statements = new ArrayList<>();
-     statements.add(createTable);
-     statements.add(query);
-
-     final StormSql stormSql = StormSql.construct();
-     final List<Values> collected = new ArrayList<>();
-     final ChannelHandler collector = new TestUtils.CollectDataChannelHandler(collected);
-     stormSql.execute(statements, collector);
-
-     Assert.assertEquals(0, collected.size());
- }
-
- /** Filters on a nested map path ({@code ['b']['c']}) of the {@code mocknested} source and expects no rows. */
- @Test
- public void testExternalNestedNonExistKeyAccess2() throws Exception {
-     // NOTE(review): an earlier comment here said this query triggers
-     // "java.lang.RuntimeException: Cannot convert null to int", yet the test expects a clean
-     // run that emits nothing - confirm which of the two is current.
-     final String createTable =
-         "CREATE EXTERNAL TABLE FOO (ID INT, MAPFIELD ANY, NESTEDMAPFIELD ANY, ARRAYFIELD ANY) LOCATION 'mocknested:///foo'";
-     final String query = "SELECT STREAM ID, MAPFIELD, NESTEDMAPFIELD, ARRAYFIELD " +
-         "FROM FOO " +
-         "WHERE CAST(NESTEDMAPFIELD['b']['c'] AS INTEGER) = 4";
-
-     final List<String> statements = new ArrayList<>();
-     statements.add(createTable);
-     statements.add(query);
-
-     final StormSql stormSql = StormSql.construct();
-     final List<Values> collected = new ArrayList<>();
-     final ChannelHandler collector = new TestUtils.CollectDataChannelHandler(collected);
-     stormSql.execute(statements, collector);
-
-     Assert.assertEquals(0, collected.size());
- }
-
- /** Indexes the array column with a string key ({@code ARRAYFIELD['a']}) and expects no rows. */
- @Test
- public void testExternalNestedInvalidAccessStringIndexOnArray() throws Exception {
-     final String createTable =
-         "CREATE EXTERNAL TABLE FOO (ID INT, MAPFIELD ANY, NESTEDMAPFIELD ANY, ARRAYFIELD ANY) LOCATION 'mocknested:///foo'";
-     final String query = "SELECT STREAM ID, MAPFIELD, NESTEDMAPFIELD, ARRAYFIELD " +
-         "FROM FOO " +
-         "WHERE CAST(ARRAYFIELD['a'] AS INTEGER) = 200";
-
-     final List<String> statements = new ArrayList<>();
-     statements.add(createTable);
-     statements.add(query);
-
-     final StormSql stormSql = StormSql.construct();
-     final List<Values> collected = new ArrayList<>();
-     final ChannelHandler collector = new TestUtils.CollectDataChannelHandler(collected);
-     stormSql.execute(statements, collector);
-
-     Assert.assertEquals(0, collected.size());
- }
-
- /** Reads array index 10 ({@code ARRAYFIELD[10]}) in the filter and expects no rows. */
- @Test
- public void testExternalNestedArrayOutOfBoundAccess() throws Exception {
-     final String createTable =
-         "CREATE EXTERNAL TABLE FOO (ID INT, MAPFIELD ANY, NESTEDMAPFIELD ANY, ARRAYFIELD ANY) LOCATION 'mocknested:///foo'";
-     final String query = "SELECT STREAM ID, MAPFIELD, NESTEDMAPFIELD, ARRAYFIELD " +
-         "FROM FOO " +
-         "WHERE CAST(ARRAYFIELD[10] AS INTEGER) = 200";
-
-     final List<String> statements = new ArrayList<>();
-     statements.add(createTable);
-     statements.add(query);
-
-     final StormSql stormSql = StormSql.construct();
-     final List<Values> collected = new ArrayList<>();
-     final ChannelHandler collector = new TestUtils.CollectDataChannelHandler(collected);
-     stormSql.execute(statements, collector);
-
-     Assert.assertEquals(0, collected.size());
- }
-
- /**
-  * Calls the {@code MYPLUS} UDF with a VARCHAR column as its first argument and expects the
-  * statement to be rejected with a {@link ValidationException}.
-  */
- @Test(expected = ValidationException.class)
- public void testExternalUdfType() throws Exception {
-     List<String> stmt = new ArrayList<>();
-     stmt.add("CREATE EXTERNAL TABLE FOO (ID INT, NAME VARCHAR) LOCATION 'mock:///foo'");
-     stmt.add("CREATE FUNCTION MYPLUS AS 'org.apache.storm.sql.TestUtils$MyPlus'");
-     stmt.add("SELECT STREAM MYPLUS(NAME, 1) FROM FOO WHERE ID = 0");
-     StormSql sql = StormSql.construct();
-     List<Values> values = new ArrayList<>();
-     ChannelHandler h = new TestUtils.CollectDataChannelHandler(values);
-     // The expected exception has to come out of execute(); the leftover debug print that followed was removed.
-     sql.execute(stmt, h);
- }
-
- /**
-  * Compares the result of the {@code MYPLUS} UDF with a string literal and expects a
-  * {@link CompilingClassLoader.CompilerException}.
-  */
- @Test(expected = CompilingClassLoader.CompilerException.class)
- public void testExternalUdfType2() throws Exception {
-     final List<String> statements = new ArrayList<>();
-     statements.add("CREATE EXTERNAL TABLE FOO (ID INT, NAME VARCHAR) LOCATION 'mock:///foo'");
-     statements.add("CREATE FUNCTION MYPLUS AS 'org.apache.storm.sql.TestUtils$MyPlus'");
-     // The generated code will not compile: the return type of MYPLUS and the type of 'x' differ.
-     statements.add("SELECT STREAM ID FROM FOO WHERE MYPLUS(ID, 1) = 'x'");
-
-     final StormSql stormSql = StormSql.construct();
-     final List<Values> collected = new ArrayList<>();
-     final ChannelHandler collector = new TestUtils.CollectDataChannelHandler(collected);
-     stormSql.execute(statements, collector);
-
-     Assert.assertEquals(0, collected.size());
- }
-
- /** Projects through the {@code MYPLUS} UDF over the {@code mock} source and expects 4 and 5, in that order. */
- @Test
- public void testExternalUdf() throws Exception {
-     final List<String> statements = new ArrayList<>();
-     statements.add("CREATE EXTERNAL TABLE FOO (ID INT) LOCATION 'mock:///foo'");
-     statements.add("CREATE FUNCTION MYPLUS AS 'org.apache.storm.sql.TestUtils$MyPlus'");
-     statements.add("SELECT STREAM MYPLUS(ID, 1) FROM FOO WHERE ID > 2");
-
-     final StormSql stormSql = StormSql.construct();
-     final List<Values> collected = new ArrayList<>();
-     final ChannelHandler collector = new TestUtils.CollectDataChannelHandler(collected);
-     stormSql.execute(statements, collector);
-
-     final int[] expectedFirstColumn = {4, 5};
-     Assert.assertEquals(expectedFirstColumn.length, collected.size());
-     for (int row = 0; row < expectedFirstColumn.length; row++) {
-         Assert.assertEquals(expectedFirstColumn[row], collected.get(row).get(0));
-     }
- }
-
- /** Declares a function with {@code USING JAR} and expects an {@link UnsupportedOperationException}. */
- @Test(expected = UnsupportedOperationException.class)
- public void testExternalUdfUsingJar() throws Exception {
-     final List<String> statements = new ArrayList<>();
-     statements.add("CREATE EXTERNAL TABLE FOO (ID INT) LOCATION 'mock:///foo'");
-     statements.add("CREATE FUNCTION MYPLUS AS 'org.apache.storm.sql.TestUtils$MyPlus' USING JAR 'foo'");
-     statements.add("SELECT STREAM MYPLUS(ID, 1) FROM FOO WHERE ID > 2");
-
-     final StormSql stormSql = StormSql.construct();
-     final List<Values> collected = new ArrayList<>();
-     final ChannelHandler collector = new TestUtils.CollectDataChannelHandler(collected);
-     stormSql.execute(statements, collector);
- }
-
- /** Groups the {@code mockgroup} source by ID and checks SUM(SALARY) (column 2) of the four result rows. */
- @Test
- public void testGroupbyBuiltin() throws Exception {
-     final List<String> statements = new ArrayList<>();
-     statements.add("CREATE EXTERNAL TABLE FOO (ID INT PRIMARY KEY, SALARY INT, PCT DOUBLE, NAME VARCHAR) LOCATION 'mockgroup:///foo'");
-     statements.add("SELECT STREAM ID, COUNT(*), SUM(SALARY), AVG(SALARY) FROM FOO GROUP BY (ID)");
-
-     final StormSql stormSql = StormSql.construct();
-     final List<Values> collected = new ArrayList<>();
-     final ChannelHandler collector = new TestUtils.CollectDataChannelHandler(collected);
-     stormSql.execute(statements, collector);
-
-     final int[] expectedSalarySums = {3, 12, 21, 9};
-     Assert.assertEquals(expectedSalarySums.length, collected.size());
-     for (int row = 0; row < expectedSalarySums.length; row++) {
-         Assert.assertEquals(expectedSalarySums[row], collected.get(row).get(2));
-     }
- }
-
- /** Groups only the rows with ID = 1 and checks id, count, salary sum and PCT average of the single result row. */
- @Test
- public void testGroupbyBuiltinWithFilter() throws Exception {
-     final List<String> statements = new ArrayList<>();
-     statements.add("CREATE EXTERNAL TABLE FOO (ID INT PRIMARY KEY, SALARY INT, PCT DOUBLE, NAME VARCHAR) LOCATION 'mockgroup:///foo'");
-     statements.add("SELECT STREAM ID, COUNT(*), SUM(SALARY), AVG(PCT) FROM FOO WHERE ID = 1 GROUP BY (ID)");
-
-     final StormSql stormSql = StormSql.construct();
-     final List<Values> collected = new ArrayList<>();
-     final ChannelHandler collector = new TestUtils.CollectDataChannelHandler(collected);
-     stormSql.execute(statements, collector);
-
-     Assert.assertEquals(1, collected.size());
-     final Values onlyRow = collected.get(0);
-     Assert.assertEquals(1, onlyRow.get(0));
-     Assert.assertEquals(3L, onlyRow.get(1));
-     Assert.assertEquals(12, onlyRow.get(2));
-     Assert.assertEquals(2.5, onlyRow.get(3));
- }
-
- /**
-  * Mixes a built-in aggregate with the {@code MYCONCAT} and {@code TOPN} aggregate UDFs and
-  * checks columns 1 to 3 of each of the four result rows.
-  */
- @Test
- public void testGroupbyBuiltinAndUDF() throws Exception {
-     final List<String> statements = new ArrayList<>();
-     statements.add("CREATE EXTERNAL TABLE FOO (ID INT PRIMARY KEY, SALARY INT, PCT DOUBLE, NAME VARCHAR) LOCATION 'mockgroup:///foo'");
-     statements.add("CREATE FUNCTION MYCONCAT AS 'org.apache.storm.sql.TestUtils$MyConcat'");
-     statements.add("CREATE FUNCTION TOPN AS 'org.apache.storm.sql.TestUtils$TopN'");
-     statements.add("SELECT STREAM ID, SUM(SALARY), MYCONCAT(NAME), TOPN(2, SALARY) FROM FOO GROUP BY (ID)");
-
-     final StormSql stormSql = StormSql.construct();
-     final List<Values> collected = new ArrayList<>();
-     final ChannelHandler collector = new TestUtils.CollectDataChannelHandler(collected);
-     stormSql.execute(statements, collector);
-
-     // One entry per result row: SUM(SALARY), MYCONCAT(NAME), TOPN(2, SALARY).
-     final Object[][] expectedRows = {
-         {3, "xxx", Arrays.asList(2, 1)},
-         {12, "xxx", Arrays.asList(5, 4)},
-         {21, "xxx", Arrays.asList(8, 7)},
-         {9, "x", Arrays.asList(9)},
-     };
-     Assert.assertEquals(expectedRows.length, collected.size());
-     for (int row = 0; row < expectedRows.length; row++) {
-         for (int column = 1; column <= 3; column++) {
-             Assert.assertEquals(expectedRows[row][column - 1], collected.get(row).get(column));
-         }
-     }
- }
-
- /** Uses the {@code TOPN} aggregate UDF together with WHERE and HAVING and checks its list result (column 2) per row. */
- @Test
- public void testAggFnNonSqlReturnType() throws Exception {
-     final List<String> statements = new ArrayList<>();
-     statements.add("CREATE EXTERNAL TABLE FOO (ID INT PRIMARY KEY, SALARY INT, PCT DOUBLE, NAME VARCHAR) LOCATION 'mockgroup:///foo'");
-     statements.add("CREATE FUNCTION TOPN AS 'org.apache.storm.sql.TestUtils$TopN'");
-     statements.add("SELECT STREAM ID, SUM(SALARY), TOPN(1, SALARY) FROM FOO WHERE ID >= 0 GROUP BY (ID) HAVING MAX(SALARY) > 0");
-
-     final StormSql stormSql = StormSql.construct();
-     final List<Values> collected = new ArrayList<>();
-     final ChannelHandler collector = new TestUtils.CollectDataChannelHandler(collected);
-     stormSql.execute(statements, collector);
-
-     final int[] expectedTopSalary = {2, 5, 8, 9};
-     Assert.assertEquals(expectedTopSalary.length, collected.size());
-     for (int row = 0; row < expectedTopSalary.length; row++) {
-         Assert.assertEquals(Collections.singletonList(expectedTopSalary[row]), collected.get(row).get(2));
-     }
- }
-
- /** Applies AVG to two different columns in one query and checks all four columns of the single result row. */
- @Test
- public void testGroupbySameAggregateOnDifferentColumns() throws Exception {
-     final List<String> statements = new ArrayList<>();
-     statements.add("CREATE EXTERNAL TABLE FOO (ID INT PRIMARY KEY, SALARY INT, PCT DOUBLE, NAME VARCHAR) LOCATION 'mockgroup:///foo'");
-     statements.add("SELECT STREAM ID, COUNT(*), AVG(SALARY), AVG(PCT) FROM FOO WHERE ID = 1 GROUP BY (ID)");
-
-     final StormSql stormSql = StormSql.construct();
-     final List<Values> collected = new ArrayList<>();
-     final ChannelHandler collector = new TestUtils.CollectDataChannelHandler(collected);
-     stormSql.execute(statements, collector);
-
-     Assert.assertEquals(1, collected.size());
-     final Values onlyRow = collected.get(0);
-     Assert.assertEquals(1, onlyRow.get(0));
-     Assert.assertEquals(3L, onlyRow.get(1));
-     Assert.assertEquals(4, onlyRow.get(2));
-     Assert.assertEquals(2.5, onlyRow.get(3));
- }
-
- /** Uses the {@code STDDEV_POP} aggregate and expects an {@link UnsupportedOperationException}. */
- @Test(expected = UnsupportedOperationException.class)
- public void testGroupbyBuiltinNotimplemented() throws Exception {
-     final List<String> statements = new ArrayList<>();
-     statements.add("CREATE EXTERNAL TABLE FOO (ID INT PRIMARY KEY, SALARY INT, PCT DOUBLE, NAME VARCHAR) LOCATION 'mockgroup:///foo'");
-     statements.add("SELECT STREAM ID, COUNT(*), STDDEV_POP(SALARY) FROM FOO GROUP BY (ID)");
-
-     final StormSql stormSql = StormSql.construct();
-     final List<Values> collected = new ArrayList<>();
-     final ChannelHandler collector = new TestUtils.CollectDataChannelHandler(collected);
-     stormSql.execute(statements, collector);
- }
-
- /** Groups by ID and checks MIN(SALARY) (column 2) and MAX(PCT) (column 3) of the four result rows. */
- @Test
- public void testMinMax() throws Exception {
-     final List<String> statements = new ArrayList<>();
-     statements.add("CREATE EXTERNAL TABLE FOO (ID INT PRIMARY KEY, SALARY INT, PCT DOUBLE, NAME VARCHAR) LOCATION 'mockgroup:///foo'");
-     statements.add("SELECT STREAM ID, COUNT(*), MIN(SALARY), MAX(PCT) FROM FOO GROUP BY (ID)");
-
-     final StormSql stormSql = StormSql.construct();
-     final List<Values> collected = new ArrayList<>();
-     final ChannelHandler collector = new TestUtils.CollectDataChannelHandler(collected);
-     stormSql.execute(statements, collector);
-
-     final int[] expectedMinSalary = {0, 3, 6, 9};
-     final double[] expectedMaxPct = {1.5, 3.0, 4.5, 5.0};
-     Assert.assertEquals(expectedMinSalary.length, collected.size());
-     // All minimum checks run before any maximum check.
-     for (int row = 0; row < expectedMinSalary.length; row++) {
-         Assert.assertEquals(expectedMinSalary[row], collected.get(row).get(2));
-     }
-     for (int row = 0; row < expectedMaxPct.length; row++) {
-         Assert.assertEquals(expectedMaxPct[row], collected.get(row).get(3));
-     }
- }
- @Test
- public void testFilterGroupbyHaving() throws Exception {
- List<String> stmt = new ArrayList<>();
- stmt.add("CREATE EXTERNAL TABLE FOO (ID INT PRIMARY KEY, SALARY INT, PCT DOUBLE, NAME VARCHAR) LOCATION 'mockgroup:///foo'");
- stmt.add("SELECT STREAM ID, MIN(SALARY) FROM FOO where ID > 0 GROUP BY (ID) HAVING ID > 2 AND MAX(SALARY) > 5");
- StormSql sql = StormSql.construct();
- List<Values> values = new ArrayList<>();
- ChannelHandler h = new TestUtils.CollectDataChannelHandler(values);
- sql.execute(stmt, h);
- Assert.assertEquals(1, values.size());
- Assert.assertEquals(3, values.get(0).get(0));
- Assert.assertEquals(9, values.get(0).get(1));
- }
-
- @Test
- public void testGroupByMultipleFields() throws Exception {
- List<String> stmt = new ArrayList<>();
- stmt.add("CREATE EXTERNAL TABLE FOO (DEPTID INT PRIMARY KEY, SALARY INT, PCT DOUBLE, NAME VARCHAR, EMPID INT) LOCATION 'mockgroup:///foo'");
- stmt.add("SELECT STREAM DEPTID, EMPID, COUNT(*), MIN(SALARY), MAX(PCT) FROM FOO GROUP BY DEPTID, EMPID");
- StormSql sql = StormSql.construct();
- List<Values> values = new ArrayList<>();
- ChannelHandler h = new TestUtils.CollectDataChannelHandler(values);
- sql.execute(stmt, h);
- Assert.assertEquals(7, values.size());
- Assert.assertEquals(0, values.get(0).get(0));
- Assert.assertEquals(0, values.get(0).get(1));
- Assert.assertEquals(2L, values.get(0).get(2));
- }
-
- @Test
- public void testjoin() throws Exception {
- List<String> stmt = new ArrayList<>();
- stmt.add("CREATE EXTERNAL TABLE EMP (EMPID INT PRIMARY KEY, EMPNAME VARCHAR, DEPTID INT) LOCATION 'mockemp:///foo'");
- stmt.add("CREATE EXTERNAL TABLE DEPT (DEPTID INT PRIMARY KEY, DEPTNAME VARCHAR) LOCATION 'mockdept:///foo'");
- stmt.add("SELECT STREAM EMPID, EMPNAME, DEPTNAME FROM EMP AS e JOIN DEPT AS d ON e.DEPTID = d.DEPTID WHERE e.empid > 0");
- StormSql sql = StormSql.construct();
- List<Values> values = new ArrayList<>();
- ChannelHandler h = new TestUtils.CollectDataChannelHandler(values);
- sql.execute(stmt, h);
- System.out.println(values);
- Assert.assertEquals(3, values.size());
- Assert.assertEquals("emp1", values.get(0).get(1));
- Assert.assertEquals("dept1", values.get(0).get(2));
- Assert.assertEquals("emp2", values.get(1).get(1));
- Assert.assertEquals("dept1", values.get(1).get(2));
- Assert.assertEquals("emp3", values.get(2).get(1));
- Assert.assertEquals("dept2", values.get(2).get(2));
- }
-
- @Test
- public void testjoinAndGroupby() throws Exception {
- List<String> stmt = new ArrayList<>();
- stmt.add("CREATE EXTERNAL TABLE EMP (EMPID INT PRIMARY KEY, EMPNAME VARCHAR, DEPTID INT) LOCATION 'mockemp:///foo'");
- stmt.add("CREATE EXTERNAL TABLE DEPT (DEPTID INT PRIMARY KEY, DEPTNAME VARCHAR) LOCATION 'mockdept:///foo'");
- stmt.add("SELECT STREAM d.DEPTID, count(EMPID) FROM EMP AS e JOIN DEPT AS d ON e.DEPTID = d.DEPTID WHERE e.empid > 0" +
- "GROUP BY d.DEPTID");
- StormSql sql = StormSql.construct();
- List<Values> values = new ArrayList<>();
- ChannelHandler h = new TestUtils.CollectDataChannelHandler(values);
- sql.execute(stmt, h);
- Assert.assertEquals(2, values.size());
- Assert.assertEquals(1, values.get(0).get(0));
- Assert.assertEquals(2L, values.get(0).get(1));
- Assert.assertEquals(2, values.get(1).get(0));
- Assert.assertEquals(1L, values.get(1).get(1));
- }
-}