You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2018/07/16 22:00:46 UTC
[4/7] storm git commit: STORM-2406 [Storm SQL] Change underlying API
to Streams API
http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentLogicalConvention.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentLogicalConvention.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentLogicalConvention.java
deleted file mode 100644
index 28256e8..0000000
--- a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentLogicalConvention.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.sql.planner.trident.rel;
-
-import org.apache.calcite.plan.Convention;
-import org.apache.calcite.plan.ConventionTraitDef;
-import org.apache.calcite.plan.RelOptPlanner;
-import org.apache.calcite.plan.RelTrait;
-import org.apache.calcite.plan.RelTraitDef;
-import org.apache.calcite.plan.RelTraitSet;
-
-public enum TridentLogicalConvention implements Convention {
- INSTANCE;
-
- @Override
- public Class getInterface() {
- return TridentRel.class;
- }
-
- @Override
- public String getName() {
- return "STORM_LOGICAL";
- }
-
- @Override
- public RelTraitDef getTraitDef() {
- return ConventionTraitDef.INSTANCE;
- }
-
- @Override
- public boolean satisfies(RelTrait trait) {
- return this == trait;
- }
-
- @Override
- public void register(RelOptPlanner planner) {}
-
- @Override
- public String toString() {
- return getName();
- }
-
- @Override
- public boolean canConvertConvention(Convention toConvention) {
- return false;
- }
-
- @Override
- public boolean useAbstractConvertersForConversion(RelTraitSet fromTraits, RelTraitSet toTraits) {
- return false;
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentProjectRel.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentProjectRel.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentProjectRel.java
deleted file mode 100644
index 1eef647..0000000
--- a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentProjectRel.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.sql.planner.trident.rel;
-
-import java.util.List;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.Project;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rex.RexNode;
-import org.apache.storm.sql.planner.StormRelUtils;
-import org.apache.storm.sql.planner.rel.StormProjectRelBase;
-import org.apache.storm.sql.planner.trident.TridentPlanCreator;
-import org.apache.storm.sql.runtime.calcite.ExecutableExpression;
-import org.apache.storm.sql.runtime.trident.functions.EvaluationFunction;
-import org.apache.storm.trident.Stream;
-import org.apache.storm.tuple.Fields;
-
-public class TridentProjectRel extends StormProjectRelBase implements TridentRel {
- public TridentProjectRel(RelOptCluster cluster, RelTraitSet traits, RelNode input, List<? extends RexNode> projects,
- RelDataType rowType) {
- super(cluster, traits, input, projects, rowType);
- }
-
- @Override
- public Project copy(RelTraitSet traitSet, RelNode input, List<RexNode> projects, RelDataType rowType) {
- return new TridentProjectRel(getCluster(), traitSet, input, projects, rowType);
- }
-
- @Override
- public void tridentPlan(TridentPlanCreator planCreator) throws Exception {
- // SingleRel
- RelNode input = getInput();
- StormRelUtils.getStormRelInput(input).tridentPlan(planCreator);
- Stream inputStream = planCreator.pop().toStream();
-
- String stageName = StormRelUtils.getStageName(this);
- String projectionClassName = StormRelUtils.getClassName(this);
-
- List<String> outputFieldNames = getRowType().getFieldNames();
- int outputCount = outputFieldNames.size();
-
- List<RexNode> childExps = getChildExps();
- RelDataType inputRowType = getInput(0).getRowType();
-
- ExecutableExpression projectionInstance = planCreator.createScalarInstance(childExps, inputRowType, projectionClassName);
- Stream finalStream = inputStream
- .map(new EvaluationFunction(projectionInstance, outputCount, planCreator.getDataContext()), new Fields(outputFieldNames))
- .name(stageName);
-
- planCreator.addStream(finalStream);
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentRel.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentRel.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentRel.java
deleted file mode 100644
index 6e90997..0000000
--- a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentRel.java
+++ /dev/null
@@ -1,20 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.sql.planner.trident.rel;
-
-import org.apache.storm.sql.planner.rel.StormRelNode;
-import org.apache.storm.sql.planner.trident.TridentPlanCreator;
-
-public interface TridentRel extends StormRelNode {
- void tridentPlan(TridentPlanCreator planCreator) throws Exception;
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentStreamInsertRel.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentStreamInsertRel.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentStreamInsertRel.java
deleted file mode 100644
index 28e43bd..0000000
--- a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentStreamInsertRel.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.sql.planner.trident.rel;
-
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import java.util.List;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelOptTable;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.prepare.Prepare;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rex.RexNode;
-import org.apache.storm.sql.planner.StormRelUtils;
-import org.apache.storm.sql.planner.rel.StormStreamInsertRelBase;
-import org.apache.storm.sql.planner.trident.TridentPlanCreator;
-import org.apache.storm.sql.runtime.ISqlTridentDataSource;
-import org.apache.storm.trident.Stream;
-import org.apache.storm.trident.fluent.IAggregatableStream;
-import org.apache.storm.tuple.Fields;
-
-public class TridentStreamInsertRel extends StormStreamInsertRelBase implements TridentRel {
- public TridentStreamInsertRel(RelOptCluster cluster, RelTraitSet traits, RelOptTable table, Prepare.CatalogReader catalogReader,
- RelNode child, Operation operation, List<String> updateColumnList, List<RexNode> sourceExpressionList,
- boolean flattened) {
- super(cluster, traits, table, catalogReader, child, operation, updateColumnList, sourceExpressionList, flattened);
- }
-
- @Override
- public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
- return new TridentStreamInsertRel(getCluster(), traitSet, getTable(), getCatalogReader(),
- sole(inputs), getOperation(), getUpdateColumnList(), getSourceExpressionList(), isFlattened());
- }
-
- @Override
- public void tridentPlan(TridentPlanCreator planCreator) throws Exception {
- // SingleRel
- RelNode input = getInput();
- StormRelUtils.getStormRelInput(input).tridentPlan(planCreator);
- Stream inputStream = planCreator.pop().toStream();
-
- String stageName = StormRelUtils.getStageName(this);
-
- Preconditions.checkArgument(isInsert(), "Only INSERT statement is supported.");
-
- List<String> inputFields = this.input.getRowType().getFieldNames();
- List<String> outputFields = getRowType().getFieldNames();
-
- // FIXME: this should be really different...
- String tableName = Joiner.on('.').join(getTable().getQualifiedName());
- ISqlTridentDataSource.SqlTridentConsumer consumer = planCreator.getSources().get(tableName).getConsumer();
-
- // In fact this is normally the end of stream, but I'm still not sure so I open new streams based on State values
- IAggregatableStream finalStream = inputStream
- .partitionPersist(consumer.getStateFactory(), new Fields(inputFields), consumer.getStateUpdater(),
- new Fields(outputFields))
- .newValuesStream().name(stageName);
-
- planCreator.addStream(finalStream);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentStreamScanRel.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentStreamScanRel.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentStreamScanRel.java
deleted file mode 100644
index 706cdf5..0000000
--- a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentStreamScanRel.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.sql.planner.trident.rel;
-
-import com.google.common.base.Joiner;
-import java.util.Map;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelOptTable;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.storm.sql.planner.StormRelUtils;
-import org.apache.storm.sql.planner.rel.StormStreamScanRelBase;
-import org.apache.storm.sql.planner.trident.TridentPlanCreator;
-import org.apache.storm.sql.runtime.ISqlTridentDataSource;
-import org.apache.storm.trident.fluent.IAggregatableStream;
-
-public class TridentStreamScanRel extends StormStreamScanRelBase implements TridentRel {
- private final int parallelismHint;
-
- public TridentStreamScanRel(RelOptCluster cluster, RelTraitSet traitSet, RelOptTable table, int parallelismHint) {
- super(cluster, traitSet, table);
- this.parallelismHint = parallelismHint;
- }
-
- @Override
- public void tridentPlan(TridentPlanCreator planCreator) throws Exception {
- String sourceName = Joiner.on('.').join(getTable().getQualifiedName());
-
- // FIXME: this should be really different...
- Map<String, ISqlTridentDataSource> sources = planCreator.getSources();
- if (!sources.containsKey(sourceName)) {
- throw new RuntimeException("Cannot find table " + sourceName);
- }
-
- String stageName = StormRelUtils.getStageName(this);
- IAggregatableStream finalStream = planCreator.getTopology().newStream(stageName, sources.get(sourceName).getProducer())
- .parallelismHint(parallelismHint);
- planCreator.addStream(finalStream);
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentAggregateRule.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentAggregateRule.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentAggregateRule.java
deleted file mode 100644
index 316a727..0000000
--- a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentAggregateRule.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.sql.planner.trident.rules;
-
-import org.apache.calcite.plan.Convention;
-import org.apache.calcite.plan.RelOptRule;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.convert.ConverterRule;
-import org.apache.calcite.rel.logical.LogicalAggregate;
-import org.apache.storm.sql.planner.trident.rel.TridentLogicalConvention;
-
-public class TridentAggregateRule extends ConverterRule {
- public static final RelOptRule INSTANCE = new TridentAggregateRule();
-
- private TridentAggregateRule() {
- super(LogicalAggregate.class, Convention.NONE, TridentLogicalConvention.INSTANCE, "TridentAggregateRule");
- }
-
- @Override
- public RelNode convert(RelNode rel) {
- throw new UnsupportedOperationException("Aggregate operation is not supported.");
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentCalcRule.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentCalcRule.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentCalcRule.java
deleted file mode 100644
index b3709b7..0000000
--- a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentCalcRule.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.sql.planner.trident.rules;
-
-import org.apache.calcite.plan.Convention;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.convert.ConverterRule;
-import org.apache.calcite.rel.core.Calc;
-import org.apache.calcite.rel.logical.LogicalCalc;
-import org.apache.storm.sql.planner.trident.rel.TridentCalcRel;
-import org.apache.storm.sql.planner.trident.rel.TridentLogicalConvention;
-
-public class TridentCalcRule extends ConverterRule {
- public static final TridentCalcRule INSTANCE = new TridentCalcRule();
-
- private TridentCalcRule() {
- super(LogicalCalc.class, Convention.NONE, TridentLogicalConvention.INSTANCE, "TridentCalcRule");
- }
-
- @Override
- public RelNode convert(RelNode rel) {
- final Calc calc = (Calc) rel;
- final RelNode input = calc.getInput();
-
- return new TridentCalcRel(calc.getCluster(), calc.getTraitSet().replace(TridentLogicalConvention.INSTANCE),
- convert(input, input.getTraitSet().replace(TridentLogicalConvention.INSTANCE)),
- calc.getProgram());
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentFilterRule.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentFilterRule.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentFilterRule.java
deleted file mode 100644
index 7678301..0000000
--- a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentFilterRule.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.sql.planner.trident.rules;
-
-import org.apache.calcite.plan.Convention;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.convert.ConverterRule;
-import org.apache.calcite.rel.core.Filter;
-import org.apache.calcite.rel.logical.LogicalFilter;
-import org.apache.storm.sql.planner.trident.rel.TridentFilterRel;
-import org.apache.storm.sql.planner.trident.rel.TridentLogicalConvention;
-
-public class TridentFilterRule extends ConverterRule {
- public static TridentFilterRule INSTANCE = new TridentFilterRule();
-
- private TridentFilterRule() {
- super(LogicalFilter.class, Convention.NONE, TridentLogicalConvention.INSTANCE, "TridentFilterRule");
- }
-
- @Override
- public RelNode convert(RelNode rel) {
- final Filter filter = (Filter) rel;
- final RelNode input = filter.getInput();
-
- return new TridentFilterRel(filter.getCluster(),
- filter.getTraitSet().replace(TridentLogicalConvention.INSTANCE),
- convert(input, input.getTraitSet().replace(TridentLogicalConvention.INSTANCE)),
- filter.getCondition());
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentJoinRule.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentJoinRule.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentJoinRule.java
deleted file mode 100644
index 305f315..0000000
--- a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentJoinRule.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.sql.planner.trident.rules;
-
-import org.apache.calcite.plan.Convention;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.convert.ConverterRule;
-import org.apache.calcite.rel.logical.LogicalJoin;
-import org.apache.storm.sql.planner.trident.rel.TridentLogicalConvention;
-
-public class TridentJoinRule extends ConverterRule {
- public static final TridentJoinRule INSTANCE = new TridentJoinRule();
-
- private TridentJoinRule() {
- super(LogicalJoin.class, Convention.NONE, TridentLogicalConvention.INSTANCE, "TridentJoinRule");
- }
-
- @Override
- public RelNode convert(RelNode rel) {
- throw new UnsupportedOperationException("Join operation is not supported.");
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentModifyRule.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentModifyRule.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentModifyRule.java
deleted file mode 100644
index 6848a5e..0000000
--- a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentModifyRule.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.sql.planner.trident.rules;
-
-import java.util.List;
-import org.apache.calcite.plan.Convention;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelOptTable;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.prepare.Prepare;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.convert.ConverterRule;
-import org.apache.calcite.rel.core.TableModify;
-import org.apache.calcite.rel.logical.LogicalTableModify;
-import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.schema.Table;
-import org.apache.storm.sql.planner.trident.rel.TridentLogicalConvention;
-import org.apache.storm.sql.planner.trident.rel.TridentStreamInsertRel;
-
-public class TridentModifyRule extends ConverterRule {
- public static final TridentModifyRule INSTANCE = new TridentModifyRule();
-
- private TridentModifyRule() {
- super(LogicalTableModify.class, Convention.NONE, TridentLogicalConvention.INSTANCE, "TridentModifyRule");
- }
-
- @Override
- public RelNode convert(RelNode rel) {
- final TableModify tableModify = (TableModify) rel;
- final RelNode input = tableModify.getInput();
-
- final RelOptCluster cluster = tableModify.getCluster();
- final RelTraitSet traitSet = tableModify.getTraitSet().replace(TridentLogicalConvention.INSTANCE);
- final RelOptTable relOptTable = tableModify.getTable();
- final Prepare.CatalogReader catalogReader = tableModify.getCatalogReader();
- final RelNode convertedInput = convert(input, input.getTraitSet().replace(TridentLogicalConvention.INSTANCE));
- final TableModify.Operation operation = tableModify.getOperation();
- final List<String> updateColumnList = tableModify.getUpdateColumnList();
- final List<RexNode> sourceExpressionList = tableModify.getSourceExpressionList();
- final boolean flattened = tableModify.isFlattened();
-
- final Table table = tableModify.getTable().unwrap(Table.class);
-
- switch (table.getJdbcTableType()) {
- case STREAM:
- if (operation != TableModify.Operation.INSERT) {
- throw new UnsupportedOperationException(String.format("Streams doesn't support %s modify operation", operation));
- }
- return new TridentStreamInsertRel(cluster, traitSet, relOptTable, catalogReader, convertedInput, operation,
- updateColumnList, sourceExpressionList, flattened);
- default:
- throw new IllegalArgumentException(String.format("Unsupported table type: %s", table.getJdbcTableType()));
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentProjectRule.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentProjectRule.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentProjectRule.java
deleted file mode 100644
index 4e415e0..0000000
--- a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentProjectRule.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.sql.planner.trident.rules;
-
-import org.apache.calcite.plan.Convention;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.convert.ConverterRule;
-import org.apache.calcite.rel.core.Project;
-import org.apache.calcite.rel.logical.LogicalProject;
-import org.apache.storm.sql.planner.trident.rel.TridentLogicalConvention;
-import org.apache.storm.sql.planner.trident.rel.TridentProjectRel;
-
-public class TridentProjectRule extends ConverterRule {
- public static final TridentProjectRule INSTANCE = new TridentProjectRule();
-
- private TridentProjectRule() {
- super(LogicalProject.class, Convention.NONE, TridentLogicalConvention.INSTANCE,
- "TridentProjectRule");
- }
-
- @Override
- public RelNode convert(RelNode rel) {
- final Project project = (Project) rel;
- final RelNode input = project.getInput();
-
- return new TridentProjectRel(project.getCluster(),
- project.getTraitSet().replace(TridentLogicalConvention.INSTANCE),
- convert(input, input.getTraitSet().replace(TridentLogicalConvention.INSTANCE)), project.getProjects(),
- project.getRowType());
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentScanRule.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentScanRule.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentScanRule.java
deleted file mode 100644
index a6ff965..0000000
--- a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentScanRule.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.sql.planner.trident.rules;
-
-import org.apache.calcite.adapter.enumerable.EnumerableConvention;
-import org.apache.calcite.adapter.enumerable.EnumerableTableScan;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.convert.ConverterRule;
-import org.apache.calcite.rel.core.TableScan;
-import org.apache.calcite.schema.Table;
-import org.apache.storm.sql.calcite.ParallelStreamableTable;
-import org.apache.storm.sql.planner.trident.rel.TridentLogicalConvention;
-import org.apache.storm.sql.planner.trident.rel.TridentStreamScanRel;
-
-public class TridentScanRule extends ConverterRule {
- public static final TridentScanRule INSTANCE = new TridentScanRule();
- public static final int DEFAULT_PARALLELISM_HINT = 1;
-
- private TridentScanRule() {
- super(EnumerableTableScan.class, EnumerableConvention.INSTANCE, TridentLogicalConvention.INSTANCE, "TridentScanRule");
- }
-
- @Override
- public RelNode convert(RelNode rel) {
- final TableScan scan = (TableScan) rel;
- int parallelismHint = DEFAULT_PARALLELISM_HINT;
-
- final ParallelStreamableTable parallelTable = scan.getTable().unwrap(ParallelStreamableTable.class);
- if (parallelTable != null && parallelTable.parallelismHint() != null) {
- parallelismHint = parallelTable.parallelismHint();
- }
-
- final Table table = scan.getTable().unwrap(Table.class);
- switch (table.getJdbcTableType()) {
- case STREAM:
- return new TridentStreamScanRel(scan.getCluster(),
- scan.getTraitSet().replace(TridentLogicalConvention.INSTANCE),
- scan.getTable(), parallelismHint);
- default:
- throw new IllegalArgumentException(String.format("Unsupported table type: %s", table.getJdbcTableType()));
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-core/src/test/org/apache/storm/sql/SqlTestUtil.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/test/org/apache/storm/sql/SqlTestUtil.java b/sql/storm-sql-core/src/test/org/apache/storm/sql/SqlTestUtil.java
index 4d90cba..8cc713a 100644
--- a/sql/storm-sql-core/src/test/org/apache/storm/sql/SqlTestUtil.java
+++ b/sql/storm-sql-core/src/test/org/apache/storm/sql/SqlTestUtil.java
@@ -19,34 +19,29 @@
package org.apache.storm.sql;
+import java.util.List;
import java.util.concurrent.Callable;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
+import org.apache.storm.generated.StormTopology;
import org.apache.storm.sql.javac.CompilingClassLoader;
-import org.apache.storm.trident.TridentTopology;
import org.apache.storm.utils.Utils;
-import static org.apache.storm.sql.TestUtils.MockState.getCollectedValues;
-
public final class SqlTestUtil {
- public static void runTridentTopology(LocalCluster cluster, final int expectedValueSize, AbstractTridentProcessor proc,
- TridentTopology topo) throws Exception {
+ public static void runStormTopology(LocalCluster cluster, final List<?> watchedList, final int expectedValueSize,
+ AbstractStreamsProcessor proc, StormTopology topo) throws Exception {
final Config conf = new Config();
conf.setMaxSpoutPending(20);
+ conf.setDebug(true);
if (proc.getClassLoaders() != null && proc.getClassLoaders().size() > 0) {
CompilingClassLoader lastClassloader = proc.getClassLoaders().get(proc.getClassLoaders().size() - 1);
Utils.setClassLoaderForJavaDeSerialize(lastClassloader);
}
- try (LocalCluster.LocalTopology stormTopo = cluster.submitTopology("storm-sql", conf, topo.build())) {
- waitForCompletion(1000 * 1000, new Callable<Boolean>() {
- @Override
- public Boolean call() throws Exception {
- return getCollectedValues().size() < expectedValueSize;
- }
- });
+ try (LocalCluster.LocalTopology stormTopo = cluster.submitTopology("storm-sql", conf, topo)) {
+ waitForCompletion(1000 * 1000, () -> watchedList.size() < expectedValueSize);
} finally {
while (cluster.getClusterInfo().get_topologies_size() > 0) {
Thread.sleep(10);
http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-core/src/test/org/apache/storm/sql/StormSqlLocalClusterImpl.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/test/org/apache/storm/sql/StormSqlLocalClusterImpl.java b/sql/storm-sql-core/src/test/org/apache/storm/sql/StormSqlLocalClusterImpl.java
index c7b0743..450e4f4 100644
--- a/sql/storm-sql-core/src/test/org/apache/storm/sql/StormSqlLocalClusterImpl.java
+++ b/sql/storm-sql-core/src/test/org/apache/storm/sql/StormSqlLocalClusterImpl.java
@@ -23,11 +23,11 @@ import java.util.function.Predicate;
import org.apache.calcite.sql.SqlNode;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
+import org.apache.storm.generated.StormTopology;
import org.apache.storm.sql.javac.CompilingClassLoader;
import org.apache.storm.sql.parser.SqlCreateFunction;
import org.apache.storm.sql.parser.SqlCreateTable;
import org.apache.storm.sql.parser.StormParser;
-import org.apache.storm.trident.TridentTopology;
import org.apache.storm.utils.Utils;
public class StormSqlLocalClusterImpl {
@@ -57,15 +57,15 @@ public class StormSqlLocalClusterImpl {
} else if (node instanceof SqlCreateFunction) {
sqlContext.interpretCreateFunction((SqlCreateFunction) node);
} else {
- AbstractTridentProcessor processor = sqlContext.compileSql(sql);
- TridentTopology topo = processor.build();
+ AbstractStreamsProcessor processor = sqlContext.compileSql(sql);
+ StormTopology topo = processor.build();
if (processor.getClassLoaders() != null && processor.getClassLoaders().size() > 0) {
CompilingClassLoader lastClassloader = processor.getClassLoaders().get(processor.getClassLoaders().size() - 1);
Utils.setClassLoaderForJavaDeSerialize(lastClassloader);
}
- try (LocalCluster.LocalTopology stormTopo = localCluster.submitTopology("storm-sql", conf, topo.build())) {
+ try (LocalCluster.LocalTopology stormTopo = localCluster.submitTopology("storm-sql", conf, topo)) {
waitForCompletion(waitTimeoutMs, waitCondition);
} finally {
while (localCluster.getClusterInfo().get_topologies_size() > 0) {
http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java b/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java
index 468a5dc..5a30764 100644
--- a/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java
+++ b/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java
@@ -12,34 +12,42 @@
package org.apache.storm.sql;
-import com.google.common.collect.ImmutableMap;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+
+import com.google.common.collect.ImmutableMap;
import org.apache.calcite.tools.ValidationException;
import org.apache.storm.LocalCluster;
import org.apache.storm.sql.javac.CompilingClassLoader;
import org.apache.storm.sql.runtime.DataSourcesProvider;
import org.apache.storm.sql.runtime.DataSourcesRegistry;
import org.apache.storm.sql.runtime.FieldInfo;
-import org.apache.storm.sql.runtime.ISqlTridentDataSource;
+import org.apache.storm.sql.runtime.ISqlStreamsDataSource;
+import org.apache.storm.streams.Pair;
import org.apache.storm.tuple.Values;
import org.junit.AfterClass;
import org.junit.Assert;
-import org.junit.Before;
import org.junit.BeforeClass;
+import org.junit.Rule;
import org.junit.Test;
-
-import static org.apache.storm.sql.TestUtils.MockState.getCollectedValues;
+import org.junit.rules.ExternalResource;
public class TestStormSql {
public static final int WAIT_TIMEOUT_MS = 1000 * 1000;
public static final int WAIT_TIMEOUT_MS_NO_RECORDS_EXPECTED = 1000 * 10;
public static final int WAIT_TIMEOUT_MS_ERROR_EXPECTED = 1000;
+
+ @Rule
+ public ExternalResource mockBoltValues = TestUtils.mockBoltValueResource;
+
+ @Rule
+ public ExternalResource mockInsertBoltValues = TestUtils.mockInsertBoltValueResource;
+
private static LocalCluster cluster;
@BeforeClass
@@ -67,43 +75,39 @@ public class TestStormSql {
}
}
- @Before
- public void setUp() {
- getCollectedValues().clear();
- }
-
@Test
public void testExternalDataSource() throws Exception {
List<String> stmt = new ArrayList<>();
- stmt.add("CREATE EXTERNAL TABLE FOO (ID INT) LOCATION 'mock:///foo'");
- stmt.add("CREATE EXTERNAL TABLE BAR (ID INT) LOCATION 'mock:///foo'");
+ stmt.add("CREATE EXTERNAL TABLE FOO (ID INT PRIMARY KEY) LOCATION 'mock:///foo'");
+ stmt.add("CREATE EXTERNAL TABLE BAR (ID INT PRIMARY KEY) LOCATION 'mock:///foo'");
stmt.add("INSERT INTO BAR SELECT STREAM ID + 1 FROM FOO WHERE ID > 2");
StormSqlLocalClusterImpl impl = new StormSqlLocalClusterImpl();
- List<List<Object>> values = getCollectedValues();
+ List<Pair<Object, Values>> values = TestUtils.MockInsertBolt.getCollectedValues();
impl.runLocal(cluster, stmt, (__) -> values.size() >= 2, WAIT_TIMEOUT_MS);
Assert.assertEquals(2, values.size());
- Assert.assertEquals(4, values.get(0).get(0));
- Assert.assertEquals(5, values.get(1).get(0));
+ Assert.assertEquals(4, values.get(0).getFirst());
+ Assert.assertEquals(5, values.get(1).getFirst());
}
@Test
public void testExternalDataSourceNested() throws Exception {
List<String> stmt = new ArrayList<>();
- stmt.add("CREATE EXTERNAL TABLE FOO (ID INT, MAPFIELD ANY, NESTEDMAPFIELD ANY, ARRAYFIELD ANY) LOCATION 'mocknested:///foo'");
- stmt.add("CREATE EXTERNAL TABLE BAR (ID INT, MAPFIELD ANY, NESTEDMAPFIELD ANY, ARRAYFIELD ANY) LOCATION 'mocknested:///foo'");
+ stmt.add("CREATE EXTERNAL TABLE FOO (ID INT PRIMARY KEY, MAPFIELD ANY, NESTEDMAPFIELD ANY, ARRAYFIELD ANY) LOCATION 'mocknested:///foo'");
+ stmt.add("CREATE EXTERNAL TABLE BAR (ID INT PRIMARY KEY, MAPFIELD ANY, NESTEDMAPFIELD ANY, ARRAYFIELD ANY) LOCATION 'mocknested:///foo'");
stmt.add("INSERT INTO BAR SELECT STREAM ID, MAPFIELD['c'], NESTEDMAPFIELD, ARRAYFIELD " +
"FROM FOO " +
"WHERE CAST(MAPFIELD['b'] AS INTEGER) = 2 AND CAST(ARRAYFIELD[2] AS INTEGER) = 200");
StormSqlLocalClusterImpl impl = new StormSqlLocalClusterImpl();
- List<List<Object>> values = getCollectedValues();
+ List<Pair<Object, Values>> values = TestUtils.MockInsertBolt.getCollectedValues();;
impl.runLocal(cluster, stmt, (__) -> values.size() >= 1, WAIT_TIMEOUT_MS);
Map<String, Integer> map = ImmutableMap.of("b", 2, "c", 4);
Map<String, Map<String, Integer>> nestedMap = ImmutableMap.of("a", map);
- Assert.assertEquals(new Values(2, 4, nestedMap, Arrays.asList(100, 200, 300)), values.get(0));
+ Assert.assertEquals(2, values.get(0).getFirst());
+ Assert.assertEquals(new Values(2, 4, nestedMap, Arrays.asList(100, 200, 300)), values.get(0).getSecond());
}
@Test
@@ -117,7 +121,7 @@ public class TestStormSql {
"WHERE CAST(MAPFIELD['a'] AS INTEGER) = 2");
StormSqlLocalClusterImpl impl = new StormSqlLocalClusterImpl();
- List<List<Object>> values = getCollectedValues();
+ List<Pair<Object, Values>> values = TestUtils.MockInsertBolt.getCollectedValues();
impl.runLocal(cluster, stmt, (__) -> true, WAIT_TIMEOUT_MS_NO_RECORDS_EXPECTED);
Assert.assertEquals(0, values.size());
@@ -134,7 +138,7 @@ public class TestStormSql {
"WHERE CAST(NESTEDMAPFIELD['b']['c'] AS INTEGER) = 4");
StormSqlLocalClusterImpl impl = new StormSqlLocalClusterImpl();
- List<List<Object>> values = getCollectedValues();
+ List<Pair<Object, Values>> values = TestUtils.MockInsertBolt.getCollectedValues();
impl.runLocal(cluster, stmt, (__) -> true, WAIT_TIMEOUT_MS_NO_RECORDS_EXPECTED);
Assert.assertEquals(0, values.size());
@@ -150,7 +154,7 @@ public class TestStormSql {
"WHERE CAST(ARRAYFIELD['a'] AS INTEGER) = 200");
StormSqlLocalClusterImpl impl = new StormSqlLocalClusterImpl();
- List<List<Object>> values = getCollectedValues();
+ List<Pair<Object, Values>> values = TestUtils.MockInsertBolt.getCollectedValues();
impl.runLocal(cluster, stmt, (__) -> true, WAIT_TIMEOUT_MS_NO_RECORDS_EXPECTED);
Assert.assertEquals(0, values.size());
@@ -166,7 +170,7 @@ public class TestStormSql {
"WHERE CAST(ARRAYFIELD[10] AS INTEGER) = 200");
StormSqlLocalClusterImpl impl = new StormSqlLocalClusterImpl();
- List<List<Object>> values = getCollectedValues();
+ List<Pair<Object, Values>> values = TestUtils.MockInsertBolt.getCollectedValues();
impl.runLocal(cluster, stmt, (__) -> true, WAIT_TIMEOUT_MS_NO_RECORDS_EXPECTED);
Assert.assertEquals(0, values.size());
@@ -204,25 +208,25 @@ public class TestStormSql {
@Test
public void testExternalUdf() throws Exception {
List<String> stmt = new ArrayList<>();
- stmt.add("CREATE EXTERNAL TABLE FOO (ID INT) LOCATION 'mock:///foo'");
- stmt.add("CREATE EXTERNAL TABLE BAR (ID INT) LOCATION 'mock:///foo'");
+ stmt.add("CREATE EXTERNAL TABLE FOO (ID INT PRIMARY KEY) LOCATION 'mock:///foo'");
+ stmt.add("CREATE EXTERNAL TABLE BAR (ID INT PRIMARY KEY) LOCATION 'mock:///foo'");
stmt.add("CREATE FUNCTION MYPLUS AS 'org.apache.storm.sql.TestUtils$MyPlus'");
stmt.add("INSERT INTO BAR SELECT STREAM MYPLUS(ID, 1) FROM FOO WHERE ID > 2");
StormSqlLocalClusterImpl impl = new StormSqlLocalClusterImpl();
- List<List<Object>> values = getCollectedValues();
+ List<Pair<Object, Values>> values = TestUtils.MockInsertBolt.getCollectedValues();
impl.runLocal(cluster, stmt, (__) -> values.size() >= 2, WAIT_TIMEOUT_MS);
Assert.assertEquals(2, values.size());
- Assert.assertEquals(4, values.get(0).get(0));
- Assert.assertEquals(5, values.get(1).get(0));
+ Assert.assertEquals(4, values.get(0).getFirst());
+ Assert.assertEquals(5, values.get(1).getFirst());
}
@Test(expected = UnsupportedOperationException.class)
public void testExternalUdfUsingJar() throws Exception {
List<String> stmt = new ArrayList<>();
- stmt.add("CREATE EXTERNAL TABLE FOO (ID INT) LOCATION 'mock:///foo'");
- stmt.add("CREATE EXTERNAL TABLE BAR (ID INT) LOCATION 'mock:///foo'");
+ stmt.add("CREATE EXTERNAL TABLE FOO (ID INT PRIMARY KEY) LOCATION 'mock:///foo'");
+ stmt.add("CREATE EXTERNAL TABLE BAR (ID INT PRIMARY KEY) LOCATION 'mock:///foo'");
stmt.add("CREATE FUNCTION MYPLUS AS 'org.apache.storm.sql.TestUtils$MyPlus' USING JAR 'foo'");
stmt.add("INSERT INTO BAR SELECT STREAM MYPLUS(ID, 1) FROM FOO WHERE ID > 2");
StormSqlLocalClusterImpl impl = new StormSqlLocalClusterImpl();
@@ -239,9 +243,9 @@ public class TestStormSql {
}
@Override
- public ISqlTridentDataSource constructTrident(URI uri, String inputFormatClass, String outputFormatClass,
+ public ISqlStreamsDataSource constructStreams(URI uri, String inputFormatClass, String outputFormatClass,
Properties properties, List<FieldInfo> fields) {
- return new TestUtils.MockSqlExprDataSource();
+ return new TestUtils.MockSqlStreamsInsertDataSource();
}
}
@@ -252,9 +256,9 @@ public class TestStormSql {
}
@Override
- public ISqlTridentDataSource constructTrident(URI uri, String inputFormatClass, String outputFormatClass,
+ public ISqlStreamsDataSource constructStreams(URI uri, String inputFormatClass, String outputFormatClass,
Properties properties, List<FieldInfo> fields) {
- return new TestUtils.MockSqlTridentNestedDataSource();
+ return new TestUtils.MockSqlStreamsInsertNestedDataSource();
}
}
@@ -265,9 +269,9 @@ public class TestStormSql {
}
@Override
- public ISqlTridentDataSource constructTrident(URI uri, String inputFormatClass, String outputFormatClass,
+ public ISqlStreamsDataSource constructStreams(URI uri, String inputFormatClass, String outputFormatClass,
Properties properties, List<FieldInfo> fields) {
- return new TestUtils.MockSqlTridentGroupedDataSource();
+ return new TestUtils.MockSqlStreamsInsertGroupedDataSource();
}
}
@@ -278,9 +282,9 @@ public class TestStormSql {
}
@Override
- public ISqlTridentDataSource constructTrident(URI uri, String inputFormatClass, String outputFormatClass,
+ public ISqlStreamsDataSource constructStreams(URI uri, String inputFormatClass, String outputFormatClass,
Properties properties, List<FieldInfo> fields) {
- return new TestUtils.MockSqlTridentJoinDataSourceEmp();
+ return new TestUtils.MockSqlStreamsInsertJoinDataSourceEmp();
}
}
@@ -291,9 +295,9 @@ public class TestStormSql {
}
@Override
- public ISqlTridentDataSource constructTrident(URI uri, String inputFormatClass, String outputFormatClass,
+ public ISqlStreamsDataSource constructStreams(URI uri, String inputFormatClass, String outputFormatClass,
Properties properties, List<FieldInfo> fields) {
- return new TestUtils.MockSqlTridentJoinDataSourceDept();
+ return new TestUtils.MockSqlStreamsInsertJoinDataSourceDept();
}
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/streams/TestCompilerUtils.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/streams/TestCompilerUtils.java b/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/streams/TestCompilerUtils.java
new file mode 100644
index 0000000..b7fb5b3
--- /dev/null
+++ b/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/streams/TestCompilerUtils.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.storm.sql.compiler.backends.streams;
+
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.StreamableTable;
+import org.apache.calcite.schema.Table;
+import org.apache.calcite.schema.impl.AggregateFunctionImpl;
+import org.apache.calcite.schema.impl.ScalarFunctionImpl;
+import org.apache.calcite.sql.SqlExplainLevel;
+import org.apache.calcite.sql.parser.SqlParseException;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.sql.validate.SqlMonotonicity;
+import org.apache.calcite.tools.Frameworks;
+import org.apache.calcite.tools.RelConversionException;
+import org.apache.calcite.tools.ValidationException;
+import org.apache.storm.sql.compiler.CompilerUtil;
+import org.apache.storm.sql.parser.ColumnConstraint;
+import org.apache.storm.sql.planner.StormRelUtils;
+import org.apache.storm.sql.planner.streams.QueryPlanner;
+import org.apache.storm.sql.planner.streams.rel.StreamsRel;
+
+public class TestCompilerUtils {
+
+ public static CalciteState sqlOverDummyTable(String sql)
+ throws RelConversionException, ValidationException, SqlParseException {
+ SchemaPlus schema = Frameworks.createRootSchema(true);
+ JavaTypeFactory typeFactory = new JavaTypeFactoryImpl
+ (RelDataTypeSystem.DEFAULT);
+ StreamableTable streamableTable = new CompilerUtil.TableBuilderInfo(typeFactory)
+ .field("ID", SqlTypeName.INTEGER, new ColumnConstraint.PrimaryKey(SqlMonotonicity.MONOTONIC, SqlParserPos.ZERO))
+ .field("NAME", typeFactory.createType(String.class))
+ .field("ADDR", typeFactory.createType(String.class))
+ .build();
+ Table table = streamableTable.stream();
+ schema.add("FOO", table);
+ schema.add("BAR", table);
+ schema.add("MYPLUS", ScalarFunctionImpl.create(MyPlus.class, "eval"));
+
+ QueryPlanner queryPlanner = new QueryPlanner(schema);
+ StreamsRel tree = queryPlanner.getPlan(sql);
+ System.out.println(StormRelUtils.explain(tree, SqlExplainLevel.ALL_ATTRIBUTES));
+ return new CalciteState(schema, tree);
+ }
+
+ public static CalciteState sqlOverDummyGroupByTable(String sql)
+ throws RelConversionException, ValidationException, SqlParseException {
+ SchemaPlus schema = Frameworks.createRootSchema(true);
+ JavaTypeFactory typeFactory = new JavaTypeFactoryImpl
+ (RelDataTypeSystem.DEFAULT);
+ StreamableTable streamableTable = new CompilerUtil.TableBuilderInfo(typeFactory)
+ .field("ID", SqlTypeName.INTEGER, new ColumnConstraint.PrimaryKey(SqlMonotonicity.MONOTONIC, SqlParserPos.ZERO))
+ .field("GRPID", SqlTypeName.INTEGER)
+ .field("NAME", typeFactory.createType(String.class))
+ .field("ADDR", typeFactory.createType(String.class))
+ .field("AGE", SqlTypeName.INTEGER)
+ .field("SCORE", SqlTypeName.INTEGER)
+ .build();
+ Table table = streamableTable.stream();
+ schema.add("FOO", table);
+ schema.add("BAR", table);
+ schema.add("MYSTATICSUM", AggregateFunctionImpl.create(MyStaticSumFunction.class));
+ schema.add("MYSUM", AggregateFunctionImpl.create(MySumFunction.class));
+
+ QueryPlanner queryPlanner = new QueryPlanner(schema);
+ StreamsRel tree = queryPlanner.getPlan(sql);
+ System.out.println(StormRelUtils.explain(tree, SqlExplainLevel.ALL_ATTRIBUTES));
+ return new CalciteState(schema, tree);
+ }
+
+ public static CalciteState sqlOverNestedTable(String sql)
+ throws RelConversionException, ValidationException, SqlParseException {
+ SchemaPlus schema = Frameworks.createRootSchema(true);
+ JavaTypeFactory typeFactory = new JavaTypeFactoryImpl
+ (RelDataTypeSystem.DEFAULT);
+
+ StreamableTable streamableTable = new CompilerUtil.TableBuilderInfo(typeFactory)
+ .field("ID", SqlTypeName.INTEGER, new ColumnConstraint.PrimaryKey(SqlMonotonicity.MONOTONIC, SqlParserPos.ZERO))
+ .field("MAPFIELD",
+ typeFactory.createTypeWithNullability(
+ typeFactory.createMapType(
+ typeFactory.createTypeWithNullability(
+ typeFactory.createSqlType(SqlTypeName.VARCHAR), true),
+ typeFactory.createTypeWithNullability(
+ typeFactory.createSqlType(SqlTypeName.INTEGER), true))
+ , true))
+ .field("NESTEDMAPFIELD",
+ typeFactory.createTypeWithNullability(
+ typeFactory.createMapType(
+ typeFactory.createTypeWithNullability(
+ typeFactory.createSqlType(SqlTypeName.VARCHAR), true),
+ typeFactory.createTypeWithNullability(
+ typeFactory.createMapType(
+ typeFactory.createTypeWithNullability(
+ typeFactory.createSqlType(SqlTypeName.VARCHAR), true),
+ typeFactory.createTypeWithNullability(
+ typeFactory.createSqlType(SqlTypeName.INTEGER), true))
+ , true))
+ , true))
+ .field("ARRAYFIELD", typeFactory.createTypeWithNullability(
+ typeFactory.createArrayType(
+ typeFactory.createTypeWithNullability(
+ typeFactory.createSqlType(SqlTypeName.INTEGER), true), -1L)
+ , true))
+ .build();
+ Table table = streamableTable.stream();
+ schema.add("FOO", table);
+ schema.add("BAR", table);
+ schema.add("MYPLUS", ScalarFunctionImpl.create(MyPlus.class, "eval"));
+
+ QueryPlanner queryPlanner = new QueryPlanner(schema);
+ StreamsRel tree = queryPlanner.getPlan(sql);
+ System.out.println(StormRelUtils.explain(tree, SqlExplainLevel.ALL_ATTRIBUTES));
+ return new CalciteState(schema, tree);
+ }
+
+ public static CalciteState sqlOverSimpleEquiJoinTables(String sql)
+ throws RelConversionException, ValidationException, SqlParseException {
+ SchemaPlus schema = Frameworks.createRootSchema(true);
+ JavaTypeFactory typeFactory = new JavaTypeFactoryImpl
+ (RelDataTypeSystem.DEFAULT);
+
+ StreamableTable streamableTable = new CompilerUtil.TableBuilderInfo(typeFactory)
+ .field("EMPID", SqlTypeName.INTEGER, new ColumnConstraint.PrimaryKey(SqlMonotonicity.MONOTONIC, SqlParserPos.ZERO))
+ .field("EMPNAME", SqlTypeName.VARCHAR)
+ .field("DEPTID", SqlTypeName.INTEGER)
+ .build();
+
+ StreamableTable streamableTable2 = new CompilerUtil.TableBuilderInfo(typeFactory)
+ .field("DEPTID", SqlTypeName.INTEGER, new ColumnConstraint.PrimaryKey(SqlMonotonicity.MONOTONIC, SqlParserPos.ZERO))
+ .field("DEPTNAME", SqlTypeName.VARCHAR)
+ .build();
+
+ Table table = streamableTable.stream();
+ Table table2 = streamableTable2.stream();
+ schema.add("EMP", table);
+ schema.add("DEPT", table2);
+
+ QueryPlanner queryPlanner = new QueryPlanner(schema);
+ StreamsRel tree = queryPlanner.getPlan(sql);
+ System.out.println(StormRelUtils.explain(tree, SqlExplainLevel.ALL_ATTRIBUTES));
+ return new CalciteState(schema, tree);
+ }
+
+ public static class MyPlus {
+ public static Integer eval(Integer x, Integer y) {
+ return x + y;
+ }
+ }
+
+ public static class MyStaticSumFunction {
+ public static long init() {
+ return 0L;
+ }
+
+ public static long add(long accumulator, int v) {
+ return accumulator + v;
+ }
+ }
+
+ public static class MySumFunction {
+ public MySumFunction() {
+ }
+
+ public long init() {
+ return 0L;
+ }
+
+ public long add(long accumulator, int v) {
+ return accumulator + v;
+ }
+
+ public long result(long accumulator) {
+ return accumulator;
+ }
+ }
+
+ public static class CalciteState {
+ final SchemaPlus schema;
+ final RelNode tree;
+
+ private CalciteState(SchemaPlus schema, RelNode tree) {
+ this.schema = schema;
+ this.tree = tree;
+ }
+
+ public SchemaPlus schema() {
+ return schema;
+ }
+
+ public RelNode tree() {
+ return tree;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/streams/TestPlanCompiler.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/streams/TestPlanCompiler.java b/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/streams/TestPlanCompiler.java
new file mode 100644
index 0000000..68203d8
--- /dev/null
+++ b/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/streams/TestPlanCompiler.java
@@ -0,0 +1,235 @@
+/*
+ * *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * * <p>
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * * <p>
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+package org.apache.storm.sql.compiler.backends.streams;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.calcite.DataContext;
+import org.apache.calcite.avatica.util.DateTimeUtils;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.sql.SqlTestUtil;
+import org.apache.storm.sql.TestUtils;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.sql.AbstractStreamsProcessor;
+import org.apache.storm.sql.planner.streams.QueryPlanner;
+import org.apache.storm.sql.runtime.ISqlStreamsDataSource;
+import org.apache.storm.streams.Pair;
+import org.apache.storm.tuple.Values;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExternalResource;
+
+import java.time.ZoneOffset;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+public class TestPlanCompiler {
+ private static LocalCluster cluster;
+
+ @Rule
+ public ExternalResource mockBoltValues = TestUtils.mockBoltValueResource;
+
+ @Rule
+ public ExternalResource mockInsertBoltValues = TestUtils.mockInsertBoltValueResource;
+
+ @BeforeClass
+ public static void staticSetup() throws Exception {
+ cluster = new LocalCluster();
+ }
+
+ @AfterClass
+ public static void staticCleanup() {
+ if (cluster!= null) {
+ cluster.shutdown();
+ cluster = null;
+ }
+ }
+
+ @Test
+ public void testCompile() throws Exception {
+ final int EXPECTED_VALUE_SIZE = 2;
+ String sql = "SELECT ID FROM FOO WHERE ID > 2";
+ TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverDummyTable(sql);
+ final Map<String, ISqlStreamsDataSource> data = new HashMap<>();
+ data.put("FOO", new TestUtils.MockSqlStreamsDataSource());
+
+ QueryPlanner planner = new QueryPlanner(state.schema());
+ AbstractStreamsProcessor proc = planner.compile(data, sql);
+ // inject output bolt
+ proc.outputStream().to(new TestUtils.MockBolt());
+ final StormTopology topo = proc.build();
+
+ SqlTestUtil.runStormTopology(cluster, TestUtils.MockBolt.getCollectedValues(), EXPECTED_VALUE_SIZE, proc, topo);
+ Assert.assertArrayEquals(new Values[] { new Values(3), new Values(4)}, TestUtils.MockBolt.getCollectedValues().toArray());
+ }
+
+ @Test
+ public void testInsert() throws Exception {
+ final int EXPECTED_VALUE_SIZE = 1;
+ String sql = "INSERT INTO BAR SELECT ID, NAME, ADDR FROM FOO WHERE ID > 3";
+ TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverDummyTable(sql);
+ final Map<String, ISqlStreamsDataSource> data = new HashMap<>();
+ data.put("FOO", new TestUtils.MockSqlStreamsDataSource());
+ data.put("BAR", new TestUtils.MockSqlStreamsOutputDataSource());
+
+ QueryPlanner planner = new QueryPlanner(state.schema());
+ AbstractStreamsProcessor proc = planner.compile(data, sql);
+ final StormTopology topo = proc.build();
+
+ SqlTestUtil.runStormTopology(cluster, TestUtils.MockInsertBolt.getCollectedValues(), EXPECTED_VALUE_SIZE, proc, topo);
+ Assert.assertArrayEquals(new Pair[] { Pair.of(4, new Values(4, "abcde", "y")) }, TestUtils.MockInsertBolt.getCollectedValues().toArray());
+ }
+
+ @Test
+ public void testUdf() throws Exception {
+ int EXPECTED_VALUE_SIZE = 1;
+ String sql = "SELECT MYPLUS(ID, 3)" +
+ "FROM FOO " +
+ "WHERE ID = 2";
+ TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverDummyTable(sql);
+ Map<String, ISqlStreamsDataSource> data = new HashMap<>();
+ data.put("FOO", new TestUtils.MockSqlStreamsDataSource());
+
+ QueryPlanner planner = new QueryPlanner(state.schema());
+ AbstractStreamsProcessor proc = planner.compile(data, sql);
+ // inject output bolt
+ proc.outputStream().to(new TestUtils.MockBolt());
+ final StormTopology topo = proc.build();
+
+ SqlTestUtil.runStormTopology(cluster, TestUtils.MockBolt.getCollectedValues(), EXPECTED_VALUE_SIZE, proc, topo);
+ Assert.assertArrayEquals(new Values[] { new Values(5) }, TestUtils.MockBolt.getCollectedValues().toArray());
+ }
+
+ @Test
+ public void testNested() throws Exception {
+ int EXPECTED_VALUE_SIZE = 1;
+ String sql = "SELECT ID, MAPFIELD['c'], NESTEDMAPFIELD, ARRAYFIELD " +
+ "FROM FOO " +
+ "WHERE NESTEDMAPFIELD['a']['b'] = 2 AND ARRAYFIELD[2] = 200";
+ TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverNestedTable(sql);
+
+ final Map<String, ISqlStreamsDataSource> data = new HashMap<>();
+ data.put("FOO", new TestUtils.MockSqlStreamsNestedDataSource());
+
+ QueryPlanner planner = new QueryPlanner(state.schema());
+ AbstractStreamsProcessor proc = planner.compile(data, sql);
+ // inject output bolt
+ proc.outputStream().to(new TestUtils.MockBolt());
+ final StormTopology topo = proc.build();
+
+ SqlTestUtil.runStormTopology(cluster, TestUtils.MockBolt.getCollectedValues(), EXPECTED_VALUE_SIZE, proc, topo);
+
+ Map<String, Integer> map = ImmutableMap.of("b", 2, "c", 4);
+ Map<String, Map<String, Integer>> nestedMap = ImmutableMap.of("a", map);
+ Assert.assertArrayEquals(new Values[]{new Values(2, 4, nestedMap, Arrays.asList(100, 200, 300))},
+ TestUtils.MockBolt.getCollectedValues().toArray());
+ }
+
+ /**
+ * All the binary literal tests are done here, because Avatica converts the result to byte[]
+ * whereas Stream provides the result to ByteString which makes different semantic from Stream implementation.
+ */
+ @Test
+ public void testBinaryStringFunctions() throws Exception {
+ int EXPECTED_VALUE_SIZE = 1;
+ String sql = "SELECT x'45F0AB' || x'45F0AB', " +
+ "POSITION(x'F0' IN x'453423F0ABBC'), " +
+ "OVERLAY(x'453423F0ABBC45' PLACING x'4534' FROM 3), " +
+ "SUBSTRING(x'453423F0ABBC' FROM 3), " +
+ "SUBSTRING(x'453423F0ABBC453423F0ABBC' FROM 3 FOR 4) " +
+ "FROM FOO " +
+ "WHERE ID > 0 AND ID < 2";
+
+ TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverDummyTable(sql);
+ final Map<String, ISqlStreamsDataSource> data = new HashMap<>();
+ data.put("FOO", new TestUtils.MockSqlStreamsDataSource());
+
+ QueryPlanner planner = new QueryPlanner(state.schema());
+ AbstractStreamsProcessor proc = planner.compile(data, sql);
+ // inject output bolt
+ proc.outputStream().to(new TestUtils.MockBolt());
+ final StormTopology topo = proc.build();
+
+ SqlTestUtil.runStormTopology(cluster, TestUtils.MockBolt.getCollectedValues(), EXPECTED_VALUE_SIZE, proc, topo);
+
+ Values v = TestUtils.MockBolt.getCollectedValues().get(0);
+
+ assertEquals("45f0ab45f0ab", v.get(0).toString());
+ assertEquals(4, v.get(1));
+ assertEquals("45344534abbc45", v.get(2).toString());
+ assertEquals("23f0abbc", v.get(3).toString());
+ assertEquals("23f0abbc", v.get(4).toString());
+ }
+
+ /**
+ * All the date/time/timestamp related tests are done here, because Avatica converts the result of date functions to java.sql classes
+ * whereas Stream provides long type which makes different semantic from Stream implementation.
+ */
+ @Test
+ public void testDateKeywordsAndFunctions() throws Exception {
+ int EXPECTED_VALUE_SIZE = 1;
+ String sql = "SELECT " +
+ "LOCALTIME, CURRENT_TIME, LOCALTIMESTAMP, CURRENT_TIMESTAMP, CURRENT_DATE, " +
+ "DATE '1970-05-15' AS datefield, TIME '00:00:00' AS timefield, TIMESTAMP '2016-01-01 00:00:00' as timestampfield, " +
+ "EXTRACT(MONTH FROM TIMESTAMP '2010-01-23 12:34:56')," +
+ "FLOOR(DATE '2016-01-23' TO MONTH)," +
+ "CEIL(TIME '12:34:56' TO MINUTE)," +
+ "{fn CURDATE()} = CURRENT_DATE, {fn CURTIME()} = LOCALTIME, {fn NOW()} = LOCALTIMESTAMP," +
+ "{fn QUARTER(DATE '2016-10-07')}, {fn TIMESTAMPADD(MINUTE, 15, TIMESTAMP '2016-10-07 00:00:00')}," +
+ "{fn TIMESTAMPDIFF(SECOND, TIMESTAMP '2016-10-06 00:00:00', TIMESTAMP '2016-10-07 00:00:00')}," +
+ "INTERVAL '1-5' YEAR TO MONTH AS intervalfield, " +
+ "(DATE '1970-01-01', DATE '1970-01-15') AS anchoredinterval_field " +
+ "FROM FOO " +
+ "WHERE ID > 0 AND ID < 2";
+ TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverDummyTable(sql);
+
+ final Map<String, ISqlStreamsDataSource> data = new HashMap<>();
+ data.put("FOO", new TestUtils.MockSqlStreamsDataSource());
+
+ QueryPlanner planner = new QueryPlanner(state.schema());
+ AbstractStreamsProcessor proc = planner.compile(data, sql);
+ // inject output bolt
+ proc.outputStream().to(new TestUtils.MockBolt());
+ final DataContext dataContext = proc.getDataContext();
+ final StormTopology topo = proc.build();
+
+ SqlTestUtil.runStormTopology(cluster, TestUtils.MockBolt.getCollectedValues(), EXPECTED_VALUE_SIZE, proc, topo);
+
+ long utcTimestamp = (long) dataContext.get(DataContext.Variable.UTC_TIMESTAMP.camelName);
+ long currentTimestamp = (long) dataContext.get(DataContext.Variable.CURRENT_TIMESTAMP.camelName);
+ long localTimestamp = (long) dataContext.get(DataContext.Variable.LOCAL_TIMESTAMP.camelName);
+
+ System.out.println(TestUtils.MockBolt.getCollectedValues());
+
+ java.sql.Timestamp timestamp = new java.sql.Timestamp(utcTimestamp);
+ int dateInt = (int) timestamp.toLocalDateTime().atOffset(ZoneOffset.UTC).toLocalDate().toEpochDay();
+ int localTimeInt = (int) (localTimestamp % DateTimeUtils.MILLIS_PER_DAY);
+ int currentTimeInt = (int) (currentTimestamp % DateTimeUtils.MILLIS_PER_DAY);
+
+ Assert.assertArrayEquals(new Values[]{new Values(localTimeInt, currentTimeInt, localTimestamp, currentTimestamp, dateInt,
+ 134, 0, 1451606400000L, 1L, 0L, 45300000, true, true, true, 4L, 1475799300000L, 86400, 17, 0, 14)},
+ TestUtils.MockBolt.getCollectedValues().toArray());
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestCompilerUtils.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestCompilerUtils.java b/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestCompilerUtils.java
deleted file mode 100644
index 0f20d4d..0000000
--- a/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestCompilerUtils.java
+++ /dev/null
@@ -1,207 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.sql.compiler.backends.trident;
-
-import org.apache.calcite.adapter.java.JavaTypeFactory;
-import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.type.RelDataTypeSystem;
-import org.apache.calcite.schema.SchemaPlus;
-import org.apache.calcite.schema.StreamableTable;
-import org.apache.calcite.schema.Table;
-import org.apache.calcite.schema.impl.AggregateFunctionImpl;
-import org.apache.calcite.schema.impl.ScalarFunctionImpl;
-import org.apache.calcite.sql.SqlExplainLevel;
-import org.apache.calcite.sql.parser.SqlParseException;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.apache.calcite.tools.Frameworks;
-import org.apache.calcite.tools.RelConversionException;
-import org.apache.calcite.tools.ValidationException;
-import org.apache.storm.sql.compiler.CompilerUtil;
-import org.apache.storm.sql.planner.StormRelUtils;
-import org.apache.storm.sql.planner.trident.QueryPlanner;
-import org.apache.storm.sql.planner.trident.rel.TridentRel;
-
-public class TestCompilerUtils {
-
- public static CalciteState sqlOverDummyTable(String sql)
- throws RelConversionException, ValidationException, SqlParseException {
- SchemaPlus schema = Frameworks.createRootSchema(true);
- JavaTypeFactory typeFactory = new JavaTypeFactoryImpl
- (RelDataTypeSystem.DEFAULT);
- StreamableTable streamableTable = new CompilerUtil.TableBuilderInfo(typeFactory)
- .field("ID", SqlTypeName.INTEGER)
- .field("NAME", typeFactory.createType(String.class))
- .field("ADDR", typeFactory.createType(String.class))
- .build();
- Table table = streamableTable.stream();
- schema.add("FOO", table);
- schema.add("BAR", table);
- schema.add("MYPLUS", ScalarFunctionImpl.create(MyPlus.class, "eval"));
-
- QueryPlanner queryPlanner = new QueryPlanner(schema);
- TridentRel tree = queryPlanner.getPlan(sql);
- System.out.println(StormRelUtils.explain(tree, SqlExplainLevel.ALL_ATTRIBUTES));
- return new CalciteState(schema, tree);
- }
-
- public static CalciteState sqlOverDummyGroupByTable(String sql)
- throws RelConversionException, ValidationException, SqlParseException {
- SchemaPlus schema = Frameworks.createRootSchema(true);
- JavaTypeFactory typeFactory = new JavaTypeFactoryImpl
- (RelDataTypeSystem.DEFAULT);
- StreamableTable streamableTable = new CompilerUtil.TableBuilderInfo(typeFactory)
- .field("ID", SqlTypeName.INTEGER)
- .field("GRPID", SqlTypeName.INTEGER)
- .field("NAME", typeFactory.createType(String.class))
- .field("ADDR", typeFactory.createType(String.class))
- .field("AGE", SqlTypeName.INTEGER)
- .field("SCORE", SqlTypeName.INTEGER)
- .build();
- Table table = streamableTable.stream();
- schema.add("FOO", table);
- schema.add("BAR", table);
- schema.add("MYSTATICSUM", AggregateFunctionImpl.create(MyStaticSumFunction.class));
- schema.add("MYSUM", AggregateFunctionImpl.create(MySumFunction.class));
-
- QueryPlanner queryPlanner = new QueryPlanner(schema);
- TridentRel tree = queryPlanner.getPlan(sql);
- System.out.println(StormRelUtils.explain(tree, SqlExplainLevel.ALL_ATTRIBUTES));
- return new CalciteState(schema, tree);
- }
-
- public static CalciteState sqlOverNestedTable(String sql)
- throws RelConversionException, ValidationException, SqlParseException {
- SchemaPlus schema = Frameworks.createRootSchema(true);
- JavaTypeFactory typeFactory = new JavaTypeFactoryImpl
- (RelDataTypeSystem.DEFAULT);
-
- StreamableTable streamableTable = new CompilerUtil.TableBuilderInfo(typeFactory)
- .field("ID", SqlTypeName.INTEGER)
- .field("MAPFIELD",
- typeFactory.createTypeWithNullability(
- typeFactory.createMapType(
- typeFactory.createTypeWithNullability(
- typeFactory.createSqlType(SqlTypeName.VARCHAR), true),
- typeFactory.createTypeWithNullability(
- typeFactory.createSqlType(SqlTypeName.INTEGER), true))
- , true))
- .field("NESTEDMAPFIELD",
- typeFactory.createTypeWithNullability(
- typeFactory.createMapType(
- typeFactory.createTypeWithNullability(
- typeFactory.createSqlType(SqlTypeName.VARCHAR), true),
- typeFactory.createTypeWithNullability(
- typeFactory.createMapType(
- typeFactory.createTypeWithNullability(
- typeFactory.createSqlType(SqlTypeName.VARCHAR), true),
- typeFactory.createTypeWithNullability(
- typeFactory.createSqlType(SqlTypeName.INTEGER), true))
- , true))
- , true))
- .field("ARRAYFIELD", typeFactory.createTypeWithNullability(
- typeFactory.createArrayType(
- typeFactory.createTypeWithNullability(
- typeFactory.createSqlType(SqlTypeName.INTEGER), true), -1L)
- , true))
- .build();
- Table table = streamableTable.stream();
- schema.add("FOO", table);
- schema.add("BAR", table);
- schema.add("MYPLUS", ScalarFunctionImpl.create(MyPlus.class, "eval"));
-
- QueryPlanner queryPlanner = new QueryPlanner(schema);
- TridentRel tree = queryPlanner.getPlan(sql);
- System.out.println(StormRelUtils.explain(tree, SqlExplainLevel.ALL_ATTRIBUTES));
- return new CalciteState(schema, tree);
- }
-
- public static CalciteState sqlOverSimpleEquiJoinTables(String sql)
- throws RelConversionException, ValidationException, SqlParseException {
- SchemaPlus schema = Frameworks.createRootSchema(true);
- JavaTypeFactory typeFactory = new JavaTypeFactoryImpl
- (RelDataTypeSystem.DEFAULT);
-
- StreamableTable streamableTable = new CompilerUtil.TableBuilderInfo(typeFactory)
- .field("EMPID", SqlTypeName.INTEGER)
- .field("EMPNAME", SqlTypeName.VARCHAR)
- .field("DEPTID", SqlTypeName.INTEGER)
- .build();
- Table table = streamableTable.stream();
-
- StreamableTable streamableTable2 = new CompilerUtil.TableBuilderInfo(typeFactory)
- .field("DEPTID", SqlTypeName.INTEGER)
- .field("DEPTNAME", SqlTypeName.VARCHAR)
- .build();
- Table table2 = streamableTable2.stream();
-
- schema.add("EMP", table);
- schema.add("DEPT", table2);
-
- QueryPlanner queryPlanner = new QueryPlanner(schema);
- TridentRel tree = queryPlanner.getPlan(sql);
- System.out.println(StormRelUtils.explain(tree, SqlExplainLevel.ALL_ATTRIBUTES));
- return new CalciteState(schema, tree);
- }
-
- public static class MyPlus {
- public static Integer eval(Integer x, Integer y) {
- return x + y;
- }
- }
-
- public static class MyStaticSumFunction {
- public static long init() {
- return 0L;
- }
-
- public static long add(long accumulator, int v) {
- return accumulator + v;
- }
- }
-
- public static class MySumFunction {
- public MySumFunction() {
- }
-
- public long init() {
- return 0L;
- }
-
- public long add(long accumulator, int v) {
- return accumulator + v;
- }
-
- public long result(long accumulator) {
- return accumulator;
- }
- }
-
- public static class CalciteState {
- final SchemaPlus schema;
- final RelNode tree;
-
- private CalciteState(SchemaPlus schema, RelNode tree) {
- this.schema = schema;
- this.tree = tree;
- }
-
- public SchemaPlus schema() {
- return schema;
- }
-
- public RelNode tree() {
- return tree;
- }
- }
-}