You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2017/08/19 00:30:58 UTC
[2/3] incubator-gearpump git commit: [GEARPUMP-217] Add Gearpump rel,
rule and examples.
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearJoinRel.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearJoinRel.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearJoinRel.java
new file mode 100644
index 0000000..1b168fb
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearJoinRel.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.rel;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.CorrelationId;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.util.Pair;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp;
+import scala.Tuple2;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+public class GearJoinRel extends Join implements GearRelNode {
+ public GearJoinRel(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right,
+ RexNode condition, Set<CorrelationId> variablesSet, JoinRelType joinType) {
+ super(cluster, traits, left, right, condition, variablesSet, joinType);
+ }
+
+ @Override
+ public Join copy(RelTraitSet traitSet, RexNode conditionExpr, RelNode left,
+ RelNode right, JoinRelType joinType, boolean semiJoinDone) {
+ return new GearJoinRel(getCluster(), traitSet, left, right, conditionExpr, variablesSet,
+ joinType);
+ }
+
+ private List<Pair<Integer, Integer>> extractJoinColumns(int leftRowColumnCount) {
+ // it's a CROSS JOIN because: condition == true
+ if (condition instanceof RexLiteral && (Boolean) ((RexLiteral) condition).getValue()) {
+ throw new UnsupportedOperationException("CROSS JOIN is not supported!");
+ }
+
+ RexCall call = (RexCall) condition;
+ List<Pair<Integer, Integer>> pairs = new ArrayList<>();
+ if ("AND".equals(call.getOperator().getName())) {
+ List<RexNode> operands = call.getOperands();
+ for (RexNode rexNode : operands) {
+ Pair<Integer, Integer> pair = extractOneJoinColumn((RexCall) rexNode, leftRowColumnCount);
+ pairs.add(pair);
+ }
+ } else if ("=".equals(call.getOperator().getName())) {
+ pairs.add(extractOneJoinColumn(call, leftRowColumnCount));
+ } else {
+ throw new UnsupportedOperationException(
+ "Operator " + call.getOperator().getName() + " is not supported in join condition");
+ }
+
+ return pairs;
+ }
+
+ private Pair<Integer, Integer> extractOneJoinColumn(RexCall oneCondition,
+ int leftRowColumnCount) {
+ List<RexNode> operands = oneCondition.getOperands();
+ final int leftIndex = Math.min(((RexInputRef) operands.get(0)).getIndex(),
+ ((RexInputRef) operands.get(1)).getIndex());
+
+ final int rightIndex1 = Math.max(((RexInputRef) operands.get(0)).getIndex(),
+ ((RexInputRef) operands.get(1)).getIndex());
+ final int rightIndex = rightIndex1 - leftRowColumnCount;
+
+ return new Pair<>(leftIndex, rightIndex);
+ }
+
+ @Override
+ public JavaStream<Tuple2<String, Integer>> buildGearPipeline(JavaStreamApp app, JavaStream<Tuple2<String, Integer>> javaStream) throws Exception {
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearLogicalConvention.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearLogicalConvention.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearLogicalConvention.java
new file mode 100644
index 0000000..ced38c3
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearLogicalConvention.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.rel;
+
+import org.apache.calcite.plan.*;
+
+public enum GearLogicalConvention implements Convention {
+ INSTANCE;
+
+ @Override
+ public Class getInterface() {
+ return GearRelNode.class;
+ }
+
+ @Override
+ public String getName() {
+ return "GEAR_LOGICAL";
+ }
+
+ @Override
+ public RelTraitDef getTraitDef() {
+ return ConventionTraitDef.INSTANCE;
+ }
+
+ @Override
+ public boolean satisfies(RelTrait trait) {
+ return this == trait;
+ }
+
+ @Override
+ public void register(RelOptPlanner planner) {
+ }
+
+ @Override
+ public String toString() {
+ return getName();
+ }
+
+ @Override
+ public boolean canConvertConvention(Convention toConvention) {
+ return false;
+ }
+
+ @Override
+ public boolean useAbstractConvertersForConversion(RelTraitSet fromTraits, RelTraitSet toTraits) {
+ return false;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearMinusRel.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearMinusRel.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearMinusRel.java
new file mode 100644
index 0000000..1ca972a
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearMinusRel.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.rel;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Minus;
+import org.apache.calcite.rel.core.SetOp;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp;
+import scala.Tuple2;
+
+import java.util.List;
+
+public class GearMinusRel extends Minus implements GearRelNode {
+
+ private GearSetOperatorRelBase delegate;
+
+ public GearMinusRel(RelOptCluster cluster, RelTraitSet traits, List<RelNode> inputs,
+ boolean all) {
+ super(cluster, traits, inputs, all);
+ delegate = new GearSetOperatorRelBase(this, GearSetOperatorRelBase.OpType.MINUS, inputs, all);
+ }
+
+ @Override
+ public SetOp copy(RelTraitSet traitSet, List<RelNode> inputs, boolean all) {
+ return new GearMinusRel(getCluster(), traitSet, inputs, all);
+ }
+
+ @Override
+ public JavaStream<Tuple2<String, Integer>> buildGearPipeline(JavaStreamApp app, JavaStream<Tuple2<String, Integer>> javaStream) throws Exception {
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearProjectRel.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearProjectRel.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearProjectRel.java
new file mode 100644
index 0000000..f09dc8c
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearProjectRel.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.rel;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexNode;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp;
+import scala.Tuple2;
+
+import java.util.List;
+
+public class GearProjectRel extends Project implements GearRelNode {
+
+ public GearProjectRel(RelOptCluster cluster, RelTraitSet traits, RelNode input,
+ List<? extends RexNode> projects, RelDataType rowType) {
+ super(cluster, traits, input, projects, rowType);
+ }
+
+ @Override
+ public Project copy(RelTraitSet traitSet, RelNode input, List<RexNode> projects,
+ RelDataType rowType) {
+ return new GearProjectRel(getCluster(), traitSet, input, projects, rowType);
+ }
+
+ @Override
+ public JavaStream<Tuple2<String, Integer>> buildGearPipeline(JavaStreamApp app, JavaStream<Tuple2<String, Integer>> javaStream) throws Exception {
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearRelNode.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearRelNode.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearRelNode.java
new file mode 100644
index 0000000..042c767
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearRelNode.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.rel;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp;
+import scala.Tuple2;
+
+public interface GearRelNode extends RelNode {
+
+ JavaStream<Tuple2<String, Integer>> buildGearPipeline(JavaStreamApp app, JavaStream<Tuple2<String, Integer>> javaStream) throws Exception;
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearSetOperatorRelBase.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearSetOperatorRelBase.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearSetOperatorRelBase.java
new file mode 100644
index 0000000..ee59753
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearSetOperatorRelBase.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.rel;
+
+import org.apache.calcite.rel.RelNode;
+
+import java.io.Serializable;
+import java.util.List;
+
+public class GearSetOperatorRelBase {
+
+ public enum OpType implements Serializable {
+ UNION,
+ INTERSECT,
+ MINUS
+ }
+
+ private GearRelNode gearRelNode;
+ private List<RelNode> inputs;
+ private boolean all;
+ private OpType opType;
+
+ public GearSetOperatorRelBase(GearRelNode gearRelNode, OpType opType,
+ List<RelNode> inputs, boolean all) {
+ this.gearRelNode = gearRelNode;
+ this.opType = opType;
+ this.inputs = inputs;
+ this.all = all;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearSortRel.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearSortRel.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearSortRel.java
new file mode 100644
index 0000000..f70481b
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearSortRel.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.rel;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelCollationImpl;
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Sort;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp;
+import scala.Tuple2;
+
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.List;
+
+public class GearSortRel extends Sort implements GearRelNode {
+
+ private List<Integer> fieldIndices = new ArrayList<>();
+ private List<Boolean> orientation = new ArrayList<>();
+ private List<Boolean> nullsFirst = new ArrayList<>();
+
+ private int startIndex = 0;
+ private int count;
+
+ public GearSortRel(RelOptCluster cluster, RelTraitSet traits, RelNode child, RelCollation collation,
+ RexNode offset, RexNode fetch) {
+ super(cluster, traits, child, collation, offset, fetch);
+
+ List<RexNode> fieldExps = getChildExps();
+ RelCollationImpl collationImpl = (RelCollationImpl) collation;
+ List<RelFieldCollation> collations = collationImpl.getFieldCollations();
+ for (int i = 0; i < fieldExps.size(); i++) {
+ RexNode fieldExp = fieldExps.get(i);
+ RexInputRef inputRef = (RexInputRef) fieldExp;
+ fieldIndices.add(inputRef.getIndex());
+ orientation.add(collations.get(i).getDirection() == RelFieldCollation.Direction.ASCENDING);
+
+ RelFieldCollation.NullDirection rawNullDirection = collations.get(i).nullDirection;
+ if (rawNullDirection == RelFieldCollation.NullDirection.UNSPECIFIED) {
+ rawNullDirection = collations.get(i).getDirection().defaultNullDirection();
+ }
+ nullsFirst.add(rawNullDirection == RelFieldCollation.NullDirection.FIRST);
+ }
+
+ if (fetch == null) {
+ throw new UnsupportedOperationException("ORDER BY without a LIMIT is not supported!");
+ }
+
+ RexLiteral fetchLiteral = (RexLiteral) fetch;
+ count = ((BigDecimal) fetchLiteral.getValue()).intValue();
+
+ if (offset != null) {
+ RexLiteral offsetLiteral = (RexLiteral) offset;
+ startIndex = ((BigDecimal) offsetLiteral.getValue()).intValue();
+ }
+ }
+
+ @Override
+ public Sort copy(RelTraitSet traitSet, RelNode newInput, RelCollation newCollation,
+ RexNode offset, RexNode fetch) {
+ return new GearSortRel(getCluster(), traitSet, newInput, newCollation, offset, fetch);
+ }
+
+ public static <T extends Number & Comparable> int numberCompare(T a, T b) {
+ return a.compareTo(b);
+ }
+
+ @Override
+ public JavaStream<Tuple2<String, Integer>> buildGearPipeline(JavaStreamApp app, JavaStream<Tuple2<String, Integer>> javaStream) throws Exception {
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearSqlRelUtils.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearSqlRelUtils.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearSqlRelUtils.java
new file mode 100644
index 0000000..54a6bbb
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearSqlRelUtils.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.rel;
+
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.plan.volcano.RelSubset;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.sql.SqlExplainLevel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+class GearSqlRelUtils {
+ private static final Logger LOG = LoggerFactory.getLogger(GearSqlRelUtils.class);
+
+ private static final AtomicInteger sequence = new AtomicInteger(0);
+ private static final AtomicInteger classSequence = new AtomicInteger(0);
+
+ public static String getStageName(GearRelNode relNode) {
+ return relNode.getClass().getSimpleName().toUpperCase() + "_" + relNode.getId() + "_"
+ + sequence.getAndIncrement();
+ }
+
+ public static String getClassName(GearRelNode relNode) {
+ return "Generated_" + relNode.getClass().getSimpleName().toUpperCase() + "_" + relNode.getId()
+ + "_" + classSequence.getAndIncrement();
+ }
+
+ public static GearRelNode getGearRelInput(RelNode input) {
+ if (input instanceof RelSubset) {
+ // go with known best input
+ input = ((RelSubset) input).getBest();
+ }
+ return (GearRelNode) input;
+ }
+
+ public static String explain(final RelNode rel) {
+ return explain(rel, SqlExplainLevel.EXPPLAN_ATTRIBUTES);
+ }
+
+ public static String explain(final RelNode rel, SqlExplainLevel detailLevel) {
+ String explain = "";
+ try {
+ explain = RelOptUtil.toString(rel);
+ } catch (StackOverflowError e) {
+ LOG.error("StackOverflowError occurred while extracting plan. "
+ + "Please report it to the dev@ mailing list.");
+ LOG.error("RelNode " + rel + " ExplainLevel " + detailLevel, e);
+ LOG.error("Forcing plan to empty string and continue... "
+ + "SQL Runner may not working properly after.");
+ }
+ return explain;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearUnionRel.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearUnionRel.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearUnionRel.java
new file mode 100644
index 0000000..431368d
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearUnionRel.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.rel;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelInput;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.SetOp;
+import org.apache.calcite.rel.core.Union;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp;
+import scala.Tuple2;
+
+import java.util.List;
+
+public class GearUnionRel extends Union implements GearRelNode {
+
+ private GearSetOperatorRelBase delegate;
+
+ public GearUnionRel(RelOptCluster cluster, RelTraitSet traits, List<RelNode> inputs, boolean all) {
+ super(cluster, traits, inputs, all);
+ this.delegate = new GearSetOperatorRelBase(this, GearSetOperatorRelBase.OpType.UNION, inputs, all);
+ }
+
+ public GearUnionRel(RelInput input) {
+ super(input);
+ }
+
+ @Override
+ public SetOp copy(RelTraitSet traitSet, List<RelNode> inputs, boolean all) {
+ return new GearUnionRel(getCluster(), traitSet, inputs, all);
+ }
+
+ @Override
+ public JavaStream<Tuple2<String, Integer>> buildGearPipeline(JavaStreamApp app, JavaStream<Tuple2<String, Integer>> javaStream) throws Exception {
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearValuesRel.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearValuesRel.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearValuesRel.java
new file mode 100644
index 0000000..6bd9403
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearValuesRel.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.rel;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.core.Values;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp;
+import scala.Tuple2;
+
+public class GearValuesRel extends Values implements GearRelNode {
+
+ public GearValuesRel(RelOptCluster cluster, RelDataType rowType, ImmutableList<ImmutableList<RexLiteral>> tuples,
+ RelTraitSet traits) {
+ super(cluster, rowType, tuples, traits);
+ }
+
+ @Override
+ public JavaStream<Tuple2<String, Integer>> buildGearPipeline(JavaStreamApp app, JavaStream<Tuple2<String, Integer>> javaStream) throws Exception {
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearAggregationRule.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearAggregationRule.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearAggregationRule.java
new file mode 100644
index 0000000..c1b1602
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearAggregationRule.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.rule;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.plan.*;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.core.RelFactories;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.tools.RelBuilderFactory;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.gearpump.sql.rel.GearAggregationRel;
+import org.apache.gearpump.sql.rel.GearLogicalConvention;
+import org.apache.gearpump.sql.utils.GearConfiguration;
+import org.apache.gearpump.streaming.dsl.window.api.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.GregorianCalendar;
+import java.util.List;
+
+public class GearAggregationRule extends RelOptRule {
+
+ private static final Logger LOG = LoggerFactory.getLogger(GearAggregationRule.class);
+ public static final GearAggregationRule INSTANCE =
+ new GearAggregationRule(Aggregate.class, Project.class, RelFactories.LOGICAL_BUILDER);
+
+ public GearAggregationRule(Class<? extends Aggregate> aggregateClass,
+ Class<? extends Project> projectClass,
+ RelBuilderFactory relBuilderFactory) {
+ super(operand(aggregateClass, operand(projectClass, any())), relBuilderFactory, null);
+ }
+
+ public GearAggregationRule(RelOptRuleOperand operand, String description) {
+ super(operand, description);
+ }
+
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ final Aggregate aggregate = call.rel(0);
+ final Project project = call.rel(1);
+ updateWindowTrigger(call, aggregate, project);
+ }
+
+ private void updateWindowTrigger(RelOptRuleCall call, Aggregate aggregate, Project project) {
+ ImmutableBitSet groupByFields = aggregate.getGroupSet();
+ List<RexNode> projectMapping = project.getProjects();
+
+ WindowFunction windowFn = new GlobalWindowFunction();
+ Trigger triggerFn;
+ int windowFieldIdx = -1;
+ Duration allowedLatence = Duration.ZERO;
+
+ for (int groupField : groupByFields.asList()) {
+ RexNode projNode = projectMapping.get(groupField);
+ if (projNode instanceof RexCall) {
+ SqlOperator op = ((RexCall) projNode).op;
+ ImmutableList<RexNode> parameters = ((RexCall) projNode).operands;
+ String functionName = op.getName();
+ switch (functionName) {
+ case "TUMBLE":
+ windowFieldIdx = groupField;
+ windowFn = (WindowFunction) FixedWindows.apply(Duration.ofMillis(getWindowParameterAsMillis(parameters.get(1))));
+ if (parameters.size() == 3) {
+ GregorianCalendar delayTime = (GregorianCalendar) ((RexLiteral) parameters.get(2))
+ .getValue();
+ triggerFn = createTriggerWithDelay(delayTime);
+ allowedLatence = (Duration.ofMillis(delayTime.getTimeInMillis()));
+ }
+ break;
+ case "HOP":
+ windowFieldIdx = groupField;
+ windowFn = (WindowFunction) SlidingWindows.apply(Duration.ofMillis(getWindowParameterAsMillis(parameters.get(1))), Duration.ofMillis(getWindowParameterAsMillis(parameters.get(2))));
+
+ if (parameters.size() == 4) {
+ GregorianCalendar delayTime = (GregorianCalendar) ((RexLiteral) parameters.get(3))
+ .getValue();
+ triggerFn = createTriggerWithDelay(delayTime);
+ allowedLatence = (Duration.ofMillis(delayTime.getTimeInMillis()));
+ }
+ break;
+ case "SESSION":
+ windowFieldIdx = groupField;
+ windowFn = (WindowFunction) SessionWindows.apply(Duration.ofMillis(getWindowParameterAsMillis(parameters.get(1))));
+ if (parameters.size() == 3) {
+ GregorianCalendar delayTime = (GregorianCalendar) ((RexLiteral) parameters.get(2))
+ .getValue();
+ triggerFn = createTriggerWithDelay(delayTime);
+ allowedLatence = (Duration.ofMillis(delayTime.getTimeInMillis()));
+ }
+ break;
+ default:
+ break;
+ }
+ }
+ }
+
+ try {
+ GearAggregationRel gearRel = new GearAggregationRel(aggregate.getCluster(),
+ aggregate.getTraitSet().replace(GearLogicalConvention.INSTANCE),
+ convert(aggregate.getInput(),
+ aggregate.getInput().getTraitSet().replace(GearLogicalConvention.INSTANCE)),
+ aggregate.indicator,
+ aggregate.getGroupSet(),
+ aggregate.getGroupSets(),
+ aggregate.getAggCallList());
+ gearRel.buildGearPipeline(GearConfiguration.app, null);
+ GearConfiguration.app.submit().waitUntilFinish();
+ } catch (Exception e) {
+ LOG.error(e.getMessage());
+ }
+
+ }
+
+ private Trigger createTriggerWithDelay(GregorianCalendar delayTime) {
+ return null;
+ }
+
+ private long getWindowParameterAsMillis(RexNode parameterNode) {
+ if (parameterNode instanceof RexLiteral) {
+ return RexLiteral.intValue(parameterNode);
+ } else {
+ throw new IllegalArgumentException(String.format("[%s] is not valid.", parameterNode));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearFilterRule.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearFilterRule.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearFilterRule.java
new file mode 100644
index 0000000..4817ee8
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearFilterRule.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.rule;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.logical.LogicalFilter;
+import org.apache.gearpump.sql.rel.GearFilterRel;
+import org.apache.gearpump.sql.rel.GearLogicalConvention;
+
+public class GearFilterRule extends ConverterRule {
+
+ public static final GearFilterRule INSTANCE = new GearFilterRule();
+
+ private GearFilterRule() {
+ super(LogicalFilter.class, Convention.NONE, GearLogicalConvention.INSTANCE, "GearFilterRule");
+ }
+
+ @Override
+ public RelNode convert(RelNode rel) {
+ final Filter filter = (Filter) rel;
+ final RelNode input = filter.getInput();
+
+ GearFilterRel gearRel = new GearFilterRel(filter.getCluster(),
+ filter.getTraitSet().replace(GearLogicalConvention.INSTANCE),
+ convert(input, input.getTraitSet().replace(GearLogicalConvention.INSTANCE)),
+ filter.getCondition());
+ return gearRel;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearFlatMapRule.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearFlatMapRule.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearFlatMapRule.java
new file mode 100644
index 0000000..e81f948
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearFlatMapRule.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.rule;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelTrait;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.gearpump.sql.rel.GearLogicalConvention;
+import org.apache.gearpump.sql.rel.GearFlatMapRel;
+import org.apache.gearpump.sql.utils.GearConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class GearFlatMapRule extends ConverterRule {
+
+ private static final Logger LOG = LoggerFactory.getLogger(GearFlatMapRule.class);
+ public static final GearFlatMapRule INSTANCE = new GearFlatMapRule(Aggregate.class, Convention.NONE);
+
+ public GearFlatMapRule(Class<? extends Aggregate> aggregateClass, RelTrait projectIn) {
+ super(aggregateClass, projectIn, GearLogicalConvention.INSTANCE, "GearFlatMapRule");
+ }
+
+ @Override
+ public RelNode convert(RelNode rel) {
+ try {
+ GearFlatMapRel flatRel = new GearFlatMapRel();
+ flatRel.buildGearPipeline(GearConfiguration.app, null);
+ } catch (Exception e) {
+ LOG.error(e.getMessage());
+ }
+ return null;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearIOSinkRule.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearIOSinkRule.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearIOSinkRule.java
new file mode 100644
index 0000000..ca525d6
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearIOSinkRule.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.rule;
+
+import java.util.List;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.prepare.Prepare;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.TableModify;
+import org.apache.calcite.rel.logical.LogicalTableModify;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.schema.Table;
+import org.apache.gearpump.sql.rel.GearIOSinkRel;
+import org.apache.gearpump.sql.rel.GearLogicalConvention;
+
+public class GearIOSinkRule extends ConverterRule {
+
+ public static final GearIOSinkRule INSTANCE = new GearIOSinkRule();
+
+ private GearIOSinkRule() {
+ super(LogicalTableModify.class, Convention.NONE, GearLogicalConvention.INSTANCE,
+ "GearIOSinkRule");
+ }
+
+ @Override
+ public RelNode convert(RelNode rel) {
+ final TableModify tableModify = (TableModify) rel;
+ final RelNode input = tableModify.getInput();
+
+ final RelOptCluster cluster = tableModify.getCluster();
+ final RelTraitSet traitSet = tableModify.getTraitSet().replace(GearLogicalConvention.INSTANCE);
+ final RelOptTable relOptTable = tableModify.getTable();
+ final Prepare.CatalogReader catalogReader = tableModify.getCatalogReader();
+ final RelNode convertedInput = convert(input,
+ input.getTraitSet().replace(GearLogicalConvention.INSTANCE));
+ final TableModify.Operation operation = tableModify.getOperation();
+ final List<String> updateColumnList = tableModify.getUpdateColumnList();
+ final List<RexNode> sourceExpressionList = tableModify.getSourceExpressionList();
+ final boolean flattened = tableModify.isFlattened();
+
+ final Table table = tableModify.getTable().unwrap(Table.class);
+
+ switch (table.getJdbcTableType()) {
+ case TABLE:
+ case STREAM:
+ if (operation != TableModify.Operation.INSERT) {
+ throw new UnsupportedOperationException(
+ String.format("Streams doesn't support %s modify operation", operation));
+ }
+ return new GearIOSinkRel(cluster, traitSet,
+ relOptTable, catalogReader, convertedInput, operation, updateColumnList,
+ sourceExpressionList, flattened);
+ default:
+ throw new IllegalArgumentException(
+ String.format("Unsupported table type: %s", table.getJdbcTableType()));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearIOSourceRule.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearIOSourceRule.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearIOSourceRule.java
new file mode 100644
index 0000000..a725b1a
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearIOSourceRule.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.rule;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.logical.LogicalTableScan;
+import org.apache.gearpump.sql.rel.GearIOSourceRel;
+import org.apache.gearpump.sql.rel.GearLogicalConvention;
+
+public class GearIOSourceRule extends ConverterRule {
+
+ public static final GearIOSourceRule INSTANCE = new GearIOSourceRule();
+
+ private GearIOSourceRule() {
+ super(LogicalTableScan.class, Convention.NONE, GearLogicalConvention.INSTANCE,
+ "GearIOSourceRule");
+ }
+
+ @Override
+ public RelNode convert(RelNode rel) {
+ final TableScan scan = (TableScan) rel;
+
+ return new GearIOSourceRel(scan.getCluster(),
+ scan.getTraitSet().replace(GearLogicalConvention.INSTANCE), scan.getTable());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearIntersectRule.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearIntersectRule.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearIntersectRule.java
new file mode 100644
index 0000000..eb149f3
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearIntersectRule.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.rule;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.Intersect;
+import org.apache.calcite.rel.logical.LogicalIntersect;
+import org.apache.gearpump.sql.rel.GearIntersectRel;
+import org.apache.gearpump.sql.rel.GearLogicalConvention;
+
+import java.util.List;
+
+public class GearIntersectRule extends ConverterRule {
+
+ public static final GearIntersectRule INSTANCE = new GearIntersectRule();
+
+ private GearIntersectRule() {
+ super(LogicalIntersect.class, Convention.NONE,
+ GearLogicalConvention.INSTANCE, "GearIntersectRule");
+ }
+
+ @Override
+ public RelNode convert(RelNode rel) {
+ Intersect intersect = (Intersect) rel;
+ final List<RelNode> inputs = intersect.getInputs();
+ return new GearIntersectRel(
+ intersect.getCluster(),
+ intersect.getTraitSet().replace(GearLogicalConvention.INSTANCE),
+ convertList(inputs, GearLogicalConvention.INSTANCE),
+ intersect.all
+ );
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearJoinRule.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearJoinRule.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearJoinRule.java
new file mode 100644
index 0000000..e86db06
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearJoinRule.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.rule;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.logical.LogicalJoin;
+import org.apache.gearpump.sql.rel.GearJoinRel;
+import org.apache.gearpump.sql.rel.GearLogicalConvention;
+
+public class GearJoinRule extends ConverterRule {
+
+ public static final GearJoinRule INSTANCE = new GearJoinRule();
+
+ private GearJoinRule() {
+ super(LogicalJoin.class, Convention.NONE,
+ GearLogicalConvention.INSTANCE, "GearJoinRule");
+ }
+
+ @Override
+ public RelNode convert(RelNode rel) {
+ Join join = (Join) rel;
+ return new GearJoinRel(
+ join.getCluster(),
+ join.getTraitSet().replace(GearLogicalConvention.INSTANCE),
+ convert(join.getLeft(),
+ join.getLeft().getTraitSet().replace(GearLogicalConvention.INSTANCE)),
+ convert(join.getRight(),
+ join.getRight().getTraitSet().replace(GearLogicalConvention.INSTANCE)),
+ join.getCondition(),
+ join.getVariablesSet(),
+ join.getJoinType()
+ );
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearMinusRule.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearMinusRule.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearMinusRule.java
new file mode 100644
index 0000000..103a29d
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearMinusRule.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.rule;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.Minus;
+import org.apache.calcite.rel.logical.LogicalMinus;
+import org.apache.gearpump.sql.rel.GearLogicalConvention;
+import org.apache.gearpump.sql.rel.GearMinusRel;
+
+import java.util.List;
+
+public class GearMinusRule extends ConverterRule {
+
+ public static final GearMinusRule INSTANCE = new GearMinusRule();
+
+ private GearMinusRule() {
+ super(LogicalMinus.class, Convention.NONE,
+ GearLogicalConvention.INSTANCE, "GearMinusRule");
+ }
+
+ @Override
+ public RelNode convert(RelNode rel) {
+ Minus minus = (Minus) rel;
+ final List<RelNode> inputs = minus.getInputs();
+ return new GearMinusRel(
+ minus.getCluster(),
+ minus.getTraitSet().replace(GearLogicalConvention.INSTANCE),
+ convertList(inputs, GearLogicalConvention.INSTANCE),
+ minus.all
+ );
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearProjectRule.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearProjectRule.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearProjectRule.java
new file mode 100644
index 0000000..6b09550
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearProjectRule.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.rule;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.gearpump.sql.rel.GearLogicalConvention;
+import org.apache.gearpump.sql.rel.GearProjectRel;
+
+public class GearProjectRule extends ConverterRule {
+
+ public static final GearProjectRule INSTANCE = new GearProjectRule();
+
+ private GearProjectRule() {
+ super(LogicalProject.class, Convention.NONE, GearLogicalConvention.INSTANCE,
+ "GearProjectRule");
+ }
+
+ @Override
+ public RelNode convert(RelNode rel) {
+ final Project project = (Project) rel;
+ final RelNode input = project.getInput();
+
+ return new GearProjectRel(project.getCluster(),
+ project.getTraitSet().replace(GearLogicalConvention.INSTANCE),
+ convert(input, input.getTraitSet().replace(GearLogicalConvention.INSTANCE)),
+ project.getProjects(), project.getRowType());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearSortRule.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearSortRule.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearSortRule.java
new file mode 100644
index 0000000..0a0d9e4
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearSortRule.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.rule;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.Sort;
+import org.apache.calcite.rel.logical.LogicalSort;
+import org.apache.gearpump.sql.rel.GearLogicalConvention;
+import org.apache.gearpump.sql.rel.GearSortRel;
+
+public class GearSortRule extends ConverterRule {
+
+ public static final GearSortRule INSTANCE = new GearSortRule();
+
+ private GearSortRule() {
+ super(LogicalSort.class, Convention.NONE,
+ GearLogicalConvention.INSTANCE, "GearSortRule");
+ }
+
+ @Override
+ public RelNode convert(RelNode rel) {
+ Sort sort = (Sort) rel;
+ final RelNode input = sort.getInput();
+ return new GearSortRel(
+ sort.getCluster(),
+ sort.getTraitSet().replace(GearLogicalConvention.INSTANCE),
+ convert(input, input.getTraitSet().replace(GearLogicalConvention.INSTANCE)),
+ sort.getCollation(),
+ sort.offset,
+ sort.fetch
+ );
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearUnionRule.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearUnionRule.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearUnionRule.java
new file mode 100644
index 0000000..7a17a46
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearUnionRule.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.rule;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.Union;
+import org.apache.calcite.rel.logical.LogicalUnion;
+import org.apache.gearpump.sql.rel.GearLogicalConvention;
+import org.apache.gearpump.sql.rel.GearUnionRel;
+
+public class GearUnionRule extends ConverterRule {
+
+ public static final GearUnionRule INSTANCE = new GearUnionRule();
+
+ private GearUnionRule() {
+ super(LogicalUnion.class, Convention.NONE, GearLogicalConvention.INSTANCE,
+ "GearUnionRule");
+ }
+
+ @Override
+ public RelNode convert(RelNode rel) {
+ Union union = (Union) rel;
+
+ return new GearUnionRel(
+ union.getCluster(),
+ union.getTraitSet().replace(GearLogicalConvention.INSTANCE),
+ convertList(union.getInputs(), GearLogicalConvention.INSTANCE),
+ union.all
+ );
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearValuesRule.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearValuesRule.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearValuesRule.java
new file mode 100644
index 0000000..b04eec2
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearValuesRule.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.rule;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.Values;
+import org.apache.calcite.rel.logical.LogicalValues;
+import org.apache.gearpump.sql.rel.GearLogicalConvention;
+import org.apache.gearpump.sql.rel.GearValuesRel;
+
+public class GearValuesRule extends ConverterRule {
+
+ public static final GearValuesRule INSTANCE = new GearValuesRule();
+
+ private GearValuesRule() {
+ super(LogicalValues.class, Convention.NONE,
+ GearLogicalConvention.INSTANCE, "GearValuesRule");
+ }
+
+ @Override
+ public RelNode convert(RelNode rel) {
+ Values values = (Values) rel;
+ return new GearValuesRel(
+ values.getCluster(),
+ values.getRowType(),
+ values.getTuples(),
+ values.getTraitSet().replace(GearLogicalConvention.INSTANCE)
+ );
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/table/SampleString.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/table/SampleString.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/table/SampleString.java
new file mode 100644
index 0000000..7ecba21
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/table/SampleString.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.table;
+
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
+
+public class SampleString {
+
+ public static JavaStream<String> WORDS;
+
+ public static class Stream {
+ public static final Message[] KV = {new Message("001", "This is a good start, bingo!! bingo!!")};
+
+ public static String getKV() {
+ return KV[0].WORD;
+ }
+ }
+
+ public static class Message {
+ public final String ID;
+ public final String WORD;
+
+ public Message(String ID, String WORD) {
+ this.ID = ID;
+ this.WORD = WORD;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/table/SampleTransactions.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/table/SampleTransactions.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/table/SampleTransactions.java
new file mode 100644
index 0000000..4aa20e0
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/table/SampleTransactions.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.table;
+
+public class SampleTransactions {
+
+ public static class Transactions {
+
+ public final Order[] ORDERS = {
+ new Order("001", 3),
+ new Order("002", 5),
+ new Order("003", 8),
+ new Order("004", 15),
+ };
+
+ public final Product[] PRODUCTS = {
+ new Product("001", "Book"),
+ new Product("002", "Pen"),
+ new Product("003", "Pencil"),
+ new Product("004", "Ruler"),
+ };
+ }
+
+ public static class Order {
+ public final String ID;
+ public final int QUANTITY;
+
+ public Order(String ID, int QUANTITY) {
+ this.ID = ID;
+ this.QUANTITY = QUANTITY;
+ }
+ }
+
+ public static class Product {
+ public final String ID;
+ public final String NAME;
+
+ public Product(String ID, String NAME) {
+ this.ID = ID;
+ this.NAME = NAME;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/table/TransactionsTableFactory.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/table/TransactionsTableFactory.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/table/TransactionsTableFactory.java
new file mode 100644
index 0000000..4ff9efd
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/table/TransactionsTableFactory.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.table;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.DataContext;
+import org.apache.calcite.linq4j.Enumerable;
+import org.apache.calcite.linq4j.Linq4j;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelProtoDataType;
+import org.apache.calcite.schema.*;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+import java.util.Map;
+
+public class TransactionsTableFactory implements TableFactory<Table> {
+
+ @Override
+ public Table create(SchemaPlus schema, String name, Map<String, Object> operand, RelDataType rowType) {
+ final Object[][] rows = {
+ {100, "I001", "item1", 3},
+ {101, "I002", "item2", 5},
+ {102, "I003", "item3", 8},
+ {103, "I004", "item4", 33},
+ {104, "I005", "item5", 23}
+ };
+
+ return new TransactionsTable(ImmutableList.copyOf(rows));
+ }
+
+ public static class TransactionsTable implements ScannableTable {
+
+ protected final RelProtoDataType protoRowType = new RelProtoDataType() {
+ public RelDataType apply(RelDataTypeFactory a0) {
+ return a0.builder()
+ .add("timeStamp", SqlTypeName.TIMESTAMP)
+ .add("id", SqlTypeName.VARCHAR, 10)
+ .add("item", SqlTypeName.VARCHAR, 50)
+ .add("quantity", SqlTypeName.INTEGER)
+ .build();
+ }
+ };
+
+ private final ImmutableList<Object[]> rows;
+
+ public TransactionsTable(ImmutableList<Object[]> rows) {
+ this.rows = rows;
+ }
+
+ public Enumerable<Object[]> scan(DataContext root) {
+ return Linq4j.asEnumerable(rows);
+ }
+
+ @Override
+ public RelDataType getRowType(RelDataTypeFactory typeFactory) {
+ return protoRowType.apply(typeFactory);
+ }
+
+ @Override
+ public Statistic getStatistic() {
+ return Statistics.UNKNOWN;
+ }
+
+ @Override
+ public Schema.TableType getJdbcTableType() {
+ return Schema.TableType.TABLE;
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/utils/CalciteFrameworkConfiguration.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/utils/CalciteFrameworkConfiguration.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/utils/CalciteFrameworkConfiguration.java
new file mode 100644
index 0000000..a63036d
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/utils/CalciteFrameworkConfiguration.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.utils;
+
+import org.apache.calcite.config.Lex;
+import org.apache.calcite.plan.Contexts;
+import org.apache.calcite.plan.ConventionTraitDef;
+import org.apache.calcite.plan.RelTraitDef;
+import org.apache.calcite.rel.RelCollationTraitDef;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.sql.parser.SqlParser;
+import org.apache.calcite.tools.FrameworkConfig;
+import org.apache.calcite.tools.Frameworks;
+import org.apache.calcite.tools.RuleSets;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class CalciteFrameworkConfiguration {
+
+ public static FrameworkConfig getDefaultconfig(SchemaPlus schema) {
+ final List<RelTraitDef> traitDefs = new ArrayList<RelTraitDef>();
+
+ traitDefs.add(ConventionTraitDef.INSTANCE);
+ traitDefs.add(RelCollationTraitDef.INSTANCE);
+
+ FrameworkConfig frameworkConfiguration = Frameworks.newConfigBuilder()
+ .parserConfig(SqlParser.configBuilder()
+ .setLex(Lex.JAVA)
+ .build())
+ .defaultSchema(schema)
+ .traitDefs(traitDefs)
+ .context(Contexts.EMPTY_CONTEXT)
+ .ruleSets(RuleSets.ofList())
+ .costFactory(null)
+ .typeSystem(RelDataTypeSystem.DEFAULT)
+ .build();
+
+ return frameworkConfiguration;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/utils/GearConfiguration.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/utils/GearConfiguration.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/utils/GearConfiguration.java
new file mode 100644
index 0000000..03b2a47
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/utils/GearConfiguration.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.utils;
+
+import com.typesafe.config.Config;
+import org.apache.gearpump.cluster.ClusterConfig;
+import org.apache.gearpump.cluster.UserConfig;
+import org.apache.gearpump.cluster.client.ClientContext;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp;
+
+public class GearConfiguration {
+
+ private Config akkaConf;
+ private ClientContext context;
+ public static JavaStreamApp app;
+
+ public void defaultConfiguration() {
+ setAkkaConf(ClusterConfig.defaultConfig());
+ setContext(ClientContext.apply(akkaConf));
+ }
+
+ public void ConfigJavaStreamApp() {
+ app = new JavaStreamApp("JavaDSL", context, UserConfig.empty());
+ }
+
+ public void setAkkaConf(Config akkaConf) {
+ this.akkaConf = akkaConf;
+ }
+
+ public void setContext(ClientContext context) {
+ this.context = context;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/validator/CalciteSqlValidator.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/validator/CalciteSqlValidator.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/validator/CalciteSqlValidator.java
new file mode 100644
index 0000000..d3d723f
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/validator/CalciteSqlValidator.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.validator;
+
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.prepare.CalciteCatalogReader;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.sql.SqlInsert;
+import org.apache.calcite.sql.SqlOperatorTable;
+import org.apache.calcite.sql.validate.SqlConformance;
+import org.apache.calcite.sql.validate.SqlValidatorImpl;
+
+public class CalciteSqlValidator extends SqlValidatorImpl {
+ public CalciteSqlValidator(SqlOperatorTable opTab,
+ CalciteCatalogReader catalogReader, JavaTypeFactory typeFactory,
+ SqlConformance conformance) {
+ super(opTab, catalogReader, typeFactory, conformance);
+ }
+
+ @Override
+ protected RelDataType getLogicalSourceRowType(
+ RelDataType sourceRowType, SqlInsert insert) {
+ final RelDataType superType =
+ super.getLogicalSourceRowType(sourceRowType, insert);
+ return ((JavaTypeFactory) typeFactory).toSql(superType);
+ }
+
+ @Override
+ protected RelDataType getLogicalTargetRowType(
+ RelDataType targetRowType, SqlInsert insert) {
+ final RelDataType superType =
+ super.getLogicalTargetRowType(targetRowType, insert);
+ return ((JavaTypeFactory) typeFactory).toSql(superType);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/scala/org/apache/gearpump/experiments/sql/Connection.scala
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/scala/org/apache/gearpump/experiments/sql/Connection.scala b/experiments/sql/src/main/scala/org/apache/gearpump/experiments/sql/Connection.scala
index 8fae091..ee55aa8 100644
--- a/experiments/sql/src/main/scala/org/apache/gearpump/experiments/sql/Connection.scala
+++ b/experiments/sql/src/main/scala/org/apache/gearpump/experiments/sql/Connection.scala
@@ -13,9 +13,6 @@ import org.apache.calcite.linq4j.tree.Expression
import org.apache.calcite.linq4j.{Enumerator, Queryable}
import org.apache.log4j.Logger
-/**
- * Created by Buddhi on 6/8/2017.
- */
class Connection extends CalciteConnection {
import org.apache.calcite.schema.SchemaPlus