You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2018/07/16 22:00:47 UTC
[5/7] storm git commit: STORM-2406 [Storm SQL] Change underlying API
to Streams API
http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/QueryPlanner.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/QueryPlanner.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/QueryPlanner.java
new file mode 100644
index 0000000..180232e
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/QueryPlanner.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.storm.sql.planner.streams;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.calcite.DataContext;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.config.CalciteConnectionConfigImpl;
+import org.apache.calcite.jdbc.CalciteSchema;
+import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
+import org.apache.calcite.plan.Contexts;
+import org.apache.calcite.plan.ConventionTraitDef;
+import org.apache.calcite.plan.RelTraitDef;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.prepare.CalciteCatalogReader;
+import org.apache.calcite.rel.RelCollationTraitDef;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperatorTable;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.parser.SqlParseException;
+import org.apache.calcite.sql.util.ChainedSqlOperatorTable;
+import org.apache.calcite.tools.FrameworkConfig;
+import org.apache.calcite.tools.Frameworks;
+import org.apache.calcite.tools.Planner;
+import org.apache.calcite.tools.RelConversionException;
+import org.apache.calcite.tools.ValidationException;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.sql.AbstractStreamsProcessor;
+import org.apache.storm.sql.javac.CompilingClassLoader;
+import org.apache.storm.sql.planner.StormRelDataTypeSystem;
+import org.apache.storm.sql.planner.UnsupportedOperatorsVisitor;
+import org.apache.storm.sql.planner.streams.rel.StreamsLogicalConvention;
+import org.apache.storm.sql.planner.streams.rel.StreamsRel;
+import org.apache.storm.sql.runtime.ISqlStreamsDataSource;
+import org.apache.storm.streams.Stream;
+import org.apache.storm.streams.StreamBuilder;
+import org.apache.storm.tuple.Values;
+
+public class QueryPlanner {
+
+ public static final int STORM_REL_CONVERSION_RULES = 1;
+
+ private final Planner planner;
+
+ private final JavaTypeFactory typeFactory = new JavaTypeFactoryImpl(
+ RelDataTypeSystem.DEFAULT);
+
+ public QueryPlanner(SchemaPlus schema) {
+ final List<RelTraitDef> traitDefs = new ArrayList<RelTraitDef>();
+
+ traitDefs.add(ConventionTraitDef.INSTANCE);
+ traitDefs.add(RelCollationTraitDef.INSTANCE);
+
+ List<SqlOperatorTable> sqlOperatorTables = new ArrayList<>();
+ sqlOperatorTables.add(SqlStdOperatorTable.instance());
+ sqlOperatorTables.add(new CalciteCatalogReader(CalciteSchema.from(schema),
+ Collections.emptyList(), typeFactory, new CalciteConnectionConfigImpl(new Properties())));
+
+ FrameworkConfig config = Frameworks.newConfigBuilder()
+ .defaultSchema(schema)
+ .operatorTable(new ChainedSqlOperatorTable(sqlOperatorTables))
+ .traitDefs(traitDefs)
+ .context(Contexts.EMPTY_CONTEXT)
+ .ruleSets(StreamsStormRuleSets.getRuleSets())
+ .costFactory(null)
+ .typeSystem(StormRelDataTypeSystem.STORM_REL_DATATYPE_SYSTEM)
+ .build();
+ this.planner = Frameworks.getPlanner(config);
+ }
+
+ public AbstractStreamsProcessor compile(Map<String, ISqlStreamsDataSource> sources, String query) throws Exception {
+ StreamsRel relNode = getPlan(query);
+
+ StreamsPlanCreator streamsPlanCreator = new StreamsPlanCreator(sources, new RexBuilder(typeFactory));
+ relNode.streamsPlan(streamsPlanCreator);
+
+ final StreamBuilder streamBuilder = streamsPlanCreator.getStreamBuilder();
+ final Stream<Values> lastStream = streamsPlanCreator.pop();
+ final DataContext dc = streamsPlanCreator.getDataContext();
+ final List<CompilingClassLoader> cls = streamsPlanCreator.getClassLoaders();
+
+ return new AbstractStreamsProcessor() {
+ @Override
+ public StormTopology build() {
+ return streamBuilder.build();
+ }
+
+ @Override
+ public Stream<Values> outputStream() {
+ return lastStream;
+ }
+
+ @Override
+ public DataContext getDataContext() {
+ return dc;
+ }
+
+ @Override
+ public List<CompilingClassLoader> getClassLoaders() {
+ return cls;
+ }
+ };
+ }
+
+ public StreamsRel getPlan(String query) throws ValidationException, RelConversionException, SqlParseException {
+ return (StreamsRel) validateAndConvert(planner.parse(query));
+ }
+
+ private RelNode validateAndConvert(SqlNode sqlNode) throws ValidationException, RelConversionException {
+ SqlNode validated = validateNode(sqlNode);
+ RelNode relNode = convertToRelNode(validated);
+ return convertToStormRel(relNode);
+ }
+
+ private RelNode convertToStormRel(RelNode relNode) throws RelConversionException {
+ RelTraitSet traitSet = relNode.getTraitSet();
+ traitSet = traitSet.simplify();
+
+ // PlannerImpl.transform() optimizes RelNode with ruleset
+ return planner.transform(STORM_REL_CONVERSION_RULES, traitSet.plus(StreamsLogicalConvention.INSTANCE), relNode);
+ }
+
+ private RelNode convertToRelNode(SqlNode sqlNode) throws RelConversionException {
+ return planner.rel(sqlNode).rel;
+ }
+
+ private SqlNode validateNode(SqlNode sqlNode) throws ValidationException {
+ SqlNode validatedSqlNode = planner.validate(sqlNode);
+ validatedSqlNode.accept(new UnsupportedOperatorsVisitor());
+ return validatedSqlNode;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/StreamsPlanCreator.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/StreamsPlanCreator.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/StreamsPlanCreator.java
new file mode 100644
index 0000000..977c1ee
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/StreamsPlanCreator.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.storm.sql.planner.streams;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.calcite.DataContext;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexProgram;
+import org.apache.storm.sql.compiler.RexNodeToJavaCodeCompiler;
+import org.apache.storm.sql.javac.CompilingClassLoader;
+import org.apache.storm.sql.runtime.ISqlStreamsDataSource;
+import org.apache.storm.sql.runtime.calcite.DebuggableExecutableExpression;
+import org.apache.storm.sql.runtime.calcite.ExecutableExpression;
+import org.apache.storm.sql.runtime.calcite.StormDataContext;
+import org.apache.storm.streams.Stream;
+import org.apache.storm.streams.StreamBuilder;
+import org.apache.storm.tuple.Values;
+
+public class StreamsPlanCreator {
+ private final Map<String, ISqlStreamsDataSource> sources;
+ private final JavaTypeFactory typeFactory;
+ private final RexNodeToJavaCodeCompiler rexCompiler;
+ private final StreamBuilder streamBuilder;
+ private final DataContext dataContext;
+
+ private final Deque<Stream<Values>> streamStack = new ArrayDeque<>();
+ private final List<CompilingClassLoader> classLoaders = new ArrayList<>();
+
+ public StreamsPlanCreator(Map<String, ISqlStreamsDataSource> sources, RexBuilder rexBuilder) {
+ this.sources = sources;
+ this.rexCompiler = new RexNodeToJavaCodeCompiler(rexBuilder);
+ this.typeFactory = (JavaTypeFactory) rexBuilder.getTypeFactory();
+
+ this.streamBuilder = new StreamBuilder();
+ this.dataContext = new StormDataContext();
+ }
+
+ public void addStream(Stream<Values> stream) throws Exception {
+ push(stream);
+ }
+
+ public Stream<Values> pop() {
+ return streamStack.pop();
+ }
+
+ public Map<String, ISqlStreamsDataSource> getSources() {
+ return sources;
+ }
+
+ public DataContext getDataContext() {
+ return dataContext;
+ }
+
+ public JavaTypeFactory getTypeFactory() {
+ return typeFactory;
+ }
+
+ public StreamBuilder getStreamBuilder() {
+ return streamBuilder;
+ }
+
+ public ExecutableExpression createScalarInstance(List<RexNode> nodes, RelDataType inputRowType, String className)
+ throws CompilingClassLoader.CompilerException, ClassNotFoundException, IllegalAccessException, InstantiationException {
+ String expr = rexCompiler.compile(nodes, inputRowType, className);
+ CompilingClassLoader classLoader = new CompilingClassLoader(
+ getLastClassLoader(), className, expr, null);
+ ExecutableExpression instance = (ExecutableExpression) classLoader.loadClass(className).newInstance();
+ addClassLoader(classLoader);
+ return new DebuggableExecutableExpression(instance, expr);
+ }
+
+ public ExecutableExpression createScalarInstance(RexProgram program, String className)
+ throws CompilingClassLoader.CompilerException, ClassNotFoundException, IllegalAccessException, InstantiationException {
+ String expr = rexCompiler.compile(program, className);
+ CompilingClassLoader classLoader = new CompilingClassLoader(
+ getLastClassLoader(), className, expr, null);
+ ExecutableExpression instance = (ExecutableExpression) classLoader.loadClass(className).newInstance();
+ addClassLoader(classLoader);
+ return new DebuggableExecutableExpression(instance, expr);
+ }
+
+ private void push(Stream<Values> stream) {
+ streamStack.push(stream);
+ }
+
+ public void addClassLoader(CompilingClassLoader compilingClassLoader) {
+ this.classLoaders.add(compilingClassLoader);
+ }
+
+ public ClassLoader getLastClassLoader() {
+ if (this.classLoaders.size() > 0) {
+ return this.classLoaders.get(this.classLoaders.size() - 1);
+ } else {
+ return this.getClass().getClassLoader();
+ }
+ }
+
+ public List<CompilingClassLoader> getClassLoaders() {
+ return classLoaders;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/StreamsStormRuleSets.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/StreamsStormRuleSets.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/StreamsStormRuleSets.java
new file mode 100644
index 0000000..95ddc00
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/StreamsStormRuleSets.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.storm.sql.planner.streams;
+
+import java.util.Iterator;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.rel.rules.CalcMergeRule;
+import org.apache.calcite.rel.rules.FilterCalcMergeRule;
+import org.apache.calcite.rel.rules.FilterProjectTransposeRule;
+import org.apache.calcite.rel.rules.FilterToCalcRule;
+import org.apache.calcite.rel.rules.ProjectCalcMergeRule;
+import org.apache.calcite.rel.rules.ProjectFilterTransposeRule;
+import org.apache.calcite.rel.rules.ProjectRemoveRule;
+import org.apache.calcite.rel.rules.ProjectToCalcRule;
+import org.apache.calcite.rel.rules.PruneEmptyRules;
+import org.apache.calcite.rel.rules.ReduceExpressionsRule;
+import org.apache.calcite.rel.rules.SortRemoveRule;
+import org.apache.calcite.rel.rules.UnionEliminatorRule;
+import org.apache.calcite.rel.stream.StreamRules;
+import org.apache.calcite.tools.RuleSet;
+import org.apache.storm.sql.planner.streams.rules.StreamsAggregateRule;
+import org.apache.storm.sql.planner.streams.rules.StreamsCalcRule;
+import org.apache.storm.sql.planner.streams.rules.StreamsFilterRule;
+import org.apache.storm.sql.planner.streams.rules.StreamsJoinRule;
+import org.apache.storm.sql.planner.streams.rules.StreamsModifyRule;
+import org.apache.storm.sql.planner.streams.rules.StreamsProjectRule;
+import org.apache.storm.sql.planner.streams.rules.StreamsScanRule;
+
+public class StreamsStormRuleSets {
+ private static final ImmutableSet<RelOptRule> calciteToStormConversionRules =
+ ImmutableSet.<RelOptRule>builder().add(
+ SortRemoveRule.INSTANCE,
+
+ FilterToCalcRule.INSTANCE,
+ ProjectToCalcRule.INSTANCE,
+ FilterCalcMergeRule.INSTANCE,
+ ProjectCalcMergeRule.INSTANCE,
+ CalcMergeRule.INSTANCE,
+
+ PruneEmptyRules.FILTER_INSTANCE,
+ PruneEmptyRules.PROJECT_INSTANCE,
+ PruneEmptyRules.UNION_INSTANCE,
+
+ ProjectFilterTransposeRule.INSTANCE,
+ FilterProjectTransposeRule.INSTANCE,
+ ProjectRemoveRule.INSTANCE,
+
+ ReduceExpressionsRule.FILTER_INSTANCE,
+ ReduceExpressionsRule.PROJECT_INSTANCE,
+ ReduceExpressionsRule.CALC_INSTANCE,
+
+ // merge and push unions rules
+ UnionEliminatorRule.INSTANCE,
+
+ StreamsScanRule.INSTANCE,
+ StreamsFilterRule.INSTANCE,
+ StreamsProjectRule.INSTANCE,
+ StreamsAggregateRule.INSTANCE,
+ StreamsJoinRule.INSTANCE,
+ StreamsModifyRule.INSTANCE,
+ StreamsCalcRule.INSTANCE
+ ).build();
+
+ public static RuleSet[] getRuleSets() {
+ return new RuleSet[]{
+ new StormRuleSet(StreamRules.RULES),
+ new StormRuleSet(ImmutableSet.<RelOptRule>builder().addAll(StreamRules.RULES).addAll(calciteToStormConversionRules).build())
+ };
+ }
+
+ private static class StormRuleSet implements RuleSet {
+ final ImmutableSet<RelOptRule> rules;
+
+ public StormRuleSet(ImmutableSet<RelOptRule> rules) {
+ this.rules = rules;
+ }
+
+ public StormRuleSet(ImmutableList<RelOptRule> rules) {
+ this.rules = ImmutableSet.<RelOptRule>builder()
+ .addAll(rules)
+ .build();
+ }
+
+ @Override
+ public Iterator<RelOptRule> iterator() {
+ return rules.iterator();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rel/StreamsCalcRel.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rel/StreamsCalcRel.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rel/StreamsCalcRel.java
new file mode 100644
index 0000000..f4c2c7d
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rel/StreamsCalcRel.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.storm.sql.planner.streams.rel;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import com.google.common.collect.Lists;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Calc;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexLocalRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexProgram;
+import org.apache.storm.sql.planner.StormRelUtils;
+import org.apache.storm.sql.planner.rel.StormCalcRelBase;
+import org.apache.storm.sql.planner.streams.StreamsPlanCreator;
+import org.apache.storm.sql.runtime.calcite.ExecutableExpression;
+import org.apache.storm.sql.runtime.streams.functions.EvaluationCalc;
+import org.apache.storm.streams.Stream;
+import org.apache.storm.tuple.Values;
+
+public class StreamsCalcRel extends StormCalcRelBase implements StreamsRel {
+ public StreamsCalcRel(RelOptCluster cluster, RelTraitSet traits, RelNode child, RexProgram program) {
+ super(cluster, traits, child, program);
+ }
+
+ @Override
+ public Calc copy(RelTraitSet traitSet, RelNode child, RexProgram program) {
+ return new StreamsCalcRel(getCluster(), traitSet, child, program);
+ }
+
+ @Override
+ public void streamsPlan(StreamsPlanCreator planCreator) throws Exception {
+ // SingleRel
+ RelNode input = getInput();
+ StormRelUtils.getStormRelInput(input).streamsPlan(planCreator);
+ Stream<Values> inputStream = planCreator.pop();
+
+ RelDataType inputRowType = getInput(0).getRowType();
+
+ List<String> outputFieldNames = getRowType().getFieldNames();
+ int outputCount = outputFieldNames.size();
+
+ // filter
+ ExecutableExpression filterInstance = null;
+ RexLocalRef condition = program.getCondition();
+ if (condition != null) {
+ RexNode conditionNode = program.expandLocalRef(condition);
+ filterInstance = planCreator.createScalarInstance(Lists.newArrayList(conditionNode), inputRowType,
+ StormRelUtils.getClassName(this));
+ }
+
+ // projection
+ ExecutableExpression projectionInstance = null;
+ List<RexLocalRef> projectList = program.getProjectList();
+ if (projectList != null && !projectList.isEmpty()) {
+ List<RexNode> expandedNodes = new ArrayList<>();
+ for (RexLocalRef project : projectList) {
+ expandedNodes.add(program.expandLocalRef(project));
+ }
+
+ projectionInstance = planCreator.createScalarInstance(expandedNodes, inputRowType,
+ StormRelUtils.getClassName(this));
+ }
+
+ if (projectionInstance == null && filterInstance == null) {
+ // it shouldn't be happen
+ throw new IllegalStateException("Either projection or condition, or both should be provided.");
+ }
+
+ EvaluationCalc evalCalc = new EvaluationCalc(filterInstance, projectionInstance, outputCount, planCreator.getDataContext());
+ final Stream finalStream = inputStream.flatMap(evalCalc);
+
+ planCreator.addStream(finalStream);
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rel/StreamsFilterRel.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rel/StreamsFilterRel.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rel/StreamsFilterRel.java
new file mode 100644
index 0000000..ce17bf6
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rel/StreamsFilterRel.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.storm.sql.planner.streams.rel;
+
+import java.util.List;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexNode;
+import org.apache.storm.sql.planner.StormRelUtils;
+import org.apache.storm.sql.planner.rel.StormFilterRelBase;
+import org.apache.storm.sql.planner.streams.StreamsPlanCreator;
+import org.apache.storm.sql.runtime.calcite.ExecutableExpression;
+import org.apache.storm.sql.runtime.streams.functions.EvaluationFilter;
+import org.apache.storm.streams.Stream;
+import org.apache.storm.tuple.Values;
+
+public class StreamsFilterRel extends StormFilterRelBase implements StreamsRel {
+ public StreamsFilterRel(RelOptCluster cluster, RelTraitSet traits, RelNode child, RexNode condition) {
+ super(cluster, traits, child, condition);
+ }
+
+ @Override
+ public Filter copy(RelTraitSet traitSet, RelNode input, RexNode condition) {
+ return new StreamsFilterRel(getCluster(), traitSet, input, condition);
+ }
+
+ @Override
+ public void streamsPlan(StreamsPlanCreator planCreator) throws Exception {
+ // SingleRel
+ RelNode input = getInput();
+ StormRelUtils.getStormRelInput(input).streamsPlan(planCreator);
+ Stream<Values> inputStream = planCreator.pop();
+
+ List<RexNode> childExps = getChildExps();
+ RelDataType inputRowType = getInput(0).getRowType();
+
+ String filterClassName = StormRelUtils.getClassName(this);
+ ExecutableExpression filterInstance = planCreator.createScalarInstance(childExps, inputRowType, filterClassName);
+
+ EvaluationFilter evalFilter = new EvaluationFilter(filterInstance, planCreator.getDataContext());
+ final Stream<Values> finalStream = inputStream.filter(evalFilter);
+
+ planCreator.addStream(finalStream);
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rel/StreamsLogicalConvention.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rel/StreamsLogicalConvention.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rel/StreamsLogicalConvention.java
new file mode 100644
index 0000000..48a7540
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rel/StreamsLogicalConvention.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.storm.sql.planner.streams.rel;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.ConventionTraitDef;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTrait;
+import org.apache.calcite.plan.RelTraitDef;
+import org.apache.calcite.plan.RelTraitSet;
+
+public enum StreamsLogicalConvention implements Convention {
+ INSTANCE;
+
+ @Override
+ public Class getInterface() {
+ return StreamsRel.class;
+ }
+
+ @Override
+ public String getName() {
+ return "STORM_LOGICAL";
+ }
+
+ @Override
+ public RelTraitDef getTraitDef() {
+ return ConventionTraitDef.INSTANCE;
+ }
+
+ @Override
+ public boolean satisfies(RelTrait trait) {
+ return this == trait;
+ }
+
+ @Override
+ public void register(RelOptPlanner planner) {}
+
+ @Override
+ public String toString() {
+ return getName();
+ }
+
+ @Override
+ public boolean canConvertConvention(Convention toConvention) {
+ return false;
+ }
+
+ @Override
+ public boolean useAbstractConvertersForConversion(RelTraitSet fromTraits, RelTraitSet toTraits) {
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rel/StreamsProjectRel.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rel/StreamsProjectRel.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rel/StreamsProjectRel.java
new file mode 100644
index 0000000..4ad70e1
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rel/StreamsProjectRel.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.storm.sql.planner.streams.rel;
+
+import java.util.List;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexNode;
+import org.apache.storm.sql.planner.StormRelUtils;
+import org.apache.storm.sql.planner.rel.StormProjectRelBase;
+import org.apache.storm.sql.planner.streams.StreamsPlanCreator;
+import org.apache.storm.sql.runtime.calcite.ExecutableExpression;
+import org.apache.storm.sql.runtime.streams.functions.EvaluationFunction;
+import org.apache.storm.streams.Stream;
+import org.apache.storm.tuple.Values;
+
+public class StreamsProjectRel extends StormProjectRelBase implements StreamsRel {
+ public StreamsProjectRel(RelOptCluster cluster, RelTraitSet traits, RelNode input, List<? extends RexNode> projects,
+ RelDataType rowType) {
+ super(cluster, traits, input, projects, rowType);
+ }
+
+ @Override
+ public Project copy(RelTraitSet traitSet, RelNode input, List<RexNode> projects, RelDataType rowType) {
+ return new StreamsProjectRel(getCluster(), traitSet, input, projects, rowType);
+ }
+
+ @Override
+ public void streamsPlan(StreamsPlanCreator planCreator) throws Exception {
+ // SingleRel
+ RelNode input = getInput();
+ StormRelUtils.getStormRelInput(input).streamsPlan(planCreator);
+ Stream<Values> inputStream = planCreator.pop();
+
+ String projectionClassName = StormRelUtils.getClassName(this);
+
+ List<String> outputFieldNames = getRowType().getFieldNames();
+ int outputCount = outputFieldNames.size();
+
+ List<RexNode> childExps = getChildExps();
+ RelDataType inputRowType = getInput(0).getRowType();
+
+ ExecutableExpression projectionInstance = planCreator.createScalarInstance(childExps, inputRowType, projectionClassName);
+ EvaluationFunction evalFunc = new EvaluationFunction(projectionInstance, outputCount, planCreator.getDataContext());
+ final Stream<Values> finalStream = inputStream.map(evalFunc);
+ planCreator.addStream(finalStream);
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rel/StreamsRel.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rel/StreamsRel.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rel/StreamsRel.java
new file mode 100644
index 0000000..70d02ec
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rel/StreamsRel.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.storm.sql.planner.streams.rel;
+
+import org.apache.storm.sql.planner.rel.StormRelNode;
+import org.apache.storm.sql.planner.streams.StreamsPlanCreator;
+
+public interface StreamsRel extends StormRelNode {
+ void streamsPlan(StreamsPlanCreator planCreator) throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rel/StreamsStreamInsertRel.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rel/StreamsStreamInsertRel.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rel/StreamsStreamInsertRel.java
new file mode 100644
index 0000000..f3b8994
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rel/StreamsStreamInsertRel.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.storm.sql.planner.streams.rel;
+
+import java.util.List;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.prepare.Prepare;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rex.RexNode;
+import org.apache.storm.sql.planner.StormRelUtils;
+import org.apache.storm.sql.planner.rel.StormStreamInsertRelBase;
+import org.apache.storm.sql.planner.streams.StreamsPlanCreator;
+import org.apache.storm.sql.runtime.streams.functions.StreamInsertMapToPairFunction;
+import org.apache.storm.streams.Stream;
+import org.apache.storm.topology.IRichBolt;
+import org.apache.storm.tuple.Values;
+
+public class StreamsStreamInsertRel extends StormStreamInsertRelBase implements StreamsRel {
+ private final int primaryKeyIndex;
+
+ public StreamsStreamInsertRel(RelOptCluster cluster, RelTraitSet traits, RelOptTable table, Prepare.CatalogReader catalogReader,
+ RelNode child, Operation operation, List<String> updateColumnList, List<RexNode> sourceExpressionList,
+ boolean flattened, int primaryKeyIndex) {
+ super(cluster, traits, table, catalogReader, child, operation, updateColumnList, sourceExpressionList, flattened);
+ this.primaryKeyIndex = primaryKeyIndex;
+ }
+
+ @Override
+ public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+ return new StreamsStreamInsertRel(getCluster(), traitSet, getTable(), getCatalogReader(),
+ sole(inputs), getOperation(), getUpdateColumnList(), getSourceExpressionList(), isFlattened(), primaryKeyIndex);
+ }
+
+ @Override
+ public void streamsPlan(StreamsPlanCreator planCreator) throws Exception {
+ // SingleRel
+ RelNode input = getInput();
+ StormRelUtils.getStormRelInput(input).streamsPlan(planCreator);
+ Stream<Values> inputStream = planCreator.pop();
+
+ Preconditions.checkArgument(isInsert(), "Only INSERT statement is supported.");
+
+ // Calcite ensures that the value is structurized to the table definition
+ // hence we can use PK index directly
+ // To elaborate, if table BAR is defined as ID INTEGER PK, NAME VARCHAR, DEPTID INTEGER
+ // and query like INSERT INTO BAR SELECT NAME, ID FROM FOO is executed,
+ // Calcite makes the projection ($1 <- ID, $0 <- NAME, null) to the value before INSERT.
+
+ String tableName = Joiner.on('.').join(getTable().getQualifiedName());
+ IRichBolt consumer = planCreator.getSources().get(tableName).getConsumer();
+
+ // To make logic simple, it assumes that all the tables have one PK (which it should be extended to support composed key),
+ // and provides PairStream(KeyedStream) to consumer bolt.
+ inputStream.mapToPair(new StreamInsertMapToPairFunction(primaryKeyIndex)).to(consumer);
+
+ planCreator.addStream(inputStream);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rel/StreamsStreamScanRel.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rel/StreamsStreamScanRel.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rel/StreamsStreamScanRel.java
new file mode 100644
index 0000000..563ea23
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rel/StreamsStreamScanRel.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.storm.sql.planner.streams.rel;
+
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.base.Joiner;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.storm.sql.planner.rel.StormStreamScanRelBase;
+import org.apache.storm.sql.planner.streams.StreamsPlanCreator;
+import org.apache.storm.sql.runtime.ISqlStreamsDataSource;
+import org.apache.storm.sql.runtime.streams.functions.StreamsScanTupleValueMapper;
+import org.apache.storm.streams.Stream;
+import org.apache.storm.tuple.Values;
+
+public class StreamsStreamScanRel extends StormStreamScanRelBase implements StreamsRel {
+ private final int parallelismHint;
+
+ public StreamsStreamScanRel(RelOptCluster cluster, RelTraitSet traitSet, RelOptTable table, int parallelismHint) {
+ super(cluster, traitSet, table);
+ this.parallelismHint = parallelismHint;
+ }
+
+ @Override
+ public void streamsPlan(StreamsPlanCreator planCreator) throws Exception {
+ String sourceName = Joiner.on('.').join(getTable().getQualifiedName());
+
+ Map<String, ISqlStreamsDataSource> sources = planCreator.getSources();
+ if (!sources.containsKey(sourceName)) {
+ throw new RuntimeException("Cannot find table " + sourceName);
+ }
+
+ List<String> fieldNames = getRowType().getFieldNames();
+ final Stream<Values> finalStream = planCreator.getStreamBuilder()
+ .newStream(sources.get(sourceName).getProducer(), new StreamsScanTupleValueMapper(fieldNames), parallelismHint);
+ planCreator.addStream(finalStream);
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rules/StreamsAggregateRule.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rules/StreamsAggregateRule.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rules/StreamsAggregateRule.java
new file mode 100644
index 0000000..3d86f26
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rules/StreamsAggregateRule.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.storm.sql.planner.streams.rules;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.logical.LogicalAggregate;
+import org.apache.storm.sql.planner.streams.rel.StreamsLogicalConvention;
+
+public class StreamsAggregateRule extends ConverterRule {
+ public static final RelOptRule INSTANCE = new StreamsAggregateRule();
+
+ private StreamsAggregateRule() {
+ super(LogicalAggregate.class, Convention.NONE, StreamsLogicalConvention.INSTANCE, "StreamsAggregateRule");
+ }
+
+ @Override
+ public RelNode convert(RelNode rel) {
+ throw new UnsupportedOperationException("Aggregate operation is not supported.");
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rules/StreamsCalcRule.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rules/StreamsCalcRule.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rules/StreamsCalcRule.java
new file mode 100644
index 0000000..328d4f5
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rules/StreamsCalcRule.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.storm.sql.planner.streams.rules;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.Calc;
+import org.apache.calcite.rel.logical.LogicalCalc;
+import org.apache.storm.sql.planner.streams.rel.StreamsCalcRel;
+import org.apache.storm.sql.planner.streams.rel.StreamsLogicalConvention;
+
+public class StreamsCalcRule extends ConverterRule {
+ public static final StreamsCalcRule INSTANCE = new StreamsCalcRule();
+
+ private StreamsCalcRule() {
+ super(LogicalCalc.class, Convention.NONE, StreamsLogicalConvention.INSTANCE, "StreamsCalcRule");
+ }
+
+ @Override
+ public RelNode convert(RelNode rel) {
+ final Calc calc = (Calc) rel;
+ final RelNode input = calc.getInput();
+
+ return new StreamsCalcRel(calc.getCluster(), calc.getTraitSet().replace(StreamsLogicalConvention.INSTANCE),
+ convert(input, input.getTraitSet().replace(StreamsLogicalConvention.INSTANCE)),
+ calc.getProgram());
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rules/StreamsFilterRule.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rules/StreamsFilterRule.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rules/StreamsFilterRule.java
new file mode 100644
index 0000000..1e5ab19
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rules/StreamsFilterRule.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.storm.sql.planner.streams.rules;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.logical.LogicalFilter;
+import org.apache.storm.sql.planner.streams.rel.StreamsFilterRel;
+import org.apache.storm.sql.planner.streams.rel.StreamsLogicalConvention;
+
+public class StreamsFilterRule extends ConverterRule {
+ public static StreamsFilterRule INSTANCE = new StreamsFilterRule();
+
+ private StreamsFilterRule() {
+ super(LogicalFilter.class, Convention.NONE, StreamsLogicalConvention.INSTANCE, "StreamsFilterRule");
+ }
+
+ @Override
+ public RelNode convert(RelNode rel) {
+ final Filter filter = (Filter) rel;
+ final RelNode input = filter.getInput();
+
+ return new StreamsFilterRel(filter.getCluster(),
+ filter.getTraitSet().replace(StreamsLogicalConvention.INSTANCE),
+ convert(input, input.getTraitSet().replace(StreamsLogicalConvention.INSTANCE)),
+ filter.getCondition());
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rules/StreamsJoinRule.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rules/StreamsJoinRule.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rules/StreamsJoinRule.java
new file mode 100644
index 0000000..e3d5a7d
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rules/StreamsJoinRule.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.storm.sql.planner.streams.rules;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.logical.LogicalJoin;
+import org.apache.storm.sql.planner.streams.rel.StreamsLogicalConvention;
+
+public class StreamsJoinRule extends ConverterRule {
+ public static final StreamsJoinRule INSTANCE = new StreamsJoinRule();
+
+ private StreamsJoinRule() {
+ super(LogicalJoin.class, Convention.NONE, StreamsLogicalConvention.INSTANCE, "StreamsJoinRule");
+ }
+
+ @Override
+ public RelNode convert(RelNode rel) {
+ throw new UnsupportedOperationException("Join operation is not supported.");
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rules/StreamsModifyRule.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rules/StreamsModifyRule.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rules/StreamsModifyRule.java
new file mode 100644
index 0000000..f477607
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rules/StreamsModifyRule.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.storm.sql.planner.streams.rules;
+
+import java.util.List;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.prepare.Prepare;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.TableModify;
+import org.apache.calcite.rel.logical.LogicalTableModify;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.schema.Table;
+import org.apache.storm.sql.calcite.StormStreamableTable;
+import org.apache.storm.sql.calcite.StormTable;
+import org.apache.storm.sql.planner.streams.rel.StreamsLogicalConvention;
+import org.apache.storm.sql.planner.streams.rel.StreamsStreamInsertRel;
+
+public class StreamsModifyRule extends ConverterRule {
+ public static final StreamsModifyRule INSTANCE = new StreamsModifyRule();
+
+ private StreamsModifyRule() {
+ super(LogicalTableModify.class, Convention.NONE, StreamsLogicalConvention.INSTANCE, "StreamsModifyRule");
+ }
+
+ @Override
+ public RelNode convert(RelNode rel) {
+ final TableModify tableModify = (TableModify) rel;
+ final RelNode input = tableModify.getInput();
+
+ final RelOptCluster cluster = tableModify.getCluster();
+ final RelTraitSet traitSet = tableModify.getTraitSet().replace(StreamsLogicalConvention.INSTANCE);
+ final RelOptTable relOptTable = tableModify.getTable();
+ final Prepare.CatalogReader catalogReader = tableModify.getCatalogReader();
+ final RelNode convertedInput = convert(input, input.getTraitSet().replace(StreamsLogicalConvention.INSTANCE));
+ final TableModify.Operation operation = tableModify.getOperation();
+ final List<String> updateColumnList = tableModify.getUpdateColumnList();
+ final List<RexNode> sourceExpressionList = tableModify.getSourceExpressionList();
+ final boolean flattened = tableModify.isFlattened();
+
+ int primaryKey;
+
+ StormTable stormTable = tableModify.getTable().unwrap(StormTable.class);
+ if (stormTable != null) {
+ primaryKey = stormTable.primaryKey();
+ } else {
+ StormStreamableTable streamableTable = tableModify.getTable().unwrap(StormStreamableTable.class);
+ if (streamableTable != null) {
+ primaryKey = streamableTable.primaryKey();
+ } else {
+ throw new IllegalStateException("Table must be able to unwrap with StormTable or StormStreamableTable.");
+ }
+ }
+
+ final Table table = tableModify.getTable().unwrap(Table.class);
+
+ switch (table.getJdbcTableType()) {
+ case STREAM:
+ if (operation != TableModify.Operation.INSERT) {
+ throw new UnsupportedOperationException(String.format("Stream doesn't support %s modify operation", operation));
+ }
+ return new StreamsStreamInsertRel(cluster, traitSet, relOptTable, catalogReader, convertedInput, operation,
+ updateColumnList, sourceExpressionList, flattened, primaryKey);
+ default:
+ throw new IllegalArgumentException(String.format("Unsupported table type: %s", table.getJdbcTableType()));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rules/StreamsProjectRule.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rules/StreamsProjectRule.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rules/StreamsProjectRule.java
new file mode 100644
index 0000000..7508046
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rules/StreamsProjectRule.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.storm.sql.planner.streams.rules;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.storm.sql.planner.streams.rel.StreamsLogicalConvention;
+import org.apache.storm.sql.planner.streams.rel.StreamsProjectRel;
+
+public class StreamsProjectRule extends ConverterRule {
+ public static final StreamsProjectRule INSTANCE = new StreamsProjectRule();
+
+ private StreamsProjectRule() {
+ super(LogicalProject.class, Convention.NONE, StreamsLogicalConvention.INSTANCE,
+ "StreamsProjectRule");
+ }
+
+ @Override
+ public RelNode convert(RelNode rel) {
+ final Project project = (Project) rel;
+ final RelNode input = project.getInput();
+
+ return new StreamsProjectRel(project.getCluster(),
+ project.getTraitSet().replace(StreamsLogicalConvention.INSTANCE),
+ convert(input, input.getTraitSet().replace(StreamsLogicalConvention.INSTANCE)), project.getProjects(),
+ project.getRowType());
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rules/StreamsScanRule.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rules/StreamsScanRule.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rules/StreamsScanRule.java
new file mode 100644
index 0000000..536fcfe
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rules/StreamsScanRule.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.storm.sql.planner.streams.rules;
+
+import org.apache.calcite.adapter.enumerable.EnumerableConvention;
+import org.apache.calcite.adapter.enumerable.EnumerableTableScan;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.schema.Table;
+import org.apache.storm.sql.calcite.ParallelStreamableTable;
+import org.apache.storm.sql.calcite.ParallelTable;
+import org.apache.storm.sql.planner.streams.rel.StreamsLogicalConvention;
+import org.apache.storm.sql.planner.streams.rel.StreamsStreamScanRel;
+
+public class StreamsScanRule extends ConverterRule {
+ public static final StreamsScanRule INSTANCE = new StreamsScanRule();
+ public static final int DEFAULT_PARALLELISM_HINT = 1;
+
+ private StreamsScanRule() {
+ super(EnumerableTableScan.class, EnumerableConvention.INSTANCE, StreamsLogicalConvention.INSTANCE, "StreamsScanRule");
+ }
+
+ @Override
+ public RelNode convert(RelNode rel) {
+ final TableScan scan = (TableScan) rel;
+ int parallelismHint = DEFAULT_PARALLELISM_HINT;
+
+ final ParallelTable parallelTable = scan.getTable().unwrap(ParallelTable.class);
+ if (parallelTable != null && parallelTable.parallelismHint() != null) {
+ parallelismHint = parallelTable.parallelismHint();
+ }
+
+ final Table table = scan.getTable().unwrap(Table.class);
+ switch (table.getJdbcTableType()) {
+ case STREAM:
+ return new StreamsStreamScanRel(scan.getCluster(),
+ scan.getTraitSet().replace(StreamsLogicalConvention.INSTANCE),
+ scan.getTable(), parallelismHint);
+ default:
+ throw new IllegalArgumentException(String.format("Unsupported table type: %s", table.getJdbcTableType()));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/QueryPlanner.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/QueryPlanner.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/QueryPlanner.java
deleted file mode 100644
index 2238233..0000000
--- a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/QueryPlanner.java
+++ /dev/null
@@ -1,156 +0,0 @@
-/*
- *
- * * Licensed to the Apache Software Foundation (ASF) under one
- * * or more contributor license agreements. See the NOTICE file
- * * distributed with this work for additional information
- * * regarding copyright ownership. The ASF licenses this file
- * * to you under the Apache License, Version 2.0 (the
- * * "License"); you may not use this file except in compliance
- * * with the License. You may obtain a copy of the License at
- * *
- * * http://www.apache.org/licenses/LICENSE-2.0
- * *
- * * Unless required by applicable law or agreed to in writing, software
- * * distributed under the License is distributed on an "AS IS" BASIS,
- * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * * See the License for the specific language governing permissions and
- * * limitations under the License.
- *
- */
-
-package org.apache.storm.sql.planner.trident;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import org.apache.calcite.DataContext;
-import org.apache.calcite.adapter.java.JavaTypeFactory;
-import org.apache.calcite.jdbc.CalciteSchema;
-import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
-import org.apache.calcite.plan.Contexts;
-import org.apache.calcite.plan.ConventionTraitDef;
-import org.apache.calcite.plan.RelTraitDef;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.prepare.CalciteCatalogReader;
-import org.apache.calcite.rel.RelCollationTraitDef;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.type.RelDataTypeSystem;
-import org.apache.calcite.rex.RexBuilder;
-import org.apache.calcite.schema.SchemaPlus;
-import org.apache.calcite.sql.SqlNode;
-import org.apache.calcite.sql.SqlOperatorTable;
-import org.apache.calcite.sql.fun.SqlStdOperatorTable;
-import org.apache.calcite.sql.parser.SqlParseException;
-import org.apache.calcite.sql.util.ChainedSqlOperatorTable;
-import org.apache.calcite.tools.FrameworkConfig;
-import org.apache.calcite.tools.Frameworks;
-import org.apache.calcite.tools.Planner;
-import org.apache.calcite.tools.RelConversionException;
-import org.apache.calcite.tools.ValidationException;
-import org.apache.storm.sql.AbstractTridentProcessor;
-import org.apache.storm.sql.javac.CompilingClassLoader;
-import org.apache.storm.sql.planner.StormRelDataTypeSystem;
-import org.apache.storm.sql.planner.UnsupportedOperatorsVisitor;
-import org.apache.storm.sql.planner.trident.rel.TridentLogicalConvention;
-import org.apache.storm.sql.planner.trident.rel.TridentRel;
-import org.apache.storm.sql.runtime.ISqlTridentDataSource;
-import org.apache.storm.trident.Stream;
-import org.apache.storm.trident.TridentTopology;
-import org.apache.storm.trident.fluent.IAggregatableStream;
-
-public class QueryPlanner {
-
- public static final int STORM_REL_CONVERSION_RULES = 1;
-
- private final Planner planner;
-
- private final JavaTypeFactory typeFactory = new JavaTypeFactoryImpl(
- RelDataTypeSystem.DEFAULT);
-
- public QueryPlanner(SchemaPlus schema) {
- final List<RelTraitDef> traitDefs = new ArrayList<RelTraitDef>();
-
- traitDefs.add(ConventionTraitDef.INSTANCE);
- traitDefs.add(RelCollationTraitDef.INSTANCE);
-
- List<SqlOperatorTable> sqlOperatorTables = new ArrayList<>();
- sqlOperatorTables.add(SqlStdOperatorTable.instance());
- sqlOperatorTables.add(new CalciteCatalogReader(CalciteSchema.from(schema),
- false,
- Collections.<String>emptyList(), typeFactory));
-
- FrameworkConfig config = Frameworks.newConfigBuilder()
- .defaultSchema(schema)
- .operatorTable(new ChainedSqlOperatorTable(sqlOperatorTables))
- .traitDefs(traitDefs)
- .context(Contexts.EMPTY_CONTEXT)
- .ruleSets(TridentStormRuleSets.getRuleSets())
- .costFactory(null)
- .typeSystem(StormRelDataTypeSystem.STORM_REL_DATATYPE_SYSTEM)
- .build();
- this.planner = Frameworks.getPlanner(config);
- }
-
- public AbstractTridentProcessor compile(Map<String, ISqlTridentDataSource> sources, String query) throws Exception {
- TridentRel relNode = getPlan(query);
-
- TridentPlanCreator tridentPlanCreator = new TridentPlanCreator(sources, new RexBuilder(typeFactory));
- relNode.tridentPlan(tridentPlanCreator);
-
- final TridentTopology topology = tridentPlanCreator.getTopology();
- final IAggregatableStream lastStream = tridentPlanCreator.pop();
- final DataContext dc = tridentPlanCreator.getDataContext();
- final List<CompilingClassLoader> cls = tridentPlanCreator.getClassLoaders();
-
- return new AbstractTridentProcessor() {
- @Override
- public TridentTopology build() {
- return topology;
- }
-
- @Override
- public Stream outputStream() {
- return lastStream.toStream();
- }
-
- @Override
- public DataContext getDataContext() {
- return dc;
- }
-
- @Override
- public List<CompilingClassLoader> getClassLoaders() {
- return cls;
- }
- };
- }
-
- public TridentRel getPlan(String query) throws ValidationException, RelConversionException, SqlParseException {
- return (TridentRel) validateAndConvert(planner.parse(query));
- }
-
- private RelNode validateAndConvert(SqlNode sqlNode) throws ValidationException, RelConversionException {
- SqlNode validated = validateNode(sqlNode);
- RelNode relNode = convertToRelNode(validated);
- return convertToStormRel(relNode);
- }
-
- private RelNode convertToStormRel(RelNode relNode) throws RelConversionException {
- RelTraitSet traitSet = relNode.getTraitSet();
- traitSet = traitSet.simplify();
-
- // PlannerImpl.transform() optimizes RelNode with ruleset
- return planner.transform(STORM_REL_CONVERSION_RULES, traitSet.plus(TridentLogicalConvention.INSTANCE), relNode);
- }
-
- private RelNode convertToRelNode(SqlNode sqlNode) throws RelConversionException {
- return planner.rel(sqlNode).rel;
- }
-
- private SqlNode validateNode(SqlNode sqlNode) throws ValidationException {
- SqlNode validatedSqlNode = planner.validate(sqlNode);
- validatedSqlNode.accept(new UnsupportedOperatorsVisitor());
- return validatedSqlNode;
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/TridentPlanCreator.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/TridentPlanCreator.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/TridentPlanCreator.java
deleted file mode 100644
index 45277ca..0000000
--- a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/TridentPlanCreator.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.sql.planner.trident;
-
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.Deque;
-import java.util.List;
-import java.util.Map;
-import org.apache.calcite.DataContext;
-import org.apache.calcite.adapter.java.JavaTypeFactory;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rex.RexBuilder;
-import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.rex.RexProgram;
-import org.apache.storm.sql.compiler.RexNodeToJavaCodeCompiler;
-import org.apache.storm.sql.javac.CompilingClassLoader;
-import org.apache.storm.sql.runtime.ISqlTridentDataSource;
-import org.apache.storm.sql.runtime.calcite.DebuggableExecutableExpression;
-import org.apache.storm.sql.runtime.calcite.ExecutableExpression;
-import org.apache.storm.sql.runtime.calcite.StormDataContext;
-import org.apache.storm.trident.TridentTopology;
-import org.apache.storm.trident.fluent.IAggregatableStream;
-
-public class TridentPlanCreator {
- private final Map<String, ISqlTridentDataSource> sources;
- private final JavaTypeFactory typeFactory;
- private final RexNodeToJavaCodeCompiler rexCompiler;
- private final DataContext dataContext;
- private final TridentTopology topology;
-
- private final Deque<IAggregatableStream> streamStack = new ArrayDeque<>();
- private final List<CompilingClassLoader> classLoaders = new ArrayList<>();
-
- public TridentPlanCreator(Map<String, ISqlTridentDataSource> sources, RexBuilder rexBuilder) {
- this.sources = sources;
- this.rexCompiler = new RexNodeToJavaCodeCompiler(rexBuilder);
- this.typeFactory = (JavaTypeFactory) rexBuilder.getTypeFactory();
-
- this.topology = new TridentTopology();
- this.dataContext = new StormDataContext();
- }
-
- public void addStream(IAggregatableStream stream) throws Exception {
- push(stream);
- }
-
- public IAggregatableStream pop() {
- return streamStack.pop();
- }
-
- public Map<String, ISqlTridentDataSource> getSources() {
- return sources;
- }
-
- public DataContext getDataContext() {
- return dataContext;
- }
-
- public JavaTypeFactory getTypeFactory() {
- return typeFactory;
- }
-
- public TridentTopology getTopology() {
- return topology;
- }
-
- public ExecutableExpression createScalarInstance(List<RexNode> nodes, RelDataType inputRowType, String className)
- throws CompilingClassLoader.CompilerException, ClassNotFoundException, IllegalAccessException, InstantiationException {
- String expr = rexCompiler.compile(nodes, inputRowType, className);
- CompilingClassLoader classLoader = new CompilingClassLoader(
- getLastClassLoader(), className, expr, null);
- ExecutableExpression instance = (ExecutableExpression) classLoader.loadClass(className).newInstance();
- addClassLoader(classLoader);
- return new DebuggableExecutableExpression(instance, expr);
- }
-
- public ExecutableExpression createScalarInstance(RexProgram program, String className)
- throws CompilingClassLoader.CompilerException, ClassNotFoundException, IllegalAccessException, InstantiationException {
- String expr = rexCompiler.compile(program, className);
- CompilingClassLoader classLoader = new CompilingClassLoader(
- getLastClassLoader(), className, expr, null);
- ExecutableExpression instance = (ExecutableExpression) classLoader.loadClass(className).newInstance();
- addClassLoader(classLoader);
- return new DebuggableExecutableExpression(instance, expr);
- }
-
- private void push(IAggregatableStream stream) {
- streamStack.push(stream);
- }
-
- public void addClassLoader(CompilingClassLoader compilingClassLoader) {
- this.classLoaders.add(compilingClassLoader);
- }
-
- public ClassLoader getLastClassLoader() {
- if (this.classLoaders.size() > 0) {
- return this.classLoaders.get(this.classLoaders.size() - 1);
- } else {
- return this.getClass().getClassLoader();
- }
- }
-
- public List<CompilingClassLoader> getClassLoaders() {
- return classLoaders;
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/TridentStormRuleSets.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/TridentStormRuleSets.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/TridentStormRuleSets.java
deleted file mode 100644
index 0c988b6..0000000
--- a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/TridentStormRuleSets.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- *
- * * Licensed to the Apache Software Foundation (ASF) under one
- * * or more contributor license agreements. See the NOTICE file
- * * distributed with this work for additional information
- * * regarding copyright ownership. The ASF licenses this file
- * * to you under the Apache License, Version 2.0 (the
- * * "License"); you may not use this file except in compliance
- * * with the License. You may obtain a copy of the License at
- * *
- * * http://www.apache.org/licenses/LICENSE-2.0
- * *
- * * Unless required by applicable law or agreed to in writing, software
- * * distributed under the License is distributed on an "AS IS" BASIS,
- * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * * See the License for the specific language governing permissions and
- * * limitations under the License.
- *
- */
-
-package org.apache.storm.sql.planner.trident;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import java.util.Iterator;
-import org.apache.calcite.plan.RelOptRule;
-import org.apache.calcite.rel.rules.CalcMergeRule;
-import org.apache.calcite.rel.rules.FilterCalcMergeRule;
-import org.apache.calcite.rel.rules.FilterProjectTransposeRule;
-import org.apache.calcite.rel.rules.FilterToCalcRule;
-import org.apache.calcite.rel.rules.ProjectCalcMergeRule;
-import org.apache.calcite.rel.rules.ProjectFilterTransposeRule;
-import org.apache.calcite.rel.rules.ProjectRemoveRule;
-import org.apache.calcite.rel.rules.ProjectToCalcRule;
-import org.apache.calcite.rel.rules.PruneEmptyRules;
-import org.apache.calcite.rel.rules.ReduceExpressionsRule;
-import org.apache.calcite.rel.rules.SortRemoveRule;
-import org.apache.calcite.rel.rules.UnionEliminatorRule;
-import org.apache.calcite.rel.stream.StreamRules;
-import org.apache.calcite.tools.RuleSet;
-import org.apache.storm.sql.planner.trident.rules.TridentAggregateRule;
-import org.apache.storm.sql.planner.trident.rules.TridentCalcRule;
-import org.apache.storm.sql.planner.trident.rules.TridentFilterRule;
-import org.apache.storm.sql.planner.trident.rules.TridentJoinRule;
-import org.apache.storm.sql.planner.trident.rules.TridentModifyRule;
-import org.apache.storm.sql.planner.trident.rules.TridentProjectRule;
-import org.apache.storm.sql.planner.trident.rules.TridentScanRule;
-
-public class TridentStormRuleSets {
- private static final ImmutableSet<RelOptRule> calciteToStormConversionRules =
- ImmutableSet.<RelOptRule>builder().add(
- SortRemoveRule.INSTANCE,
-
- FilterToCalcRule.INSTANCE,
- ProjectToCalcRule.INSTANCE,
- FilterCalcMergeRule.INSTANCE,
- ProjectCalcMergeRule.INSTANCE,
- CalcMergeRule.INSTANCE,
-
- PruneEmptyRules.FILTER_INSTANCE,
- PruneEmptyRules.PROJECT_INSTANCE,
- PruneEmptyRules.UNION_INSTANCE,
-
- ProjectFilterTransposeRule.INSTANCE,
- FilterProjectTransposeRule.INSTANCE,
- ProjectRemoveRule.INSTANCE,
-
- ReduceExpressionsRule.FILTER_INSTANCE,
- ReduceExpressionsRule.PROJECT_INSTANCE,
- ReduceExpressionsRule.CALC_INSTANCE,
-
- // merge and push unions rules
- UnionEliminatorRule.INSTANCE,
-
- TridentScanRule.INSTANCE,
- TridentFilterRule.INSTANCE,
- TridentProjectRule.INSTANCE,
- TridentAggregateRule.INSTANCE,
- TridentJoinRule.INSTANCE,
- TridentModifyRule.INSTANCE,
- TridentCalcRule.INSTANCE
- ).build();
-
- public static RuleSet[] getRuleSets() {
- return new RuleSet[]{
- new StormRuleSet(StreamRules.RULES),
- new StormRuleSet(ImmutableSet.<RelOptRule>builder().addAll(StreamRules.RULES).addAll(calciteToStormConversionRules).build())
- };
- }
-
- private static class StormRuleSet implements RuleSet {
- final ImmutableSet<RelOptRule> rules;
-
- public StormRuleSet(ImmutableSet<RelOptRule> rules) {
- this.rules = rules;
- }
-
- public StormRuleSet(ImmutableList<RelOptRule> rules) {
- this.rules = ImmutableSet.<RelOptRule>builder()
- .addAll(rules)
- .build();
- }
-
- @Override
- public Iterator<RelOptRule> iterator() {
- return rules.iterator();
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentCalcRel.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentCalcRel.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentCalcRel.java
deleted file mode 100644
index 04acd34..0000000
--- a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentCalcRel.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.sql.planner.trident.rel;
-
-import com.google.common.collect.Lists;
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.Calc;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rex.RexLocalRef;
-import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.rex.RexProgram;
-import org.apache.storm.sql.planner.StormRelUtils;
-import org.apache.storm.sql.planner.rel.StormCalcRelBase;
-import org.apache.storm.sql.planner.trident.TridentPlanCreator;
-import org.apache.storm.sql.runtime.calcite.ExecutableExpression;
-import org.apache.storm.sql.runtime.trident.functions.EvaluationCalc;
-import org.apache.storm.trident.Stream;
-import org.apache.storm.tuple.Fields;
-
-public class TridentCalcRel extends StormCalcRelBase implements TridentRel {
- public TridentCalcRel(RelOptCluster cluster, RelTraitSet traits, RelNode child, RexProgram program) {
- super(cluster, traits, child, program);
- }
-
- @Override
- public Calc copy(RelTraitSet traitSet, RelNode child, RexProgram program) {
- return new TridentCalcRel(getCluster(), traitSet, child, program);
- }
-
- @Override
- public void tridentPlan(TridentPlanCreator planCreator) throws Exception {
- // SingleRel
- RelNode input = getInput();
- StormRelUtils.getStormRelInput(input).tridentPlan(planCreator);
- Stream inputStream = planCreator.pop().toStream();
-
- String stageName = StormRelUtils.getStageName(this);
-
- RelDataType inputRowType = getInput(0).getRowType();
-
- List<String> outputFieldNames = getRowType().getFieldNames();
- int outputCount = outputFieldNames.size();
-
- // filter
- ExecutableExpression filterInstance = null;
- RexLocalRef condition = program.getCondition();
- if (condition != null) {
- RexNode conditionNode = program.expandLocalRef(condition);
- filterInstance = planCreator.createScalarInstance(Lists.newArrayList(conditionNode), inputRowType,
- StormRelUtils.getClassName(this));
- }
-
- // projection
- ExecutableExpression projectionInstance = null;
- List<RexLocalRef> projectList = program.getProjectList();
- if (projectList != null && !projectList.isEmpty()) {
- List<RexNode> expandedNodes = new ArrayList<>();
- for (RexLocalRef project : projectList) {
- expandedNodes.add(program.expandLocalRef(project));
- }
-
- projectionInstance = planCreator.createScalarInstance(expandedNodes, inputRowType,
- StormRelUtils.getClassName(this));
- }
-
- if (projectionInstance == null && filterInstance == null) {
- // it shouldn't be happen
- throw new IllegalStateException("Either projection or condition, or both should be provided.");
- }
-
- final Stream finalStream = inputStream
- .flatMap(new EvaluationCalc(filterInstance, projectionInstance, outputCount, planCreator.getDataContext()),
- new Fields(outputFieldNames))
- .name(stageName);
-
- planCreator.addStream(finalStream);
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/c69a23cf/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentFilterRel.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentFilterRel.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentFilterRel.java
deleted file mode 100644
index ba3c5e2..0000000
--- a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentFilterRel.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.sql.planner.trident.rel;
-
-import java.util.List;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.Filter;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rex.RexNode;
-import org.apache.storm.sql.planner.StormRelUtils;
-import org.apache.storm.sql.planner.rel.StormFilterRelBase;
-import org.apache.storm.sql.planner.trident.TridentPlanCreator;
-import org.apache.storm.sql.runtime.calcite.ExecutableExpression;
-import org.apache.storm.sql.runtime.trident.functions.EvaluationFilter;
-import org.apache.storm.trident.Stream;
-import org.apache.storm.trident.fluent.IAggregatableStream;
-
-public class TridentFilterRel extends StormFilterRelBase implements TridentRel {
- public TridentFilterRel(RelOptCluster cluster, RelTraitSet traits, RelNode child, RexNode condition) {
- super(cluster, traits, child, condition);
- }
-
- @Override
- public Filter copy(RelTraitSet traitSet, RelNode input, RexNode condition) {
- return new TridentFilterRel(getCluster(), traitSet, input, condition);
- }
-
- @Override
- public void tridentPlan(TridentPlanCreator planCreator) throws Exception {
- // SingleRel
- RelNode input = getInput();
- StormRelUtils.getStormRelInput(input).tridentPlan(planCreator);
- Stream inputStream = planCreator.pop().toStream();
-
- String stageName = StormRelUtils.getStageName(this);
-
- List<RexNode> childExps = getChildExps();
- RelDataType inputRowType = getInput(0).getRowType();
-
- String filterClassName = StormRelUtils.getClassName(this);
- ExecutableExpression filterInstance = planCreator.createScalarInstance(childExps, inputRowType, filterClassName);
-
- IAggregatableStream finalStream = inputStream.filter(new EvaluationFilter(filterInstance, planCreator.getDataContext()))
- .name(stageName);
- planCreator.addStream(finalStream);
- }
-}