You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2017/04/05 23:19:08 UTC
[05/23] storm git commit: STORM-2453 Move non-connectors into the top
directory
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/StormRelDataTypeSystem.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/StormRelDataTypeSystem.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/StormRelDataTypeSystem.java
new file mode 100644
index 0000000..437877c
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/StormRelDataTypeSystem.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.sql.planner;
+
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.calcite.rel.type.RelDataTypeSystemImpl;
+
+/**
+ * Calcite type system used by Storm SQL. It reports 38 as both the maximum numeric
+ * precision and the maximum numeric scale, and inherits every other setting from
+ * {@link RelDataTypeSystemImpl}.
+ */
+public class StormRelDataTypeSystem extends RelDataTypeSystemImpl {
+    /** Upper bound reported for both numeric precision and numeric scale. */
+    private static final int MAX_NUMERIC_DIGITS = 38;
+
+    /** Shared instance of this type system. */
+    public static final RelDataTypeSystem STORM_REL_DATATYPE_SYSTEM = new StormRelDataTypeSystem();
+
+    /**
+     * @return the largest numeric precision this type system reports (38)
+     */
+    @Override
+    public int getMaxNumericPrecision() {
+        return MAX_NUMERIC_DIGITS;
+    }
+
+    /**
+     * @return the largest numeric scale this type system reports (38)
+     */
+    @Override
+    public int getMaxNumericScale() {
+        return MAX_NUMERIC_DIGITS;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/StormRelUtils.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/StormRelUtils.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/StormRelUtils.java
new file mode 100644
index 0000000..40bbacd
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/StormRelUtils.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.planner;
+
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.plan.volcano.RelSubset;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.sql.SqlExplainLevel;
+import org.apache.storm.sql.planner.trident.rel.TridentRel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Static helpers for naming and inspecting relational nodes while a Storm SQL plan is built.
+ */
+public class StormRelUtils {
+    private static final Logger LOG = LoggerFactory.getLogger(StormRelUtils.class);
+
+    // Process-wide counters appended to generated names so repeated calls never collide.
+    private static final AtomicInteger sequence = new AtomicInteger(0);
+    private static final AtomicInteger classSequence = new AtomicInteger(0);
+
+    /**
+     * Builds a stage name of the form {@code <UPPER-CASED SIMPLE CLASS NAME>_<rel id>_<n>},
+     * where {@code n} comes from a counter that is incremented on every call.
+     *
+     * @param relNode node the stage name is derived from
+     * @return a name that differs on every invocation
+     */
+    public static String getStageName(TridentRel relNode) {
+        return upperSimpleName(relNode) + "_" + relNode.getId() + "_" + sequence.getAndIncrement();
+    }
+
+    /**
+     * Builds a class name of the form
+     * {@code Generated_<UPPER-CASED SIMPLE CLASS NAME>_<rel id>_<n>}, where {@code n} comes
+     * from a counter (separate from the stage-name counter) incremented on every call.
+     *
+     * @param relNode node the class name is derived from
+     * @return a name that differs on every invocation
+     */
+    public static String getClassName(TridentRel relNode) {
+        return "Generated_" + upperSimpleName(relNode) + "_" + relNode.getId() + "_"
+                + classSequence.getAndIncrement();
+    }
+
+    /**
+     * Resolves a planner input to a {@link TridentRel}. When {@code input} is a
+     * {@link RelSubset}, the subset's {@code getBest()} node is used in its place.
+     *
+     * @param input node to resolve
+     * @return the resolved node cast to {@link TridentRel}; NOTE(review): this is whatever
+     *         {@code getBest()} yields, which may be {@code null} - confirm with callers
+     * @throws ClassCastException if the resolved node is not a {@link TridentRel}
+     */
+    public static TridentRel getStormRelInput(RelNode input) {
+        if (input instanceof RelSubset) {
+            // go with known best input
+            input = ((RelSubset) input).getBest();
+        }
+        return (TridentRel) input;
+    }
+
+    /**
+     * Renders the plan rooted at {@code rel} at {@link SqlExplainLevel#EXPPLAN_ATTRIBUTES}.
+     *
+     * @param rel root of the plan to render
+     * @return the textual plan, or an empty string if rendering overflowed the stack
+     */
+    public static String explain(final RelNode rel) {
+        return explain(rel, SqlExplainLevel.EXPPLAN_ATTRIBUTES);
+    }
+
+    /**
+     * Renders the plan rooted at {@code rel} at the requested level of detail.
+     *
+     * @param rel         root of the plan to render
+     * @param detailLevel how much detail the rendering should include
+     * @return the textual plan, or an empty string if rendering overflowed the stack
+     */
+    public static String explain(final RelNode rel, SqlExplainLevel detailLevel) {
+        String explain = "";
+        try {
+            // Forward the requested level; the single-argument RelOptUtil.toString would
+            // silently discard it and always render at the default level.
+            explain = RelOptUtil.toString(rel, detailLevel);
+        } catch (StackOverflowError e) {
+            // Deliberate best effort: a plan that cannot be printed must not abort the caller.
+            LOG.error("StackOverflowError occurred while extracting plan. Please report it to the dev@ mailing list.");
+            LOG.error("RelNode " + rel + " ExplainLevel " + detailLevel, e);
+            LOG.error("Forcing plan to empty string and continue... SQL Runner may not working properly after.");
+        }
+        return explain;
+    }
+
+    /** Upper-cases the node's simple class name independently of the JVM's default locale. */
+    private static String upperSimpleName(TridentRel relNode) {
+        // Locale.ROOT keeps generated identifiers stable; the default-locale overload maps
+        // 'i' to a dotted capital under, for example, a Turkish locale.
+        return relNode.getClass().getSimpleName().toUpperCase(java.util.Locale.ROOT);
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/UnsupportedOperatorsVisitor.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/UnsupportedOperatorsVisitor.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/UnsupportedOperatorsVisitor.java
new file mode 100644
index 0000000..258fe72
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/UnsupportedOperatorsVisitor.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.planner;
+
+import org.apache.calcite.sql.util.SqlShuttle;
+
+/**
+ * {@link SqlShuttle} subclass intended for detecting SQL operators that Storm SQL does not
+ * support. It currently overrides nothing, so visiting a tree with it behaves exactly like
+ * the inherited {@link SqlShuttle} traversal.
+ */
+public class UnsupportedOperatorsVisitor extends SqlShuttle {
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormCalcRelBase.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormCalcRelBase.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormCalcRelBase.java
new file mode 100644
index 0000000..29ca08b
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormCalcRelBase.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.sql.planner.rel;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Calc;
+import org.apache.calcite.rex.RexProgram;
+
+/**
+ * Base class for Storm SQL relational nodes built on Calcite's {@link Calc}. It adds the
+ * {@link StormRelNode} marker and contributes no behavior of its own.
+ */
+public abstract class StormCalcRelBase extends Calc implements StormRelNode {
+
+    /**
+     * Forwards every argument, unchanged, to the {@link Calc} constructor.
+     *
+     * @param cluster cluster handed to {@code Calc}
+     * @param traits  trait set handed to {@code Calc}
+     * @param child   input node handed to {@code Calc}
+     * @param program program handed to {@code Calc}
+     */
+    protected StormCalcRelBase(
+            RelOptCluster cluster,
+            RelTraitSet traits,
+            RelNode child,
+            RexProgram program) {
+        super(cluster, traits, child, program);
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormFilterRelBase.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormFilterRelBase.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormFilterRelBase.java
new file mode 100644
index 0000000..fa2ac65
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormFilterRelBase.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.sql.planner.rel;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rex.RexNode;
+
+/**
+ * Base class for Storm SQL relational nodes built on Calcite's {@link Filter}. It adds the
+ * {@link StormRelNode} marker and contributes no behavior of its own.
+ */
+public abstract class StormFilterRelBase extends Filter implements StormRelNode {
+
+    /**
+     * Forwards every argument, unchanged, to the {@link Filter} constructor.
+     *
+     * @param cluster   cluster handed to {@code Filter}
+     * @param traits    trait set handed to {@code Filter}
+     * @param child     input node handed to {@code Filter}
+     * @param condition filter condition handed to {@code Filter}
+     */
+    protected StormFilterRelBase(
+            RelOptCluster cluster,
+            RelTraitSet traits,
+            RelNode child,
+            RexNode condition) {
+        super(cluster, traits, child, condition);
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormJoinRelBase.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormJoinRelBase.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormJoinRelBase.java
new file mode 100644
index 0000000..d8e82c5
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormJoinRelBase.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.sql.planner.rel;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.CorrelationId;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rex.RexNode;
+
+import java.util.Set;
+
+public abstract class StormJoinRelBase extends Join implements StormRelNode {
+ protected StormJoinRelBase(RelOptCluster cluster, RelTraitSet traitSet, RelNode left, RelNode right, RexNode condition, Set<CorrelationId> variablesSet, JoinRelType joinType) {
+ super(cluster, traitSet, left, right, condition, variablesSet, joinType);
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormProjectRelBase.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormProjectRelBase.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormProjectRelBase.java
new file mode 100644
index 0000000..fe32ba5
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormProjectRelBase.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.sql.planner.rel;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexNode;
+
+import java.util.List;
+
+public abstract class StormProjectRelBase extends Project implements StormRelNode {
+ protected StormProjectRelBase(RelOptCluster cluster, RelTraitSet traits, RelNode input, List<? extends RexNode> projects, RelDataType rowType) {
+ super(cluster, traits, input, projects, rowType);
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormRelNode.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormRelNode.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormRelNode.java
new file mode 100644
index 0000000..9327868
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormRelNode.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.sql.planner.rel;
+
+import org.apache.calcite.rel.RelNode;
+
+/**
+ * Marker interface for Storm SQL relational nodes. It declares no members of its own beyond
+ * what it inherits from {@link RelNode}; the {@code Storm*RelBase} classes in this package
+ * implement it.
+ */
+public interface StormRelNode extends RelNode {
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormStreamInsertRelBase.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormStreamInsertRelBase.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormStreamInsertRelBase.java
new file mode 100644
index 0000000..59694fc
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormStreamInsertRelBase.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.sql.planner.rel;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.prepare.Prepare;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.TableModify;
+import org.apache.calcite.rex.RexNode;
+
+import java.util.List;
+
+public abstract class StormStreamInsertRelBase extends TableModify implements StormRelNode {
+ protected StormStreamInsertRelBase(RelOptCluster cluster, RelTraitSet traits, RelOptTable table, Prepare.CatalogReader catalogReader, RelNode child, Operation operation, List<String> updateColumnList, List<RexNode> sourceExpressionList, boolean flattened) {
+ super(cluster, traits, table, catalogReader, child, operation, updateColumnList, sourceExpressionList, flattened);
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormStreamScanRelBase.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormStreamScanRelBase.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormStreamScanRelBase.java
new file mode 100644
index 0000000..32f1ac2
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/rel/StormStreamScanRelBase.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.sql.planner.rel;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.core.TableScan;
+
+/**
+ * Base class for Storm SQL relational nodes built on Calcite's {@link TableScan}. It adds
+ * the {@link StormRelNode} marker and contributes no behavior of its own.
+ */
+public abstract class StormStreamScanRelBase extends TableScan implements StormRelNode {
+
+    // FIXME: define Table class and table.unwrap() to get it
+
+    /**
+     * Forwards every argument, unchanged, to the {@link TableScan} constructor.
+     *
+     * @param cluster  cluster handed to {@code TableScan}
+     * @param traitSet trait set handed to {@code TableScan}
+     * @param table    scanned table handed to {@code TableScan}
+     */
+    protected StormStreamScanRelBase(
+            RelOptCluster cluster,
+            RelTraitSet traitSet,
+            RelOptTable table) {
+        super(cluster, traitSet, table);
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/QueryPlanner.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/QueryPlanner.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/QueryPlanner.java
new file mode 100644
index 0000000..f98fb02
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/QueryPlanner.java
@@ -0,0 +1,156 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+package org.apache.storm.sql.planner.trident;
+
+import org.apache.calcite.DataContext;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.jdbc.CalciteSchema;
+import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
+import org.apache.calcite.plan.Contexts;
+import org.apache.calcite.plan.ConventionTraitDef;
+import org.apache.calcite.plan.RelTraitDef;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.prepare.CalciteCatalogReader;
+import org.apache.calcite.rel.RelCollationTraitDef;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperatorTable;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.parser.SqlParseException;
+import org.apache.calcite.sql.util.ChainedSqlOperatorTable;
+import org.apache.calcite.tools.FrameworkConfig;
+import org.apache.calcite.tools.Frameworks;
+import org.apache.calcite.tools.Planner;
+import org.apache.calcite.tools.RelConversionException;
+import org.apache.calcite.tools.ValidationException;
+import org.apache.storm.sql.javac.CompilingClassLoader;
+import org.apache.storm.sql.planner.StormRelDataTypeSystem;
+import org.apache.storm.sql.planner.UnsupportedOperatorsVisitor;
+import org.apache.storm.sql.planner.trident.rel.TridentLogicalConvention;
+import org.apache.storm.sql.planner.trident.rel.TridentRel;
+import org.apache.storm.sql.runtime.ISqlTridentDataSource;
+import org.apache.storm.sql.AbstractTridentProcessor;
+import org.apache.storm.trident.Stream;
+import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.trident.fluent.IAggregatableStream;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+public class QueryPlanner {
+
+ public static final int STORM_REL_CONVERSION_RULES = 1;
+
+ private final Planner planner;
+
+ private final JavaTypeFactory typeFactory = new JavaTypeFactoryImpl(
+ RelDataTypeSystem.DEFAULT);
+
+ public QueryPlanner(SchemaPlus schema) {
+ final List<RelTraitDef> traitDefs = new ArrayList<RelTraitDef>();
+
+ traitDefs.add(ConventionTraitDef.INSTANCE);
+ traitDefs.add(RelCollationTraitDef.INSTANCE);
+
+ List<SqlOperatorTable> sqlOperatorTables = new ArrayList<>();
+ sqlOperatorTables.add(SqlStdOperatorTable.instance());
+ sqlOperatorTables.add(new CalciteCatalogReader(CalciteSchema.from(schema),
+ false,
+ Collections.<String>emptyList(), typeFactory));
+
+ FrameworkConfig config = Frameworks.newConfigBuilder()
+ .defaultSchema(schema)
+ .operatorTable(new ChainedSqlOperatorTable(sqlOperatorTables))
+ .traitDefs(traitDefs)
+ .context(Contexts.EMPTY_CONTEXT)
+ .ruleSets(TridentStormRuleSets.getRuleSets())
+ .costFactory(null)
+ .typeSystem(StormRelDataTypeSystem.STORM_REL_DATATYPE_SYSTEM)
+ .build();
+ this.planner = Frameworks.getPlanner(config);
+ }
+
+ public AbstractTridentProcessor compile(Map<String, ISqlTridentDataSource> sources, String query) throws Exception {
+ TridentRel relNode = getPlan(query);
+
+ TridentPlanCreator tridentPlanCreator = new TridentPlanCreator(sources, new RexBuilder(typeFactory));
+ relNode.tridentPlan(tridentPlanCreator);
+
+ final TridentTopology topology = tridentPlanCreator.getTopology();
+ final IAggregatableStream lastStream = tridentPlanCreator.pop();
+ final DataContext dc = tridentPlanCreator.getDataContext();
+ final List<CompilingClassLoader> cls = tridentPlanCreator.getClassLoaders();
+
+ return new AbstractTridentProcessor() {
+ @Override
+ public TridentTopology build() {
+ return topology;
+ }
+
+ @Override
+ public Stream outputStream() {
+ return lastStream.toStream();
+ }
+
+ @Override
+ public DataContext getDataContext() {
+ return dc;
+ }
+
+ @Override
+ public List<CompilingClassLoader> getClassLoaders() {
+ return cls;
+ }
+ };
+ }
+
+ public TridentRel getPlan(String query) throws ValidationException, RelConversionException, SqlParseException {
+ return (TridentRel) validateAndConvert(planner.parse(query));
+ }
+
+ private RelNode validateAndConvert(SqlNode sqlNode) throws ValidationException, RelConversionException {
+ SqlNode validated = validateNode(sqlNode);
+ RelNode relNode = convertToRelNode(validated);
+ return convertToStormRel(relNode);
+ }
+
+ private RelNode convertToStormRel(RelNode relNode) throws RelConversionException {
+ RelTraitSet traitSet = relNode.getTraitSet();
+ traitSet = traitSet.simplify();
+
+ // PlannerImpl.transform() optimizes RelNode with ruleset
+ return planner.transform(STORM_REL_CONVERSION_RULES, traitSet.plus(TridentLogicalConvention.INSTANCE), relNode);
+ }
+
+ private RelNode convertToRelNode(SqlNode sqlNode) throws RelConversionException {
+ return planner.rel(sqlNode).rel;
+ }
+
+ private SqlNode validateNode(SqlNode sqlNode) throws ValidationException {
+ SqlNode validatedSqlNode = planner.validate(sqlNode);
+ validatedSqlNode.accept(new UnsupportedOperatorsVisitor());
+ return validatedSqlNode;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/TridentPlanCreator.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/TridentPlanCreator.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/TridentPlanCreator.java
new file mode 100644
index 0000000..30ebf7e
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/TridentPlanCreator.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.planner.trident;
+
+import org.apache.calcite.DataContext;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexProgram;
+import org.apache.storm.sql.compiler.RexNodeToJavaCodeCompiler;
+import org.apache.storm.sql.javac.CompilingClassLoader;
+import org.apache.storm.sql.runtime.ISqlTridentDataSource;
+import org.apache.storm.sql.runtime.calcite.DebuggableExecutableExpression;
+import org.apache.storm.sql.runtime.calcite.ExecutableExpression;
+import org.apache.storm.sql.runtime.calcite.StormDataContext;
+import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.trident.fluent.IAggregatableStream;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.List;
+import java.util.Map;
+
+public class TridentPlanCreator {
+ private final Map<String, ISqlTridentDataSource> sources;
+ private final JavaTypeFactory typeFactory;
+ private final RexNodeToJavaCodeCompiler rexCompiler;
+ private final DataContext dataContext;
+ private final TridentTopology topology;
+
+ private final Deque<IAggregatableStream> streamStack = new ArrayDeque<>();
+ private final List<CompilingClassLoader> classLoaders = new ArrayList<>();
+
+ public TridentPlanCreator(Map<String, ISqlTridentDataSource> sources, RexBuilder rexBuilder) {
+ this.sources = sources;
+ this.rexCompiler = new RexNodeToJavaCodeCompiler(rexBuilder);
+ this.typeFactory = (JavaTypeFactory) rexBuilder.getTypeFactory();
+
+ this.topology = new TridentTopology();
+ this.dataContext = new StormDataContext();
+ }
+
+ public void addStream(IAggregatableStream stream) throws Exception {
+ push(stream);
+ }
+
+ public IAggregatableStream pop() {
+ return streamStack.pop();
+ }
+
+ public Map<String, ISqlTridentDataSource> getSources() {
+ return sources;
+ }
+
+ public DataContext getDataContext() {
+ return dataContext;
+ }
+
+ public JavaTypeFactory getTypeFactory() {
+ return typeFactory;
+ }
+
+ public TridentTopology getTopology() {
+ return topology;
+ }
+
+ public ExecutableExpression createScalarInstance(List<RexNode> nodes, RelDataType inputRowType, String className)
+ throws CompilingClassLoader.CompilerException, ClassNotFoundException, IllegalAccessException, InstantiationException {
+ String expr = rexCompiler.compile(nodes, inputRowType, className);
+ CompilingClassLoader classLoader = new CompilingClassLoader(
+ getLastClassLoader(), className, expr, null);
+ ExecutableExpression instance = (ExecutableExpression) classLoader.loadClass(className).newInstance();
+ addClassLoader(classLoader);
+ return new DebuggableExecutableExpression(instance, expr);
+ }
+
+ public ExecutableExpression createScalarInstance(RexProgram program, String className)
+ throws CompilingClassLoader.CompilerException, ClassNotFoundException, IllegalAccessException, InstantiationException {
+ String expr = rexCompiler.compile(program, className);
+ CompilingClassLoader classLoader = new CompilingClassLoader(
+ getLastClassLoader(), className, expr, null);
+ ExecutableExpression instance = (ExecutableExpression) classLoader.loadClass(className).newInstance();
+ addClassLoader(classLoader);
+ return new DebuggableExecutableExpression(instance, expr);
+ }
+
+ private void push(IAggregatableStream stream) {
+ streamStack.push(stream);
+ }
+
+ public void addClassLoader(CompilingClassLoader compilingClassLoader) {
+ this.classLoaders.add(compilingClassLoader);
+ }
+
+ public ClassLoader getLastClassLoader() {
+ if (this.classLoaders.size() > 0) {
+ return this.classLoaders.get(this.classLoaders.size() - 1);
+ } else {
+ return this.getClass().getClassLoader();
+ }
+ }
+
+ public List<CompilingClassLoader> getClassLoaders() {
+ return classLoaders;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/TridentStormRuleSets.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/TridentStormRuleSets.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/TridentStormRuleSets.java
new file mode 100644
index 0000000..e146069
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/TridentStormRuleSets.java
@@ -0,0 +1,110 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+package org.apache.storm.sql.planner.trident;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.rel.rules.CalcMergeRule;
+import org.apache.calcite.rel.rules.FilterCalcMergeRule;
+import org.apache.calcite.rel.rules.FilterProjectTransposeRule;
+import org.apache.calcite.rel.rules.FilterToCalcRule;
+import org.apache.calcite.rel.rules.ProjectCalcMergeRule;
+import org.apache.calcite.rel.rules.ProjectFilterTransposeRule;
+import org.apache.calcite.rel.rules.ProjectRemoveRule;
+import org.apache.calcite.rel.rules.ProjectToCalcRule;
+import org.apache.calcite.rel.rules.PruneEmptyRules;
+import org.apache.calcite.rel.rules.ReduceExpressionsRule;
+import org.apache.calcite.rel.rules.SortRemoveRule;
+import org.apache.calcite.rel.rules.UnionEliminatorRule;
+import org.apache.calcite.rel.stream.StreamRules;
+import org.apache.calcite.tools.RuleSet;
+import org.apache.storm.sql.planner.trident.rules.TridentCalcRule;
+import org.apache.storm.sql.planner.trident.rules.TridentFilterRule;
+import org.apache.storm.sql.planner.trident.rules.TridentScanRule;
+import org.apache.storm.sql.planner.trident.rules.TridentAggregateRule;
+import org.apache.storm.sql.planner.trident.rules.TridentJoinRule;
+import org.apache.storm.sql.planner.trident.rules.TridentModifyRule;
+import org.apache.storm.sql.planner.trident.rules.TridentProjectRule;
+
+import java.util.Iterator;
+
+/**
+ * Provides the planner rule sets used to turn a Calcite logical plan into Trident rel nodes.
+ */
+public class TridentStormRuleSets {
+    /**
+     * Calcite rewrite rules followed by the Trident converter rules.
+     * The set keeps the order in which the rules are listed.
+     */
+    private static final ImmutableSet<RelOptRule> calciteToStormConversionRules =
+            ImmutableSet.<RelOptRule>builder().add(
+                    SortRemoveRule.INSTANCE,
+
+                    // Calc creation and merging
+                    FilterToCalcRule.INSTANCE,
+                    ProjectToCalcRule.INSTANCE,
+                    FilterCalcMergeRule.INSTANCE,
+                    ProjectCalcMergeRule.INSTANCE,
+                    CalcMergeRule.INSTANCE,
+
+                    // pruning of empty inputs
+                    PruneEmptyRules.FILTER_INSTANCE,
+                    PruneEmptyRules.PROJECT_INSTANCE,
+                    PruneEmptyRules.UNION_INSTANCE,
+
+                    // project/filter transposition and removal
+                    ProjectFilterTransposeRule.INSTANCE,
+                    FilterProjectTransposeRule.INSTANCE,
+                    ProjectRemoveRule.INSTANCE,
+
+                    // expression reduction
+                    ReduceExpressionsRule.FILTER_INSTANCE,
+                    ReduceExpressionsRule.PROJECT_INSTANCE,
+                    ReduceExpressionsRule.CALC_INSTANCE,
+
+                    // merge and push unions rules
+                    UnionEliminatorRule.INSTANCE,
+
+                    // conversion to the Trident convention
+                    TridentScanRule.INSTANCE,
+                    TridentFilterRule.INSTANCE,
+                    TridentProjectRule.INSTANCE,
+                    TridentAggregateRule.INSTANCE,
+                    TridentJoinRule.INSTANCE,
+                    TridentModifyRule.INSTANCE,
+                    TridentCalcRule.INSTANCE
+            ).build();
+
+    /**
+     * Builds the rule sets handed to the planner.
+     *
+     * @return a two-element array: the stream rules alone, then the stream rules
+     *         combined with the Calcite-to-Storm conversion rules
+     */
+    public static RuleSet[] getRuleSets() {
+        final RuleSet streamRulesOnly = new StormRuleSet(StreamRules.RULES);
+
+        final ImmutableSet<RelOptRule> streamAndConversionRules = ImmutableSet.<RelOptRule>builder()
+                .addAll(StreamRules.RULES)
+                .addAll(calciteToStormConversionRules)
+                .build();
+
+        return new RuleSet[]{streamRulesOnly, new StormRuleSet(streamAndConversionRules)};
+    }
+
+    /**
+     * Minimal {@link RuleSet} backed by an immutable set of rules.
+     */
+    private static class StormRuleSet implements RuleSet {
+        final ImmutableSet<RelOptRule> rules;
+
+        public StormRuleSet(ImmutableSet<RelOptRule> rules) {
+            this.rules = rules;
+        }
+
+        public StormRuleSet(ImmutableList<RelOptRule> rules) {
+            // Duplicates collapse, first-occurrence order is kept.
+            this(ImmutableSet.copyOf(rules));
+        }
+
+        @Override
+        public Iterator<RelOptRule> iterator() {
+            return this.rules.iterator();
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentCalcRel.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentCalcRel.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentCalcRel.java
new file mode 100644
index 0000000..482e841
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentCalcRel.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.planner.trident.rel;
+
+import com.google.common.collect.Lists;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Calc;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexLocalRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexProgram;
+import org.apache.storm.sql.planner.StormRelUtils;
+import org.apache.storm.sql.planner.rel.StormCalcRelBase;
+import org.apache.storm.sql.planner.trident.TridentPlanCreator;
+import org.apache.storm.sql.runtime.calcite.ExecutableExpression;
+import org.apache.storm.sql.runtime.trident.functions.EvaluationCalc;
+import org.apache.storm.trident.Stream;
+import org.apache.storm.tuple.Fields;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class TridentCalcRel extends StormCalcRelBase implements TridentRel {
+ public TridentCalcRel(RelOptCluster cluster, RelTraitSet traits, RelNode child, RexProgram program) {
+ super(cluster, traits, child, program);
+ }
+
+ @Override
+ public Calc copy(RelTraitSet traitSet, RelNode child, RexProgram program) {
+ return new TridentCalcRel(getCluster(), traitSet, child, program);
+ }
+
+ @Override
+ public void tridentPlan(TridentPlanCreator planCreator) throws Exception {
+ // SingleRel
+ RelNode input = getInput();
+ StormRelUtils.getStormRelInput(input).tridentPlan(planCreator);
+ Stream inputStream = planCreator.pop().toStream();
+
+ String stageName = StormRelUtils.getStageName(this);
+
+ RelDataType inputRowType = getInput(0).getRowType();
+
+ List<String> outputFieldNames = getRowType().getFieldNames();
+ int outputCount = outputFieldNames.size();
+
+ // filter
+ ExecutableExpression filterInstance = null;
+ RexLocalRef condition = program.getCondition();
+ if (condition != null) {
+ RexNode conditionNode = program.expandLocalRef(condition);
+ filterInstance = planCreator.createScalarInstance(Lists.newArrayList(conditionNode), inputRowType,
+ StormRelUtils.getClassName(this));
+ }
+
+ // projection
+ ExecutableExpression projectionInstance = null;
+ List<RexLocalRef> projectList = program.getProjectList();
+ if (projectList != null && !projectList.isEmpty()) {
+ List<RexNode> expandedNodes = new ArrayList<>();
+ for (RexLocalRef project : projectList) {
+ expandedNodes.add(program.expandLocalRef(project));
+ }
+
+ projectionInstance = planCreator.createScalarInstance(expandedNodes, inputRowType,
+ StormRelUtils.getClassName(this));
+ }
+
+ if (projectionInstance == null && filterInstance == null) {
+ // it shouldn't be happen
+ throw new IllegalStateException("Either projection or condition, or both should be provided.");
+ }
+
+ final Stream finalStream = inputStream
+ .flatMap(new EvaluationCalc(filterInstance, projectionInstance, outputCount, planCreator.getDataContext()), new Fields(outputFieldNames))
+ .name(stageName);
+
+ planCreator.addStream(finalStream);
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentFilterRel.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentFilterRel.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentFilterRel.java
new file mode 100644
index 0000000..1fe0927
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentFilterRel.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.planner.trident.rel;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexNode;
+import org.apache.storm.sql.planner.StormRelUtils;
+import org.apache.storm.sql.planner.rel.StormFilterRelBase;
+import org.apache.storm.sql.planner.trident.TridentPlanCreator;
+import org.apache.storm.sql.runtime.calcite.ExecutableExpression;
+import org.apache.storm.sql.runtime.trident.functions.EvaluationFilter;
+import org.apache.storm.trident.Stream;
+import org.apache.storm.trident.fluent.IAggregatableStream;
+
+import java.util.List;
+
+/**
+ * Trident rel node for a Filter: compiles the node's child expressions and applies
+ * them to the input stream through an {@link EvaluationFilter}.
+ */
+public class TridentFilterRel extends StormFilterRelBase implements TridentRel {
+    public TridentFilterRel(RelOptCluster cluster, RelTraitSet traits, RelNode child, RexNode condition) {
+        super(cluster, traits, child, condition);
+    }
+
+    @Override
+    public Filter copy(RelTraitSet traitSet, RelNode input, RexNode condition) {
+        return new TridentFilterRel(getCluster(), traitSet, input, condition);
+    }
+
+    /**
+     * Plans the input, then adds a filter stage evaluating the compiled condition.
+     *
+     * @param planCreator the creator the input stream is popped from and the filtered stream is added to
+     */
+    @Override
+    public void tridentPlan(TridentPlanCreator planCreator) throws Exception {
+        // SingleRel: plan the only input, then take the stream it produced.
+        StormRelUtils.getStormRelInput(getInput()).tridentPlan(planCreator);
+        final Stream upstream = planCreator.pop().toStream();
+
+        final String stageName = StormRelUtils.getStageName(this);
+
+        final List<RexNode> conditionExps = getChildExps();
+        final RelDataType inputRowType = getInput(0).getRowType();
+        final String filterClassName = StormRelUtils.getClassName(this);
+
+        final ExecutableExpression filterInstance =
+                planCreator.createScalarInstance(conditionExps, inputRowType, filterClassName);
+        final EvaluationFilter evaluation = new EvaluationFilter(filterInstance, planCreator.getDataContext());
+
+        final IAggregatableStream finalStream = upstream.filter(evaluation).name(stageName);
+        planCreator.addStream(finalStream);
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentLogicalConvention.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentLogicalConvention.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentLogicalConvention.java
new file mode 100644
index 0000000..d221498
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentLogicalConvention.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.planner.trident.rel;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.ConventionTraitDef;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTrait;
+import org.apache.calcite.plan.RelTraitDef;
+import org.apache.calcite.plan.RelTraitSet;
+
+/**
+ * Singleton calling convention for rel nodes implementing {@link TridentRel}.
+ * It reports the name {@code STORM_LOGICAL}, is satisfied only by itself and
+ * declares no conversions to other conventions.
+ */
+public enum TridentLogicalConvention implements Convention {
+    INSTANCE;
+
+    /** @return the name under which this convention is reported */
+    @Override
+    public String getName() {
+        return "STORM_LOGICAL";
+    }
+
+    /** @return the convention name, same as {@link #getName()} */
+    @Override
+    public String toString() {
+        return getName();
+    }
+
+    /** @return the interface rel nodes of this convention implement */
+    @Override
+    public Class getInterface() {
+        return TridentRel.class;
+    }
+
+    @Override
+    public RelTraitDef getTraitDef() {
+        return ConventionTraitDef.INSTANCE;
+    }
+
+    /** Identity comparison: only this very instance satisfies the convention. */
+    @Override
+    public boolean satisfies(RelTrait trait) {
+        return trait == this;
+    }
+
+    /** Nothing to register with the planner. */
+    @Override
+    public void register(RelOptPlanner planner) {
+    }
+
+    @Override
+    public boolean canConvertConvention(Convention toConvention) {
+        return false;
+    }
+
+    @Override
+    public boolean useAbstractConvertersForConversion(RelTraitSet fromTraits, RelTraitSet toTraits) {
+        return false;
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentProjectRel.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentProjectRel.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentProjectRel.java
new file mode 100644
index 0000000..06be5d7
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentProjectRel.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.planner.trident.rel;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexNode;
+import org.apache.storm.sql.planner.StormRelUtils;
+import org.apache.storm.sql.planner.rel.StormProjectRelBase;
+import org.apache.storm.sql.planner.trident.TridentPlanCreator;
+import org.apache.storm.sql.runtime.calcite.ExecutableExpression;
+import org.apache.storm.sql.runtime.trident.functions.EvaluationFunction;
+import org.apache.storm.trident.Stream;
+import org.apache.storm.tuple.Fields;
+
+import java.util.List;
+
+/**
+ * Trident rel node for a Project: compiles the node's child expressions and maps
+ * every input tuple through an {@link EvaluationFunction}.
+ */
+public class TridentProjectRel extends StormProjectRelBase implements TridentRel {
+    public TridentProjectRel(RelOptCluster cluster, RelTraitSet traits, RelNode input, List<? extends RexNode> projects, RelDataType rowType) {
+        super(cluster, traits, input, projects, rowType);
+    }
+
+    @Override
+    public Project copy(RelTraitSet traitSet, RelNode input, List<RexNode> projects, RelDataType rowType) {
+        return new TridentProjectRel(getCluster(), traitSet, input, projects, rowType);
+    }
+
+    /**
+     * Plans the input, then adds a map stage that emits this node's row type fields.
+     *
+     * @param planCreator the creator the input stream is popped from and the projected stream is added to
+     */
+    @Override
+    public void tridentPlan(TridentPlanCreator planCreator) throws Exception {
+        // SingleRel: plan the only input, then take the stream it produced.
+        StormRelUtils.getStormRelInput(getInput()).tridentPlan(planCreator);
+        final Stream upstream = planCreator.pop().toStream();
+
+        final String stageName = StormRelUtils.getStageName(this);
+        final String projectionClassName = StormRelUtils.getClassName(this);
+
+        final List<String> outputFieldNames = getRowType().getFieldNames();
+        final int outputCount = outputFieldNames.size();
+
+        final List<RexNode> projectExps = getChildExps();
+        final RelDataType inputRowType = getInput(0).getRowType();
+
+        final ExecutableExpression projectionInstance =
+                planCreator.createScalarInstance(projectExps, inputRowType, projectionClassName);
+        final EvaluationFunction evaluation =
+                new EvaluationFunction(projectionInstance, outputCount, planCreator.getDataContext());
+
+        final Stream finalStream = upstream
+                .map(evaluation, new Fields(outputFieldNames))
+                .name(stageName);
+        planCreator.addStream(finalStream);
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentRel.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentRel.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentRel.java
new file mode 100644
index 0000000..8b8e949
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentRel.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.planner.trident.rel;
+
+import org.apache.storm.sql.planner.rel.StormRelNode;
+import org.apache.storm.sql.planner.trident.TridentPlanCreator;
+
+/**
+ * A Storm rel node that can contribute its part of a Trident topology plan.
+ */
+public interface TridentRel extends StormRelNode {
+ /**
+  * Adds this node's part of the plan to the given creator.
+  *
+  * @param planCreator the plan creator to contribute to
+  * @throws Exception if planning this node fails
+  */
+ void tridentPlan(TridentPlanCreator planCreator) throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentStreamInsertRel.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentStreamInsertRel.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentStreamInsertRel.java
new file mode 100644
index 0000000..e92c29b
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentStreamInsertRel.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.planner.trident.rel;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.prepare.Prepare;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rex.RexNode;
+import org.apache.storm.sql.planner.StormRelUtils;
+import org.apache.storm.sql.planner.rel.StormStreamInsertRelBase;
+import org.apache.storm.sql.planner.trident.TridentPlanCreator;
+import org.apache.storm.sql.runtime.ISqlTridentDataSource;
+import org.apache.storm.trident.Stream;
+import org.apache.storm.trident.fluent.IAggregatableStream;
+import org.apache.storm.tuple.Fields;
+
+import java.util.List;
+
+public class TridentStreamInsertRel extends StormStreamInsertRelBase implements TridentRel {
+ public TridentStreamInsertRel(RelOptCluster cluster, RelTraitSet traits, RelOptTable table, Prepare.CatalogReader catalogReader, RelNode child, Operation operation, List<String> updateColumnList, List<RexNode> sourceExpressionList, boolean flattened) {
+ super(cluster, traits, table, catalogReader, child, operation, updateColumnList, sourceExpressionList, flattened);
+ }
+
+ @Override
+ public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+ return new TridentStreamInsertRel(getCluster(), traitSet, getTable(), getCatalogReader(),
+ sole(inputs), getOperation(), getUpdateColumnList(), getSourceExpressionList(), isFlattened());
+ }
+
+ @Override
+ public void tridentPlan(TridentPlanCreator planCreator) throws Exception {
+ // SingleRel
+ RelNode input = getInput();
+ StormRelUtils.getStormRelInput(input).tridentPlan(planCreator);
+ Stream inputStream = planCreator.pop().toStream();
+
+ String stageName = StormRelUtils.getStageName(this);
+
+ Preconditions.checkArgument(isInsert(), "Only INSERT statement is supported.");
+
+ List<String> inputFields = this.input.getRowType().getFieldNames();
+ List<String> outputFields = getRowType().getFieldNames();
+
+ // FIXME: this should be really different...
+ String tableName = Joiner.on('.').join(getTable().getQualifiedName());
+ ISqlTridentDataSource.SqlTridentConsumer consumer = planCreator.getSources().get(tableName).getConsumer();
+
+ // In fact this is normally the end of stream, but I'm still not sure so I open new streams based on State values
+ IAggregatableStream finalStream = inputStream
+ .partitionPersist(consumer.getStateFactory(), new Fields(inputFields), consumer.getStateUpdater(),
+ new Fields(outputFields))
+ .newValuesStream().name(stageName);
+
+ planCreator.addStream(finalStream);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentStreamScanRel.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentStreamScanRel.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentStreamScanRel.java
new file mode 100644
index 0000000..c563d73
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentStreamScanRel.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.planner.trident.rel;
+
+import com.google.common.base.Joiner;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.storm.sql.planner.StormRelUtils;
+import org.apache.storm.sql.planner.rel.StormStreamScanRelBase;
+import org.apache.storm.sql.planner.trident.TridentPlanCreator;
+import org.apache.storm.sql.runtime.ISqlTridentDataSource;
+import org.apache.storm.trident.fluent.IAggregatableStream;
+
+import java.util.Map;
+
+/**
+ * Trident rel node for a stream scan: opens a new Trident stream from the
+ * producer of the scanned table, using the configured parallelism hint.
+ */
+public class TridentStreamScanRel extends StormStreamScanRelBase implements TridentRel {
+    private final int parallelismHint;
+
+    public TridentStreamScanRel(RelOptCluster cluster, RelTraitSet traitSet, RelOptTable table, int parallelismHint) {
+        super(cluster, traitSet, table);
+        this.parallelismHint = parallelismHint;
+    }
+
+    /**
+     * Adds a new source stream for the scanned table to the plan.
+     *
+     * @param planCreator supplies the topology and the registered sources, and receives the new stream
+     * @throws RuntimeException if the table is not among the creator's sources
+     */
+    @Override
+    public void tridentPlan(TridentPlanCreator planCreator) throws Exception {
+        // Sources are keyed by the dot-joined qualified table name.
+        final String sourceName = Joiner.on('.').join(getTable().getQualifiedName());
+
+        // FIXME: this should be really different...
+        final Map<String, ISqlTridentDataSource> registeredSources = planCreator.getSources();
+        final boolean known = registeredSources.containsKey(sourceName);
+        if (!known) {
+            throw new RuntimeException("Cannot find table " + sourceName);
+        }
+
+        final String stageName = StormRelUtils.getStageName(this);
+        final IAggregatableStream finalStream = planCreator.getTopology()
+                .newStream(stageName, registeredSources.get(sourceName).getProducer())
+                .parallelismHint(parallelismHint);
+        planCreator.addStream(finalStream);
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentAggregateRule.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentAggregateRule.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentAggregateRule.java
new file mode 100644
index 0000000..ac35414
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentAggregateRule.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.sql.planner.trident.rules;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.logical.LogicalAggregate;
+import org.apache.storm.sql.planner.trident.rel.TridentLogicalConvention;
+
+/**
+ * Converter rule registered for {@link LogicalAggregate} nodes in {@link Convention#NONE}.
+ * Aggregation is not supported, so the conversion always fails.
+ */
+public class TridentAggregateRule extends ConverterRule {
+    public static final RelOptRule INSTANCE = new TridentAggregateRule();
+
+    private TridentAggregateRule() {
+        super(
+                LogicalAggregate.class,
+                Convention.NONE,
+                TridentLogicalConvention.INSTANCE,
+                "TridentAggregateRule");
+    }
+
+    /**
+     * Always rejects the conversion.
+     *
+     * @throws UnsupportedOperationException unconditionally
+     */
+    @Override
+    public RelNode convert(RelNode rel) {
+        throw new UnsupportedOperationException("Aggregate operation is not supported.");
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentCalcRule.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentCalcRule.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentCalcRule.java
new file mode 100644
index 0000000..25126ec
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentCalcRule.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.sql.planner.trident.rules;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.Calc;
+import org.apache.calcite.rel.logical.LogicalCalc;
+import org.apache.storm.sql.planner.trident.rel.TridentCalcRel;
+import org.apache.storm.sql.planner.trident.rel.TridentLogicalConvention;
+
+/**
+ * Converter rule turning a {@link LogicalCalc} in {@link Convention#NONE} into a
+ * {@link TridentCalcRel} in the Trident logical convention.
+ */
+public class TridentCalcRule extends ConverterRule {
+    public static final TridentCalcRule INSTANCE = new TridentCalcRule();
+
+    private TridentCalcRule() {
+        super(
+                LogicalCalc.class,
+                Convention.NONE,
+                TridentLogicalConvention.INSTANCE,
+                "TridentCalcRule");
+    }
+
+    /**
+     * Builds the Trident counterpart of the given calc, keeping its program and
+     * requesting the input in the Trident logical convention as well.
+     *
+     * @param rel the node to convert; cast to {@link Calc}
+     * @return the new {@link TridentCalcRel}
+     */
+    @Override
+    public RelNode convert(RelNode rel) {
+        final Calc source = (Calc) rel;
+        final RelNode sourceInput = source.getInput();
+
+        final RelOptCluster cluster = source.getCluster();
+        final RelTraitSet tridentTraits = source.getTraitSet().replace(TridentLogicalConvention.INSTANCE);
+        final RelNode tridentInput =
+                convert(sourceInput, sourceInput.getTraitSet().replace(TridentLogicalConvention.INSTANCE));
+
+        return new TridentCalcRel(cluster, tridentTraits, tridentInput, source.getProgram());
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentFilterRule.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentFilterRule.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentFilterRule.java
new file mode 100644
index 0000000..7f9c41f
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentFilterRule.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.planner.trident.rules;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.logical.LogicalFilter;
+import org.apache.storm.sql.planner.trident.rel.TridentFilterRel;
+import org.apache.storm.sql.planner.trident.rel.TridentLogicalConvention;
+
+/**
+ * Planner rule that rewrites a {@link LogicalFilter} as a {@link TridentFilterRel}.
+ *
+ * <p>The rule is registered for {@link LogicalFilter} with {@link Convention#NONE} as the source
+ * trait and {@link TridentLogicalConvention#INSTANCE} as the target trait.
+ */
+public class TridentFilterRule extends ConverterRule {
+    /**
+     * The only instance of this rule; the constructor is private. Declared {@code final} so the
+     * shared rule cannot be reassigned by other code.
+     */
+    public static final TridentFilterRule INSTANCE = new TridentFilterRule();
+
+    private TridentFilterRule() {
+        super(LogicalFilter.class, Convention.NONE, TridentLogicalConvention.INSTANCE, "TridentFilterRule");
+    }
+
+    /**
+     * Builds the Trident counterpart of the given filter node.
+     *
+     * @param rel the node to convert; it is cast to {@link Filter} without a prior type check
+     * @return a new {@link TridentFilterRel} that reuses the source node's cluster and condition,
+     *         with the convention trait of both the node and its input replaced by
+     *         {@link TridentLogicalConvention#INSTANCE}
+     */
+    @Override
+    public RelNode convert(RelNode rel) {
+        final Filter filter = (Filter) rel;
+        final RelNode input = filter.getInput();
+
+        return new TridentFilterRel(filter.getCluster(),
+            filter.getTraitSet().replace(TridentLogicalConvention.INSTANCE),
+            convert(input, input.getTraitSet().replace(TridentLogicalConvention.INSTANCE)),
+            filter.getCondition());
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentJoinRule.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentJoinRule.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentJoinRule.java
new file mode 100644
index 0000000..90f5083
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentJoinRule.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.planner.trident.rules;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.logical.LogicalJoin;
+import org.apache.storm.sql.planner.trident.rel.TridentLogicalConvention;
+
+/**
+ * Planner rule registered for {@link LogicalJoin} that rejects every conversion attempt.
+ *
+ * <p>The rule is registered with {@link Convention#NONE} as the source trait and
+ * {@link TridentLogicalConvention#INSTANCE} as the target trait, but {@link #convert(RelNode)}
+ * always throws.
+ */
+public class TridentJoinRule extends ConverterRule {
+    /** Message carried by the exception raised from {@link #convert(RelNode)}. */
+    private static final String JOIN_NOT_SUPPORTED = "Join operation is not supported.";
+
+    /** The only instance of this rule; the constructor is private. */
+    public static final TridentJoinRule INSTANCE = new TridentJoinRule();
+
+    private TridentJoinRule() {
+        super(
+            LogicalJoin.class,
+            Convention.NONE,
+            TridentLogicalConvention.INSTANCE,
+            "TridentJoinRule");
+    }
+
+    /**
+     * Always fails: no Trident join node is produced by this rule.
+     *
+     * @param rel the node the planner asked to convert; it is not inspected
+     * @return never returns normally
+     * @throws UnsupportedOperationException on every call
+     */
+    @Override
+    public RelNode convert(RelNode rel) {
+        throw new UnsupportedOperationException(JOIN_NOT_SUPPORTED);
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentModifyRule.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentModifyRule.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentModifyRule.java
new file mode 100644
index 0000000..2155451
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentModifyRule.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.planner.trident.rules;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.prepare.Prepare;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.TableModify;
+import org.apache.calcite.rel.logical.LogicalTableModify;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.schema.Table;
+import org.apache.storm.sql.planner.trident.rel.TridentLogicalConvention;
+import org.apache.storm.sql.planner.trident.rel.TridentStreamInsertRel;
+
+import java.util.List;
+
+/**
+ * Planner rule that rewrites a {@link LogicalTableModify} whose target is a stream as a
+ * {@link TridentStreamInsertRel}.
+ *
+ * <p>The rule is registered for {@link LogicalTableModify} with {@link Convention#NONE} as the
+ * source trait and {@link TridentLogicalConvention#INSTANCE} as the target trait.
+ */
+public class TridentModifyRule extends ConverterRule {
+    /** The only instance of this rule; the constructor is private. */
+    public static final TridentModifyRule INSTANCE = new TridentModifyRule();
+
+    private TridentModifyRule() {
+        super(LogicalTableModify.class, Convention.NONE, TridentLogicalConvention.INSTANCE, "TridentModifyRule");
+    }
+
+    /**
+     * Builds the Trident counterpart of the given table-modify node.
+     *
+     * @param rel the node to convert; it is cast to {@link TableModify} without a prior type check
+     * @return a new {@link TridentStreamInsertRel} built from the source node's cluster, table,
+     *         catalog reader, operation, column/expression lists and flattened flag, with the
+     *         convention trait of both the node and its input replaced by
+     *         {@link TridentLogicalConvention#INSTANCE}
+     * @throws IllegalArgumentException if the target table cannot be unwrapped to {@link Table},
+     *         or if its JDBC table type is anything other than {@code STREAM}
+     * @throws UnsupportedOperationException if the target is a stream and the operation is not
+     *         {@code INSERT}
+     */
+    @Override
+    public RelNode convert(RelNode rel) {
+        final TableModify tableModify = (TableModify) rel;
+        final RelNode input = tableModify.getInput();
+
+        final RelOptCluster cluster = tableModify.getCluster();
+        final RelTraitSet traitSet = tableModify.getTraitSet().replace(TridentLogicalConvention.INSTANCE);
+        final RelOptTable relOptTable = tableModify.getTable();
+        final Prepare.CatalogReader catalogReader = tableModify.getCatalogReader();
+        final RelNode convertedInput = convert(input, input.getTraitSet().replace(TridentLogicalConvention.INSTANCE));
+        final TableModify.Operation operation = tableModify.getOperation();
+        final List<String> updateColumnList = tableModify.getUpdateColumnList();
+        final List<RexNode> sourceExpressionList = tableModify.getSourceExpressionList();
+        final boolean flattened = tableModify.isFlattened();
+
+        // unwrap() may yield null when the target does not expose a Table; fail with a
+        // descriptive error instead of a NullPointerException on the dereference below.
+        final Table table = relOptTable.unwrap(Table.class);
+        if (table == null) {
+            throw new IllegalArgumentException(String.format("Table %s cannot be unwrapped to %s",
+                relOptTable.getQualifiedName(), Table.class.getName()));
+        }
+
+        switch (table.getJdbcTableType()) {
+            case STREAM:
+                // Streams are append-only from the planner's point of view: only INSERT is accepted.
+                if (operation != TableModify.Operation.INSERT) {
+                    throw new UnsupportedOperationException(String.format("Streams doesn't support %s modify operation", operation));
+                }
+                return new TridentStreamInsertRel(cluster, traitSet, relOptTable, catalogReader, convertedInput, operation,
+                    updateColumnList, sourceExpressionList, flattened);
+            default:
+                throw new IllegalArgumentException(String.format("Unsupported table type: %s", table.getJdbcTableType()));
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentProjectRule.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentProjectRule.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentProjectRule.java
new file mode 100644
index 0000000..2922725
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentProjectRule.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.planner.trident.rules;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.storm.sql.planner.trident.rel.TridentLogicalConvention;
+import org.apache.storm.sql.planner.trident.rel.TridentProjectRel;
+
+/**
+ * Planner rule that rewrites a {@link LogicalProject} as a {@link TridentProjectRel}.
+ *
+ * <p>The rule is registered for {@link LogicalProject} with {@link Convention#NONE} as the source
+ * trait and {@link TridentLogicalConvention#INSTANCE} as the target trait.
+ */
+public class TridentProjectRule extends ConverterRule {
+    /** The only instance of this rule; the constructor is private. */
+    public static final TridentProjectRule INSTANCE = new TridentProjectRule();
+
+    private TridentProjectRule() {
+        super(
+            LogicalProject.class,
+            Convention.NONE,
+            TridentLogicalConvention.INSTANCE,
+            "TridentProjectRule");
+    }
+
+    /**
+     * Builds the Trident counterpart of the given project node.
+     *
+     * @param rel the node to convert; it is cast to {@link Project} without a prior type check
+     * @return a new {@link TridentProjectRel} that reuses the source node's cluster, projection
+     *         expressions and row type, with the convention trait of both the node and its input
+     *         replaced by {@link TridentLogicalConvention#INSTANCE}
+     */
+    @Override
+    public RelNode convert(RelNode rel) {
+        final Project source = (Project) rel;
+
+        // Request the child in the Trident convention before wrapping it in the new node.
+        final RelNode child = source.getInput();
+        final RelNode tridentChild =
+            convert(child, child.getTraitSet().replace(TridentLogicalConvention.INSTANCE));
+
+        return new TridentProjectRel(
+            source.getCluster(),
+            source.getTraitSet().replace(TridentLogicalConvention.INSTANCE),
+            tridentChild,
+            source.getProjects(),
+            source.getRowType());
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentScanRule.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentScanRule.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentScanRule.java
new file mode 100644
index 0000000..abbd680
--- /dev/null
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentScanRule.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.planner.trident.rules;
+
+import org.apache.calcite.adapter.enumerable.EnumerableConvention;
+import org.apache.calcite.adapter.enumerable.EnumerableTableScan;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.schema.Table;
+import org.apache.storm.sql.calcite.ParallelStreamableTable;
+import org.apache.storm.sql.planner.trident.rel.TridentLogicalConvention;
+import org.apache.storm.sql.planner.trident.rel.TridentStreamScanRel;
+
+/**
+ * Planner rule that rewrites an {@link EnumerableTableScan} over a stream as a
+ * {@link TridentStreamScanRel}.
+ *
+ * <p>The rule is registered for {@link EnumerableTableScan} with
+ * {@link EnumerableConvention#INSTANCE} as the source trait and
+ * {@link TridentLogicalConvention#INSTANCE} as the target trait.
+ */
+public class TridentScanRule extends ConverterRule {
+    /** The only instance of this rule; the constructor is private. */
+    public static final TridentScanRule INSTANCE = new TridentScanRule();
+    /** Parallelism hint used when the scanned table does not supply one. */
+    public static final int DEFAULT_PARALLELISM_HINT = 1;
+
+    private TridentScanRule() {
+        super(EnumerableTableScan.class, EnumerableConvention.INSTANCE, TridentLogicalConvention.INSTANCE, "TridentScanRule");
+    }
+
+    /**
+     * Builds the Trident counterpart of the given scan node.
+     *
+     * @param rel the node to convert; it is cast to {@link TableScan} without a prior type check
+     * @return a new {@link TridentStreamScanRel} over the same table, carrying the table's
+     *         parallelism hint when it exposes a non-null one through
+     *         {@link ParallelStreamableTable}, otherwise {@link #DEFAULT_PARALLELISM_HINT}
+     * @throws IllegalArgumentException if the scanned table cannot be unwrapped to {@link Table},
+     *         or if its JDBC table type is anything other than {@code STREAM}
+     */
+    @Override
+    public RelNode convert(RelNode rel) {
+        final TableScan scan = (TableScan) rel;
+        int parallelismHint = DEFAULT_PARALLELISM_HINT;
+
+        // The hint is optional twice over: the table may not be a ParallelStreamableTable,
+        // and even when it is, the hint itself may be null.
+        final ParallelStreamableTable parallelTable = scan.getTable().unwrap(ParallelStreamableTable.class);
+        if (parallelTable != null && parallelTable.parallelismHint() != null) {
+            parallelismHint = parallelTable.parallelismHint();
+        }
+
+        // unwrap() may yield null when the table does not expose a Table; fail with a
+        // descriptive error instead of a NullPointerException on the dereference below.
+        final Table table = scan.getTable().unwrap(Table.class);
+        if (table == null) {
+            throw new IllegalArgumentException(String.format("Table %s cannot be unwrapped to %s",
+                scan.getTable().getQualifiedName(), Table.class.getName()));
+        }
+
+        switch (table.getJdbcTableType()) {
+            case STREAM:
+                return new TridentStreamScanRel(scan.getCluster(),
+                    scan.getTraitSet().replace(TridentLogicalConvention.INSTANCE),
+                    scan.getTable(), parallelismHint);
+            default:
+                throw new IllegalArgumentException(String.format("Unsupported table type: %s", table.getJdbcTableType()));
+        }
+    }
+}