You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2017/08/30 14:02:05 UTC

[1/2] incubator-gearpump git commit: [GEARPUMP-217] Add SQL support

Repository: incubator-gearpump
Updated Branches:
  refs/heads/master f75fb19c7 -> 995c8cc0c


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/995c8cc0/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearIOSinkRule.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearIOSinkRule.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearIOSinkRule.java
new file mode 100644
index 0000000..ca525d6
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearIOSinkRule.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.rule;
+
+import java.util.List;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.prepare.Prepare;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.TableModify;
+import org.apache.calcite.rel.logical.LogicalTableModify;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.schema.Table;
+import org.apache.gearpump.sql.rel.GearIOSinkRel;
+import org.apache.gearpump.sql.rel.GearLogicalConvention;
+
+public class GearIOSinkRule extends ConverterRule {
+
+  public static final GearIOSinkRule INSTANCE = new GearIOSinkRule();
+
+  private GearIOSinkRule() {
+    super(LogicalTableModify.class, Convention.NONE, GearLogicalConvention.INSTANCE,
+      "GearIOSinkRule");
+  }
+
+  @Override
+  public RelNode convert(RelNode rel) {
+    final TableModify tableModify = (TableModify) rel;
+    final RelNode input = tableModify.getInput();
+
+    final RelOptCluster cluster = tableModify.getCluster();
+    final RelTraitSet traitSet = tableModify.getTraitSet().replace(GearLogicalConvention.INSTANCE);
+    final RelOptTable relOptTable = tableModify.getTable();
+    final Prepare.CatalogReader catalogReader = tableModify.getCatalogReader();
+    final RelNode convertedInput = convert(input,
+      input.getTraitSet().replace(GearLogicalConvention.INSTANCE));
+    final TableModify.Operation operation = tableModify.getOperation();
+    final List<String> updateColumnList = tableModify.getUpdateColumnList();
+    final List<RexNode> sourceExpressionList = tableModify.getSourceExpressionList();
+    final boolean flattened = tableModify.isFlattened();
+
+    final Table table = tableModify.getTable().unwrap(Table.class);
+
+    switch (table.getJdbcTableType()) {
+      case TABLE:
+      case STREAM:
+        if (operation != TableModify.Operation.INSERT) {
+          throw new UnsupportedOperationException(
+            String.format("Streams doesn't support %s modify operation", operation));
+        }
+        return new GearIOSinkRel(cluster, traitSet,
+          relOptTable, catalogReader, convertedInput, operation, updateColumnList,
+          sourceExpressionList, flattened);
+      default:
+        throw new IllegalArgumentException(
+          String.format("Unsupported table type: %s", table.getJdbcTableType()));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/995c8cc0/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearIOSourceRule.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearIOSourceRule.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearIOSourceRule.java
new file mode 100644
index 0000000..a725b1a
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearIOSourceRule.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.rule;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.logical.LogicalTableScan;
+import org.apache.gearpump.sql.rel.GearIOSourceRel;
+import org.apache.gearpump.sql.rel.GearLogicalConvention;
+
+public class GearIOSourceRule extends ConverterRule {
+
+  public static final GearIOSourceRule INSTANCE = new GearIOSourceRule();
+
+  private GearIOSourceRule() {
+    super(LogicalTableScan.class, Convention.NONE, GearLogicalConvention.INSTANCE,
+      "GearIOSourceRule");
+  }
+
+  @Override
+  public RelNode convert(RelNode rel) {
+    final TableScan scan = (TableScan) rel;
+
+    return new GearIOSourceRel(scan.getCluster(),
+      scan.getTraitSet().replace(GearLogicalConvention.INSTANCE), scan.getTable());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/995c8cc0/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearIntersectRule.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearIntersectRule.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearIntersectRule.java
new file mode 100644
index 0000000..eb149f3
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearIntersectRule.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.rule;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.Intersect;
+import org.apache.calcite.rel.logical.LogicalIntersect;
+import org.apache.gearpump.sql.rel.GearIntersectRel;
+import org.apache.gearpump.sql.rel.GearLogicalConvention;
+
+import java.util.List;
+
+public class GearIntersectRule extends ConverterRule {
+
+  public static final GearIntersectRule INSTANCE = new GearIntersectRule();
+
+  private GearIntersectRule() {
+    super(LogicalIntersect.class, Convention.NONE,
+      GearLogicalConvention.INSTANCE, "GearIntersectRule");
+  }
+
+  @Override
+  public RelNode convert(RelNode rel) {
+    Intersect intersect = (Intersect) rel;
+    final List<RelNode> inputs = intersect.getInputs();
+    return new GearIntersectRel(
+      intersect.getCluster(),
+      intersect.getTraitSet().replace(GearLogicalConvention.INSTANCE),
+      convertList(inputs, GearLogicalConvention.INSTANCE),
+      intersect.all
+    );
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/995c8cc0/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearJoinRule.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearJoinRule.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearJoinRule.java
new file mode 100644
index 0000000..e86db06
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearJoinRule.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.rule;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.logical.LogicalJoin;
+import org.apache.gearpump.sql.rel.GearJoinRel;
+import org.apache.gearpump.sql.rel.GearLogicalConvention;
+
+public class GearJoinRule extends ConverterRule {
+
+  public static final GearJoinRule INSTANCE = new GearJoinRule();
+
+  private GearJoinRule() {
+    super(LogicalJoin.class, Convention.NONE,
+      GearLogicalConvention.INSTANCE, "GearJoinRule");
+  }
+
+  @Override
+  public RelNode convert(RelNode rel) {
+    Join join = (Join) rel;
+    return new GearJoinRel(
+      join.getCluster(),
+      join.getTraitSet().replace(GearLogicalConvention.INSTANCE),
+      convert(join.getLeft(),
+        join.getLeft().getTraitSet().replace(GearLogicalConvention.INSTANCE)),
+      convert(join.getRight(),
+        join.getRight().getTraitSet().replace(GearLogicalConvention.INSTANCE)),
+      join.getCondition(),
+      join.getVariablesSet(),
+      join.getJoinType()
+    );
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/995c8cc0/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearMinusRule.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearMinusRule.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearMinusRule.java
new file mode 100644
index 0000000..103a29d
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearMinusRule.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.rule;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.Minus;
+import org.apache.calcite.rel.logical.LogicalMinus;
+import org.apache.gearpump.sql.rel.GearLogicalConvention;
+import org.apache.gearpump.sql.rel.GearMinusRel;
+
+import java.util.List;
+
+public class GearMinusRule extends ConverterRule {
+
+  public static final GearMinusRule INSTANCE = new GearMinusRule();
+
+  private GearMinusRule() {
+    super(LogicalMinus.class, Convention.NONE,
+      GearLogicalConvention.INSTANCE, "GearMinusRule");
+  }
+
+  @Override
+  public RelNode convert(RelNode rel) {
+    Minus minus = (Minus) rel;
+    final List<RelNode> inputs = minus.getInputs();
+    return new GearMinusRel(
+      minus.getCluster(),
+      minus.getTraitSet().replace(GearLogicalConvention.INSTANCE),
+      convertList(inputs, GearLogicalConvention.INSTANCE),
+      minus.all
+    );
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/995c8cc0/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearProjectRule.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearProjectRule.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearProjectRule.java
new file mode 100644
index 0000000..6b09550
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearProjectRule.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.rule;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.gearpump.sql.rel.GearLogicalConvention;
+import org.apache.gearpump.sql.rel.GearProjectRel;
+
+public class GearProjectRule extends ConverterRule {
+
+  public static final GearProjectRule INSTANCE = new GearProjectRule();
+
+  private GearProjectRule() {
+    super(LogicalProject.class, Convention.NONE, GearLogicalConvention.INSTANCE,
+      "GearProjectRule");
+  }
+
+  @Override
+  public RelNode convert(RelNode rel) {
+    final Project project = (Project) rel;
+    final RelNode input = project.getInput();
+
+    return new GearProjectRel(project.getCluster(),
+      project.getTraitSet().replace(GearLogicalConvention.INSTANCE),
+      convert(input, input.getTraitSet().replace(GearLogicalConvention.INSTANCE)),
+      project.getProjects(), project.getRowType());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/995c8cc0/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearSortRule.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearSortRule.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearSortRule.java
new file mode 100644
index 0000000..0a0d9e4
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearSortRule.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.rule;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.Sort;
+import org.apache.calcite.rel.logical.LogicalSort;
+import org.apache.gearpump.sql.rel.GearLogicalConvention;
+import org.apache.gearpump.sql.rel.GearSortRel;
+
+public class GearSortRule extends ConverterRule {
+
+  public static final GearSortRule INSTANCE = new GearSortRule();
+
+  private GearSortRule() {
+    super(LogicalSort.class, Convention.NONE,
+      GearLogicalConvention.INSTANCE, "GearSortRule");
+  }
+
+  @Override
+  public RelNode convert(RelNode rel) {
+    Sort sort = (Sort) rel;
+    final RelNode input = sort.getInput();
+    return new GearSortRel(
+      sort.getCluster(),
+      sort.getTraitSet().replace(GearLogicalConvention.INSTANCE),
+      convert(input, input.getTraitSet().replace(GearLogicalConvention.INSTANCE)),
+      sort.getCollation(),
+      sort.offset,
+      sort.fetch
+    );
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/995c8cc0/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearUnionRule.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearUnionRule.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearUnionRule.java
new file mode 100644
index 0000000..7a17a46
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearUnionRule.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.rule;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.Union;
+import org.apache.calcite.rel.logical.LogicalUnion;
+import org.apache.gearpump.sql.rel.GearLogicalConvention;
+import org.apache.gearpump.sql.rel.GearUnionRel;
+
+public class GearUnionRule extends ConverterRule {
+
+  public static final GearUnionRule INSTANCE = new GearUnionRule();
+
+  private GearUnionRule() {
+    super(LogicalUnion.class, Convention.NONE, GearLogicalConvention.INSTANCE,
+      "GearUnionRule");
+  }
+
+  @Override
+  public RelNode convert(RelNode rel) {
+    Union union = (Union) rel;
+
+    return new GearUnionRel(
+      union.getCluster(),
+      union.getTraitSet().replace(GearLogicalConvention.INSTANCE),
+      convertList(union.getInputs(), GearLogicalConvention.INSTANCE),
+      union.all
+    );
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/995c8cc0/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearValuesRule.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearValuesRule.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearValuesRule.java
new file mode 100644
index 0000000..b04eec2
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearValuesRule.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.rule;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.Values;
+import org.apache.calcite.rel.logical.LogicalValues;
+import org.apache.gearpump.sql.rel.GearLogicalConvention;
+import org.apache.gearpump.sql.rel.GearValuesRel;
+
+public class GearValuesRule extends ConverterRule {
+
+  public static final GearValuesRule INSTANCE = new GearValuesRule();
+
+  private GearValuesRule() {
+    super(LogicalValues.class, Convention.NONE,
+      GearLogicalConvention.INSTANCE, "GearValuesRule");
+  }
+
+  @Override
+  public RelNode convert(RelNode rel) {
+    Values values = (Values) rel;
+    return new GearValuesRel(
+      values.getCluster(),
+      values.getRowType(),
+      values.getTuples(),
+      values.getTraitSet().replace(GearLogicalConvention.INSTANCE)
+    );
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/995c8cc0/experiments/sql/src/main/java/org/apache/gearpump/sql/table/SampleString.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/table/SampleString.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/table/SampleString.java
new file mode 100644
index 0000000..7ecba21
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/table/SampleString.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.table;
+
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
+
+public class SampleString {
+
+  public static JavaStream<String> WORDS;
+
+  public static class Stream {
+    public static final Message[] KV = {new Message("001", "This is a good start, bingo!! bingo!!")};
+
+    public static String getKV() {
+      return KV[0].WORD;
+    }
+  }
+
+  public static class Message {
+    public final String ID;
+    public final String WORD;
+
+    public Message(String ID, String WORD) {
+      this.ID = ID;
+      this.WORD = WORD;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/995c8cc0/experiments/sql/src/main/java/org/apache/gearpump/sql/table/SampleTransactions.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/table/SampleTransactions.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/table/SampleTransactions.java
new file mode 100644
index 0000000..4aa20e0
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/table/SampleTransactions.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.table;
+
+public class SampleTransactions {
+
+  public static class Transactions {
+
+    public final Order[] ORDERS = {
+      new Order("001", 3),
+      new Order("002", 5),
+      new Order("003", 8),
+      new Order("004", 15),
+    };
+
+    public final Product[] PRODUCTS = {
+      new Product("001", "Book"),
+      new Product("002", "Pen"),
+      new Product("003", "Pencil"),
+      new Product("004", "Ruler"),
+    };
+  }
+
+  public static class Order {
+    public final String ID;
+    public final int QUANTITY;
+
+    public Order(String ID, int QUANTITY) {
+      this.ID = ID;
+      this.QUANTITY = QUANTITY;
+    }
+  }
+
+  public static class Product {
+    public final String ID;
+    public final String NAME;
+
+    public Product(String ID, String NAME) {
+      this.ID = ID;
+      this.NAME = NAME;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/995c8cc0/experiments/sql/src/main/java/org/apache/gearpump/sql/table/TransactionsTableFactory.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/table/TransactionsTableFactory.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/table/TransactionsTableFactory.java
new file mode 100644
index 0000000..4ff9efd
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/table/TransactionsTableFactory.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.table;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.DataContext;
+import org.apache.calcite.linq4j.Enumerable;
+import org.apache.calcite.linq4j.Linq4j;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelProtoDataType;
+import org.apache.calcite.schema.*;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+import java.util.Map;
+
+public class TransactionsTableFactory implements TableFactory<Table> {
+
+  @Override
+  public Table create(SchemaPlus schema, String name, Map<String, Object> operand, RelDataType rowType) {
+    final Object[][] rows = {
+      {100, "I001", "item1", 3},
+      {101, "I002", "item2", 5},
+      {102, "I003", "item3", 8},
+      {103, "I004", "item4", 33},
+      {104, "I005", "item5", 23}
+    };
+
+    return new TransactionsTable(ImmutableList.copyOf(rows));
+  }
+
+  public static class TransactionsTable implements ScannableTable {
+
+    protected final RelProtoDataType protoRowType = new RelProtoDataType() {
+      public RelDataType apply(RelDataTypeFactory a0) {
+        return a0.builder()
+          .add("timeStamp", SqlTypeName.TIMESTAMP)
+          .add("id", SqlTypeName.VARCHAR, 10)
+          .add("item", SqlTypeName.VARCHAR, 50)
+          .add("quantity", SqlTypeName.INTEGER)
+          .build();
+      }
+    };
+
+    private final ImmutableList<Object[]> rows;
+
+    public TransactionsTable(ImmutableList<Object[]> rows) {
+      this.rows = rows;
+    }
+
+    public Enumerable<Object[]> scan(DataContext root) {
+      return Linq4j.asEnumerable(rows);
+    }
+
+    @Override
+    public RelDataType getRowType(RelDataTypeFactory typeFactory) {
+      return protoRowType.apply(typeFactory);
+    }
+
+    @Override
+    public Statistic getStatistic() {
+      return Statistics.UNKNOWN;
+    }
+
+    @Override
+    public Schema.TableType getJdbcTableType() {
+      return Schema.TableType.TABLE;
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/995c8cc0/experiments/sql/src/main/java/org/apache/gearpump/sql/utils/CalciteFrameworkConfiguration.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/utils/CalciteFrameworkConfiguration.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/utils/CalciteFrameworkConfiguration.java
new file mode 100644
index 0000000..a63036d
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/utils/CalciteFrameworkConfiguration.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.utils;
+
+import org.apache.calcite.config.Lex;
+import org.apache.calcite.plan.Contexts;
+import org.apache.calcite.plan.ConventionTraitDef;
+import org.apache.calcite.plan.RelTraitDef;
+import org.apache.calcite.rel.RelCollationTraitDef;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.sql.parser.SqlParser;
+import org.apache.calcite.tools.FrameworkConfig;
+import org.apache.calcite.tools.Frameworks;
+import org.apache.calcite.tools.RuleSets;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class CalciteFrameworkConfiguration {
+
+  public static FrameworkConfig getDefaultconfig(SchemaPlus schema) {
+    final List<RelTraitDef> traitDefs = new ArrayList<RelTraitDef>();
+
+    traitDefs.add(ConventionTraitDef.INSTANCE);
+    traitDefs.add(RelCollationTraitDef.INSTANCE);
+
+    FrameworkConfig frameworkConfiguration = Frameworks.newConfigBuilder()
+      .parserConfig(SqlParser.configBuilder()
+        .setLex(Lex.JAVA)
+        .build())
+      .defaultSchema(schema)
+      .traitDefs(traitDefs)
+      .context(Contexts.EMPTY_CONTEXT)
+      .ruleSets(RuleSets.ofList())
+      .costFactory(null)
+      .typeSystem(RelDataTypeSystem.DEFAULT)
+      .build();
+
+    return frameworkConfiguration;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/995c8cc0/experiments/sql/src/main/java/org/apache/gearpump/sql/utils/GearConfiguration.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/utils/GearConfiguration.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/utils/GearConfiguration.java
new file mode 100644
index 0000000..03b2a47
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/utils/GearConfiguration.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.utils;
+
+import com.typesafe.config.Config;
+import org.apache.gearpump.cluster.ClusterConfig;
+import org.apache.gearpump.cluster.UserConfig;
+import org.apache.gearpump.cluster.client.ClientContext;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp;
+
+public class GearConfiguration {
+
+  private Config akkaConf;
+  private ClientContext context;
+  public static JavaStreamApp app;
+
+  public void defaultConfiguration() {
+    setAkkaConf(ClusterConfig.defaultConfig());
+    setContext(ClientContext.apply(akkaConf));
+  }
+
+  public void ConfigJavaStreamApp() {
+    app = new JavaStreamApp("JavaDSL", context, UserConfig.empty());
+  }
+
+  public void setAkkaConf(Config akkaConf) {
+    this.akkaConf = akkaConf;
+  }
+
+  public void setContext(ClientContext context) {
+    this.context = context;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/995c8cc0/experiments/sql/src/main/java/org/apache/gearpump/sql/validator/CalciteSqlValidator.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/validator/CalciteSqlValidator.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/validator/CalciteSqlValidator.java
new file mode 100644
index 0000000..d3d723f
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/validator/CalciteSqlValidator.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.validator;
+
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.prepare.CalciteCatalogReader;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.sql.SqlInsert;
+import org.apache.calcite.sql.SqlOperatorTable;
+import org.apache.calcite.sql.validate.SqlConformance;
+import org.apache.calcite.sql.validate.SqlValidatorImpl;
+
+public class CalciteSqlValidator extends SqlValidatorImpl {
+  public CalciteSqlValidator(SqlOperatorTable opTab,
+                             CalciteCatalogReader catalogReader, JavaTypeFactory typeFactory,
+                             SqlConformance conformance) {
+    super(opTab, catalogReader, typeFactory, conformance);
+  }
+
+  @Override
+  protected RelDataType getLogicalSourceRowType(
+    RelDataType sourceRowType, SqlInsert insert) {
+    final RelDataType superType =
+      super.getLogicalSourceRowType(sourceRowType, insert);
+    return ((JavaTypeFactory) typeFactory).toSql(superType);
+  }
+
+  @Override
+  protected RelDataType getLogicalTargetRowType(
+    RelDataType targetRowType, SqlInsert insert) {
+    final RelDataType superType =
+      super.getLogicalTargetRowType(targetRowType, insert);
+    return ((JavaTypeFactory) typeFactory).toSql(superType);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/995c8cc0/experiments/sql/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/resources/log4j.properties b/experiments/sql/src/main/resources/log4j.properties
new file mode 100644
index 0000000..98dcb80
--- /dev/null
+++ b/experiments/sql/src/main/resources/log4j.properties
@@ -0,0 +1,28 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# For the general syntax of property based configuration files see the
+# documenation of org.apache.log4j.PropertyConfigurator.
+# The root category uses the appender called A1. Since no priority is
+# specified, the root category assumes the default priority for root
+# which is DEBUG in log4j. The root category is the only category that
+# has a default priority. All other categories need not be assigned a
+# priority in which case they inherit their priority from the
+# hierarchy.
+#log4j.rootLogger=debug, console
+log4j.rootLogger=info, console
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+#log4j.appender.console.layout.ConversionPattern=[%t] %-5p %c %x - %m%n
+log4j.appender.console.layout.ConversionPattern=[%t] %-5p %c %x - %m%n

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/995c8cc0/experiments/sql/src/test/java/org/apache/gearpump/sql/example/SqlWordCountTest.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/test/java/org/apache/gearpump/sql/example/SqlWordCountTest.java b/experiments/sql/src/test/java/org/apache/gearpump/sql/example/SqlWordCountTest.java
new file mode 100644
index 0000000..64c5af7
--- /dev/null
+++ b/experiments/sql/src/test/java/org/apache/gearpump/sql/example/SqlWordCountTest.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.example;
+
+import org.apache.calcite.adapter.java.ReflectiveSchema;
+import org.apache.calcite.jdbc.CalciteConnection;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.plan.RelTraitDef;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.parser.SqlParseException;
+import org.apache.calcite.sql.parser.SqlParser;
+import org.apache.calcite.tools.*;
+import org.apache.gearpump.sql.rel.GearLogicalConvention;
+import org.apache.gearpump.sql.rule.GearAggregationRule;
+import org.apache.gearpump.sql.rule.GearFlatMapRule;
+import org.apache.gearpump.sql.table.SampleString;
+import org.apache.gearpump.sql.utils.GearConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.List;
+
+public class SqlWordCountTest {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SqlWordCountTest.class);
+
+  private Planner getPlanner(List<RelTraitDef> traitDefs, Program... programs) {
+    try {
+      return getPlanner(traitDefs, SqlParser.Config.DEFAULT, programs);
+    } catch (ClassNotFoundException e) {
+      LOG.error(e.getMessage());
+    } catch (SQLException e) {
+      LOG.error(e.getMessage());
+    }
+    return null;
+  }
+
+  private Planner getPlanner(List<RelTraitDef> traitDefs,
+                             SqlParser.Config parserConfig,
+                             Program... programs) throws ClassNotFoundException, SQLException {
+
+    Class.forName("org.apache.calcite.jdbc.Driver");
+    java.sql.Connection connection = DriverManager.getConnection("jdbc:calcite:");
+    CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
+    SchemaPlus rootSchema = calciteConnection.getRootSchema();
+    rootSchema.add("STR", new ReflectiveSchema(new SampleString.Stream()));
+
+    final FrameworkConfig config = Frameworks.newConfigBuilder()
+      .parserConfig(parserConfig)
+      .defaultSchema(rootSchema)
+      .traitDefs(traitDefs)
+      .programs(programs)
+      .build();
+    return Frameworks.getPlanner(config);
+  }
+
+  void wordCountTest(GearConfiguration gearConfig) throws SqlParseException,
+    ValidationException, RelConversionException {
+
+    RuleSet ruleSet = RuleSets.ofList(
+      GearFlatMapRule.INSTANCE,
+      GearAggregationRule.INSTANCE);
+
+    Planner planner = getPlanner(null, Programs.of(ruleSet));
+
+    String sql = "SELECT COUNT(*) FROM str.kv GROUP BY str.kv.word";
+    System.out.println("SQL Query:-\t" + sql + "\n");
+
+    SqlNode parse = planner.parse(sql);
+    System.out.println("SQL Parse Tree:- \n" + parse.toString() + "\n\n");
+
+    SqlNode validate = planner.validate(parse);
+    RelNode convert = planner.rel(validate).project();
+    System.out.println("Relational Expression:- \n" + RelOptUtil.toString(convert) + "\n");
+
+    gearConfig.defaultConfiguration();
+    gearConfig.ConfigJavaStreamApp();
+
+    RelTraitSet traitSet = convert.getTraitSet().replace(GearLogicalConvention.INSTANCE);
+    try {
+      RelNode transform = planner.transform(0, traitSet, convert);
+      System.out.println(RelOptUtil.toString(transform));
+    } catch (Exception e) {
+    }
+
+  }
+
+
+  public static void main(String[] args) throws ClassNotFoundException,
+    SQLException, SqlParseException {
+
+    SqlWordCountTest gearSqlWordCount = new SqlWordCountTest();
+
+    try {
+      GearConfiguration gearConfig = new GearConfiguration();
+      gearSqlWordCount.wordCountTest(gearConfig);
+    } catch (ValidationException e) {
+      LOG.error(e.getMessage());
+    } catch (RelConversionException e) {
+      LOG.error(e.getMessage());
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/995c8cc0/experiments/sql/src/test/java/org/apache/gearpump/sql/planner/CalciteTest.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/test/java/org/apache/gearpump/sql/planner/CalciteTest.java b/experiments/sql/src/test/java/org/apache/gearpump/sql/planner/CalciteTest.java
new file mode 100644
index 0000000..2f21531
--- /dev/null
+++ b/experiments/sql/src/test/java/org/apache/gearpump/sql/planner/CalciteTest.java
@@ -0,0 +1,397 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.planner;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.adapter.enumerable.EnumerableConvention;
+import org.apache.calcite.adapter.enumerable.EnumerableRules;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.adapter.java.ReflectiveSchema;
+import org.apache.calcite.config.CalciteConnectionConfig;
+import org.apache.calcite.config.Lex;
+import org.apache.calcite.jdbc.CalciteConnection;
+import org.apache.calcite.jdbc.CalciteSchema;
+import org.apache.calcite.plan.*;
+import org.apache.calcite.plan.RelOptTable.ViewExpander;
+import org.apache.calcite.plan.volcano.VolcanoPlanner;
+import org.apache.calcite.prepare.CalciteCatalogReader;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelRoot;
+import org.apache.calcite.rel.rules.LoptOptimizeJoinRule;
+import org.apache.calcite.rel.rules.SortRemoveRule;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexExecutor;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperatorTable;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.parser.SqlParseException;
+import org.apache.calcite.sql.parser.SqlParser;
+import org.apache.calcite.sql.validate.SqlConformance;
+import org.apache.calcite.sql.validate.SqlConformanceEnum;
+import org.apache.calcite.sql.validate.SqlValidator;
+import org.apache.calcite.sql.validate.SqlValidatorUtil;
+import org.apache.calcite.sql2rel.RelDecorrelator;
+import org.apache.calcite.sql2rel.SqlRexConvertletTable;
+import org.apache.calcite.sql2rel.SqlToRelConverter;
+import org.apache.calcite.tools.*;
+import org.apache.calcite.util.Util;
+import org.apache.gearpump.sql.rule.GearFilterRule;
+import org.apache.gearpump.sql.table.SampleTransactions;
+import org.apache.gearpump.sql.utils.CalciteFrameworkConfiguration;
+import org.apache.gearpump.sql.validator.CalciteSqlValidator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.List;
+
+public class CalciteTest {
+
+  private static final Logger LOG = LoggerFactory.getLogger(CalciteTest.class);
+
+  private SqlOperatorTable operatorTable;
+  private FrameworkConfig config;
+  private ImmutableList<RelTraitDef> traitDefs;
+  private SqlParser.Config parserConfig;
+  private SqlRexConvertletTable convertletTable;
+  private State state;
+  private SchemaPlus defaultSchema;
+  private JavaTypeFactory typeFactory;
+  private RelOptPlanner planner;
+  private RexExecutor executor;
+  private RelRoot root;
+
+  public CalciteTest(FrameworkConfig config) {
+    this.config = config;
+    this.defaultSchema = config.getDefaultSchema();
+    this.operatorTable = config.getOperatorTable();
+    this.parserConfig = config.getParserConfig();
+    this.state = State.STATE_0_CLOSED;
+    this.traitDefs = config.getTraitDefs();
+    this.convertletTable = config.getConvertletTable();
+    this.executor = config.getExecutor();
+    reset();
+  }
+
+  public CalciteTest() {
+  }
+
+  private void ensure(State state) {
+    if (state == this.state) {
+      return;
+    }
+    if (state.ordinal() < this.state.ordinal()) {
+      throw new IllegalArgumentException("cannot move to " + state + " from "
+        + this.state);
+    }
+    state.from(this);
+  }
+
+  public void close() {
+    typeFactory = null;
+    state = State.STATE_0_CLOSED;
+  }
+
+  public void reset() {
+    ensure(State.STATE_0_CLOSED);
+    state = State.STATE_1_RESET;
+  }
+
+  private void ready() {
+    switch (state) {
+      case STATE_0_CLOSED:
+        reset();
+    }
+    ensure(State.STATE_1_RESET);
+    Frameworks.withPlanner(
+      new Frameworks.PlannerAction<Void>() {
+        public Void apply(RelOptCluster cluster, RelOptSchema relOptSchema,
+                          SchemaPlus rootSchema) {
+          Util.discard(rootSchema); // use our own defaultSchema
+          typeFactory = (JavaTypeFactory) cluster.getTypeFactory();
+          planner = cluster.getPlanner();
+          planner.setExecutor(executor);
+          return null;
+        }
+      },
+      config);
+
+    state = State.STATE_2_READY;
+
+    // If user specify own traitDef, instead of default default trait,
+    // first, clear the default trait def registered with planner
+    // then, register the trait def specified in traitDefs.
+    if (this.traitDefs != null) {
+      planner.clearRelTraitDefs();
+      for (RelTraitDef def : this.traitDefs) {
+        planner.addRelTraitDef(def);
+      }
+    }
+  }
+
+  private static SchemaPlus rootSchema(SchemaPlus schema) {
+    for (; ; ) {
+      if (schema.getParentSchema() == null) {
+        return schema;
+      }
+      schema = schema.getParentSchema();
+    }
+  }
+
+  private CalciteCatalogReader createCatalogReader() {
+    SchemaPlus rootSchema = rootSchema(defaultSchema);
+    return new CalciteCatalogReader(
+      CalciteSchema.from(rootSchema),
+      parserConfig.caseSensitive(),
+      CalciteSchema.from(defaultSchema).path(null),
+      typeFactory);
+  }
+
+  private RexBuilder createRexBuilder() {
+    return new RexBuilder(typeFactory);
+  }
+
+  private SqlConformance conformance() {
+    final Context context = config.getContext();
+    if (context != null) {
+      final CalciteConnectionConfig connectionConfig =
+        context.unwrap(CalciteConnectionConfig.class);
+      if (connectionConfig != null) {
+        return connectionConfig.conformance();
+      }
+    }
+    return SqlConformanceEnum.DEFAULT;
+  }
+
+  /**
+   * Implements {@link org.apache.calcite.plan.RelOptTable.ViewExpander}
+   * interface for {@link org.apache.calcite.tools.Planner}.
+   */
+  public class ViewExpanderImpl implements ViewExpander {
+    @Override
+    public RelRoot expandView(RelDataType rowType, String queryString,
+                              List<String> schemaPath, List<String> viewPath) {
+      SqlParser parser = SqlParser.create(queryString, parserConfig);
+      SqlNode sqlNode;
+      try {
+        sqlNode = parser.parseQuery();
+      } catch (SqlParseException e) {
+        throw new RuntimeException("parse failed", e);
+      }
+
+      final SqlConformance conformance = conformance();
+      final CalciteCatalogReader catalogReader =
+        createCatalogReader().withSchemaPath(schemaPath);
+      final SqlValidator validator =
+        new CalciteSqlValidator(operatorTable, catalogReader, typeFactory,
+          conformance);
+      validator.setIdentifierExpansion(true);
+      final SqlNode validatedSqlNode = validator.validate(sqlNode);
+
+      final RexBuilder rexBuilder = createRexBuilder();
+      final RelOptCluster cluster = RelOptCluster.create(planner, rexBuilder);
+      final SqlToRelConverter.Config config = SqlToRelConverter.configBuilder()
+        .withTrimUnusedFields(false).withConvertTableAccess(false).build();
+      final SqlToRelConverter sqlToRelConverter =
+        new SqlToRelConverter(new ViewExpanderImpl(), validator,
+          catalogReader, cluster, convertletTable, config);
+
+      root = sqlToRelConverter.convertQuery(validatedSqlNode, true, false);
+      root = root.withRel(sqlToRelConverter.flattenTypes(root.rel, true));
+      root = root.withRel(RelDecorrelator.decorrelateQuery(root.rel));
+
+      return CalciteTest.this.root;
+    }
+  }
+
+  private enum State {
+    STATE_0_CLOSED {
+      @Override
+      void from(CalciteTest planner) {
+        planner.close();
+      }
+    },
+    STATE_1_RESET {
+      @Override
+      void from(CalciteTest planner) {
+        planner.ensure(STATE_0_CLOSED);
+        planner.reset();
+      }
+    },
+    STATE_2_READY {
+      @Override
+      void from(CalciteTest planner) {
+        STATE_1_RESET.from(planner);
+        planner.ready();
+      }
+    },
+    STATE_3_PARSED,
+    STATE_4_VALIDATED,
+    STATE_5_CONVERTED;
+
+    /**
+     * Moves planner's state to this state. This must be a higher state.
+     */
+    void from(CalciteTest planner) {
+      throw new IllegalArgumentException("cannot move from " + planner.state
+        + " to " + this);
+    }
+  }
+
+  void calTest() throws SqlParseException {
+
+//    String sql = "select t.orders.id from t.orders";
+//
+//    String sql = "select t.products.id "
+//      + "from t.orders, t.products "
+//      + "where t.orders.id = t.products.id and quantity>2 ";
+
+    String sql = "SELECT t.products.id AS product_id, t.products.name "
+      + "AS product_name, t.orders.id AS order_id "
+      + "FROM t.products JOIN t.orders ON t.products.id = t.orders.id  WHERE quantity > 2";
+
+    final SqlParser.Config parserConfig = SqlParser.configBuilder().setLex(Lex.MYSQL).build();
+
+    // Parse the query
+    SqlParser parser = SqlParser.create(sql, parserConfig);
+    SqlNode sqlNode = parser.parseStmt();
+
+    // Validate the query
+    CalciteCatalogReader catalogReader = createCatalogReader();
+    SqlValidator validator = SqlValidatorUtil.newValidator(
+      SqlStdOperatorTable.instance(), catalogReader, typeFactory, SqlConformance.DEFAULT);
+    SqlNode validatedSqlNode = validator.validate(sqlNode);
+
+    // Convert SqlNode to RelNode
+    RexBuilder rexBuilder = createRexBuilder();
+    RelOptCluster cluster = RelOptCluster.create(planner, rexBuilder);
+    SqlToRelConverter sqlToRelConverter = new SqlToRelConverter(
+      new ViewExpanderImpl(),
+      validator,
+      createCatalogReader(),
+      cluster,
+      convertletTable);
+    RelRoot root = sqlToRelConverter.convertQuery(validatedSqlNode, false, true);
+    System.out.println(RelOptUtil.toString(root.rel));
+
+    // Optimize the plan
+    RelOptPlanner planner = new VolcanoPlanner();
+
+    // Create a set of rules to apply
+    Program program = Programs.ofRules(
+//      FilterProjectTransposeRule.INSTANCE,
+//      ProjectMergeRule.INSTANCE,
+//      FilterMergeRule.INSTANCE,
+//      FilterJoinRule.JOIN,
+      GearFilterRule.INSTANCE,
+      LoptOptimizeJoinRule.INSTANCE);
+
+    RelTraitSet traitSet = planner.emptyTraitSet().replace(EnumerableConvention.INSTANCE);
+
+    // Execute the program
+//    RelNode optimized = program.run(planner, root.rel, traitSet,
+//      ImmutableList.<RelOptMaterialization>of(), ImmutableList.<RelOptLattice>of());
+//    logger.info(RelOptUtil.toString(optimized));
+
+  }
+
+  // new test -------------------------
+  private Planner getPlanner(List<RelTraitDef> traitDefs, Program... programs) {
+    try {
+      return getPlanner(traitDefs, SqlParser.Config.DEFAULT, programs);
+    } catch (ClassNotFoundException e) {
+      LOG.error(e.getMessage());
+    } catch (SQLException e) {
+      LOG.error(e.getMessage());
+    }
+    return null;
+  }
+
+  private Planner getPlanner(List<RelTraitDef> traitDefs,
+                             SqlParser.Config parserConfig,
+                             Program... programs) throws ClassNotFoundException, SQLException {
+
+    Class.forName("org.apache.calcite.jdbc.Driver");
+    java.sql.Connection connection = DriverManager.getConnection("jdbc:calcite:");
+    CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
+    SchemaPlus rootSchema = calciteConnection.getRootSchema();
+    rootSchema.add("T", new ReflectiveSchema(new SampleTransactions.Transactions()));
+
+    final FrameworkConfig config = Frameworks.newConfigBuilder()
+      .parserConfig(parserConfig)
+      .defaultSchema(rootSchema)
+      .traitDefs(traitDefs)
+      .programs(programs)
+      .build();
+    return Frameworks.getPlanner(config);
+  }
+
+  void calTest2() throws SqlParseException, ValidationException, RelConversionException {
+
+    RuleSet ruleSet = RuleSets.ofList(
+      SortRemoveRule.INSTANCE,
+      EnumerableRules.ENUMERABLE_PROJECT_RULE,
+      EnumerableRules.ENUMERABLE_SORT_RULE);
+
+    Planner planner = getPlanner(null, Programs.of(ruleSet));
+
+    String sql = "SELECT * FROM t.products ORDER BY t.products.id";
+
+    SqlNode parse = planner.parse(sql);
+    System.out.println("SQL Parse Tree:- \n" + parse.toString());
+
+    SqlNode validate = planner.validate(parse);
+    RelNode convert = planner.rel(validate).project();
+    RelTraitSet traitSet = convert.getTraitSet().replace(EnumerableConvention.INSTANCE);
+    RelNode transform = planner.transform(0, traitSet, convert);
+    System.out.println("\n\nRelational Expression:- \n" + RelOptUtil.toString(transform));
+
+  }
+
+
+  public static void main(String[] args) throws ClassNotFoundException,
+    SQLException, SqlParseException {
+
+    // calTest()
+//    Class.forName("org.apache.calcite.jdbc.Driver");
+//    java.sql.Connection connection = DriverManager.getConnection("jdbc:calcite:");
+//    CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
+//    SchemaPlus rootSchema = calciteConnection.getRootSchema();
+//    rootSchema.add("t", new ReflectiveSchema(new StreamQueryPlanner.Transactions()));
+//
+//    FrameworkConfig frameworkConfig = CalciteFrameworkConfiguration.getDefaultconfig(rootSchema);
+//    CalciteTest ct = new CalciteTest(frameworkConfig);
+//    ct.ready();
+//    ct.calTest();
+
+    // calTest2()
+    CalciteTest calTest = new CalciteTest();
+    try {
+      calTest.calTest2();
+    } catch (ValidationException e) {
+      LOG.error(e.getMessage());
+    } catch (RelConversionException e) {
+      LOG.error(e.getMessage());
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/995c8cc0/experiments/sql/src/test/java/org/apache/gearpump/sql/planner/QueryTest.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/test/java/org/apache/gearpump/sql/planner/QueryTest.java b/experiments/sql/src/test/java/org/apache/gearpump/sql/planner/QueryTest.java
new file mode 100644
index 0000000..551beff
--- /dev/null
+++ b/experiments/sql/src/test/java/org/apache/gearpump/sql/planner/QueryTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.planner;
+
+import com.google.common.io.Resources;
+import org.apache.calcite.jdbc.CalciteConnection;
+import org.apache.calcite.model.ModelHandler;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.tools.RelConversionException;
+import org.apache.calcite.tools.ValidationException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.sql.SQLException;
+
+public class QueryTest {
+
+  private static final Logger LOG = LoggerFactory.getLogger(QueryTest.class);
+
+  @Test
+  public void testLogicalPlan() {
+
+    try {
+      CalciteConnection connection = new Connection();
+      String salesSchema = Resources.toString(Query.class.getResource("/model.json"),
+        Charset.defaultCharset());
+      new ModelHandler(connection, "inline:" + salesSchema);
+
+      Query queryPlanner = new Query(connection.getRootSchema().getSubSchema(connection.getSchema()));
+      RelNode logicalPlan = queryPlanner.getLogicalPlan("SELECT item FROM transactions");
+
+      System.out.println("Getting Logical Plan...\n" + RelOptUtil.toString(logicalPlan));
+
+    } catch (IOException e) {
+      LOG.error(e.getMessage());
+    } catch (RelConversionException e) {
+      LOG.error(e.getMessage());
+    } catch (ValidationException e) {
+      LOG.error(e.getMessage());
+    } catch (SQLException e) {
+      LOG.error(e.getMessage());
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/995c8cc0/experiments/sql/src/test/resources/model.json
----------------------------------------------------------------------
diff --git a/experiments/sql/src/test/resources/model.json b/experiments/sql/src/test/resources/model.json
new file mode 100644
index 0000000..bfb62ed
--- /dev/null
+++ b/experiments/sql/src/test/resources/model.json
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * A JSON model of a simple Calcite schema.
+ */
+
+{
+  "version": "1.0",
+  "defaultSchema": "SALES",
+  "schemas": [
+    {
+      "name": "SALES",
+      "tables": [
+        {
+          "name": "Transactions",
+          "type": "custom",
+          "factory": "org.apache.gearpump.sql.table.TransactionsTableFactory",
+          "operand": {
+            "file": "resources/sales/Transactions.csv",
+            "flavor": "scannable"
+          }
+        }
+      ]
+    }
+  ]
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/995c8cc0/experiments/sql/src/test/resources/sales/Transactions.csv
----------------------------------------------------------------------
diff --git a/experiments/sql/src/test/resources/sales/Transactions.csv b/experiments/sql/src/test/resources/sales/Transactions.csv
new file mode 100644
index 0000000..f974d97
--- /dev/null
+++ b/experiments/sql/src/test/resources/sales/Transactions.csv
@@ -0,0 +1,6 @@
+timeStamp:int,id:string,item:string,quantity:int
+100,"I001","item1",3
+101,"I002","item2",5
+102,"I003","item3",8
+103,"I004","item4",33
+104,"I005","item5",23

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/995c8cc0/project/BuildExperiments.scala
----------------------------------------------------------------------
diff --git a/project/BuildExperiments.scala b/project/BuildExperiments.scala
index 59c95f7..84c80f0 100644
--- a/project/BuildExperiments.scala
+++ b/project/BuildExperiments.scala
@@ -133,4 +133,16 @@ object BuildExperiments extends sbt.Build {
       ))
     .dependsOn(core % "provided", streaming % "test->test; provided")
     .disablePlugins(sbtassembly.AssemblyPlugin)
+
+  lazy val sql = Project(
+    id = "gearpump-experiments-sql",
+    base = file("experiments/sql"),
+    settings = commonSettings ++ noPublish ++
+      Seq(
+        libraryDependencies ++= Seq(
+          "org.apache.calcite" % "calcite-core" % calciteVersion
+        )
+      ))
+    .dependsOn(core % "provided", streaming % "test->test; provided")
+    .disablePlugins(sbtassembly.AssemblyPlugin)
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/995c8cc0/project/Dependencies.scala
----------------------------------------------------------------------
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index 90b57d3..b146c08 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -54,6 +54,7 @@ object Dependencies {
   val chillVersion = "0.6.0"
   val jedisVersion = "2.9.0"
   val rabbitmqVersion = "3.5.3"
+  val calciteVersion = "1.12.0"
 
   val annotationDependencies = Seq(
     // work around for compiler warnings like


[2/2] incubator-gearpump git commit: [GEARPUMP-217] Add SQL support

Posted by ma...@apache.org.
[GEARPUMP-217] Add SQL support

Author: Buddhi Ayesha <bu...@cse.mrt.ac.lk>
Author: manuzhang <ow...@gmail.com>

Closes #221 from manuzhang/merge_sql_into_master.


Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/995c8cc0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/995c8cc0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/995c8cc0

Branch: refs/heads/master
Commit: 995c8cc0cc0b18dd30cbbe218972b0bded297ff0
Parents: f75fb19
Author: Buddhi Ayesha <bu...@cse.mrt.ac.lk>
Authored: Wed Aug 30 22:00:41 2017 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Wed Aug 30 22:01:00 2017 +0800

----------------------------------------------------------------------
 .../examples/wordcountjava/WordCountSpec.scala  |  13 +-
 experiments/sql/README.md                       |   8 +
 .../apache/gearpump/sql/planner/Connection.java | 292 ++++++++++++++
 .../sql/planner/GearRelDataTypeSystem.java      |  41 ++
 .../gearpump/sql/planner/GearRuleSets.java      |  60 +++
 .../gearpump/sql/planner/LogicalPlan.java       |  43 ++
 .../org/apache/gearpump/sql/planner/Query.java  |  80 ++++
 .../sql/planner/StreamQueryPlanner.java         |  96 +++++
 .../gearpump/sql/rel/GearAggregationRel.java    | 120 ++++++
 .../apache/gearpump/sql/rel/GearFilterRel.java  |  47 +++
 .../apache/gearpump/sql/rel/GearFlatMapRel.java | 112 ++++++
 .../apache/gearpump/sql/rel/GearIOSinkRel.java  |  52 +++
 .../gearpump/sql/rel/GearIOSourceRel.java       |  39 ++
 .../gearpump/sql/rel/GearIntersectRel.java      |  54 +++
 .../apache/gearpump/sql/rel/GearJoinRel.java    |  94 +++++
 .../gearpump/sql/rel/GearLogicalConvention.java |  65 +++
 .../apache/gearpump/sql/rel/GearMinusRel.java   |  51 +++
 .../apache/gearpump/sql/rel/GearProjectRel.java |  50 +++
 .../apache/gearpump/sql/rel/GearRelNode.java    |  30 ++
 .../sql/rel/GearSetOperatorRelBase.java         |  47 +++
 .../apache/gearpump/sql/rel/GearSortRel.java    |  95 +++++
 .../gearpump/sql/rel/GearSqlRelUtils.java       |  71 ++++
 .../apache/gearpump/sql/rel/GearUnionRel.java   |  55 +++
 .../apache/gearpump/sql/rel/GearValuesRel.java  |  42 ++
 .../gearpump/sql/rule/GearAggregationRule.java  | 147 +++++++
 .../gearpump/sql/rule/GearFilterRule.java       |  48 +++
 .../gearpump/sql/rule/GearFlatMapRule.java      |  52 +++
 .../gearpump/sql/rule/GearIOSinkRule.java       |  79 ++++
 .../gearpump/sql/rule/GearIOSourceRule.java     |  46 +++
 .../gearpump/sql/rule/GearIntersectRule.java    |  51 +++
 .../apache/gearpump/sql/rule/GearJoinRule.java  |  53 +++
 .../apache/gearpump/sql/rule/GearMinusRule.java |  51 +++
 .../gearpump/sql/rule/GearProjectRule.java      |  48 +++
 .../apache/gearpump/sql/rule/GearSortRule.java  |  51 +++
 .../apache/gearpump/sql/rule/GearUnionRule.java |  49 +++
 .../gearpump/sql/rule/GearValuesRule.java       |  48 +++
 .../apache/gearpump/sql/table/SampleString.java |  45 +++
 .../gearpump/sql/table/SampleTransactions.java  |  60 +++
 .../sql/table/TransactionsTableFactory.java     |  88 ++++
 .../utils/CalciteFrameworkConfiguration.java    |  58 +++
 .../gearpump/sql/utils/GearConfiguration.java   |  49 +++
 .../sql/validator/CalciteSqlValidator.java      |  50 +++
 .../sql/src/main/resources/log4j.properties     |  28 ++
 .../gearpump/sql/example/SqlWordCountTest.java  | 125 ++++++
 .../gearpump/sql/planner/CalciteTest.java       | 397 +++++++++++++++++++
 .../apache/gearpump/sql/planner/QueryTest.java  |  65 +++
 experiments/sql/src/test/resources/model.json   |  39 ++
 .../src/test/resources/sales/Transactions.csv   |   6 +
 project/BuildExperiments.scala                  |  12 +
 project/Dependencies.scala                      |   1 +
 50 files changed, 3396 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/995c8cc0/examples/streaming/wordcount-java/src/test/scala/org/apache/gearpump/streaming/examples/wordcountjava/WordCountSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/wordcount-java/src/test/scala/org/apache/gearpump/streaming/examples/wordcountjava/WordCountSpec.scala b/examples/streaming/wordcount-java/src/test/scala/org/apache/gearpump/streaming/examples/wordcountjava/WordCountSpec.scala
index 3736c86..7df8651 100644
--- a/examples/streaming/wordcount-java/src/test/scala/org/apache/gearpump/streaming/examples/wordcountjava/WordCountSpec.scala
+++ b/examples/streaming/wordcount-java/src/test/scala/org/apache/gearpump/streaming/examples/wordcountjava/WordCountSpec.scala
@@ -18,16 +18,15 @@
 
 package org.apache.gearpump.streaming.examples.wordcountjava
 
-import scala.concurrent.Future
-import scala.util.Success
-
-import org.scalatest.prop.PropertyChecks
-import org.scalatest.{BeforeAndAfter, Matchers, PropSpec}
-
 import org.apache.gearpump.cluster.ClientToMaster.SubmitApplication
 import org.apache.gearpump.cluster.MasterToClient.SubmitApplicationResult
 import org.apache.gearpump.cluster.{MasterHarness, TestUtil}
-import org.apache.gearpump.streaming.examples.wordcountjava.WordCount
+import org.apache.gearpump.streaming.examples.wordcountjava.dsl.WordCount
+import org.scalatest.prop.PropertyChecks
+import org.scalatest.{BeforeAndAfter, Matchers, PropSpec}
+
+import scala.concurrent.Future
+import scala.util.Success
 
 class WordCountSpec
   extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter with MasterHarness {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/995c8cc0/experiments/sql/README.md
----------------------------------------------------------------------
diff --git a/experiments/sql/README.md b/experiments/sql/README.md
new file mode 100644
index 0000000..28b6d2c
--- /dev/null
+++ b/experiments/sql/README.md
@@ -0,0 +1,8 @@
+# SQL Support
+This project is about building a SQL layer with [Apache Calcite](https://calcite.apache.org/) to help those who are unfamiliar with Scala/Java to use [Apache Gearpump](http://gearpump.apache.org/).
+
+## Build
+- Build [GearPump SQL](/experiments/sql)
+
+## Example
+- Run [SQL WordCount example](/experiments/sql/src/test/java/org/apache/gearpump/sql/example/SqlWordCountTest.java)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/995c8cc0/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/Connection.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/Connection.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/Connection.java
new file mode 100644
index 0000000..e5954f0
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/Connection.java
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.planner;
+
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.config.CalciteConnectionConfig;
+import org.apache.calcite.jdbc.CalciteConnection;
+import org.apache.calcite.linq4j.Enumerator;
+import org.apache.calcite.linq4j.Queryable;
+import org.apache.calcite.linq4j.tree.Expression;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.tools.Frameworks;
+
+import java.lang.reflect.Type;
+import java.sql.*;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Executor;
+
+public class Connection implements CalciteConnection {
+
+  private final SchemaPlus rootSchema = Frameworks.createRootSchema(true);
+  private String schema = null;
+
+  public SchemaPlus getRootSchema() {
+    return rootSchema;
+  }
+
+  public JavaTypeFactory getTypeFactory() {
+    return null;
+  }
+
+  public Properties getProperties() {
+    return null;
+  }
+
+  public Statement createStatement() throws SQLException {
+    return null;
+  }
+
+  public PreparedStatement prepareStatement(String sql) throws SQLException {
+    return null;
+  }
+
+  public CallableStatement prepareCall(String sql) throws SQLException {
+    return null;
+  }
+
+  public String nativeSQL(String sql) throws SQLException {
+    return null;
+  }
+
+  public void setAutoCommit(boolean autoCommit) throws SQLException {
+
+  }
+
+  public boolean getAutoCommit() throws SQLException {
+    return false;
+  }
+
+  public void commit() throws SQLException {
+
+  }
+
+  public void rollback() throws SQLException {
+
+  }
+
+  public void close() throws SQLException {
+
+  }
+
+  public boolean isClosed() throws SQLException {
+    return false;
+  }
+
+  public DatabaseMetaData getMetaData() throws SQLException {
+    return null;
+  }
+
+  public void setReadOnly(boolean readOnly) throws SQLException {
+
+  }
+
+  public boolean isReadOnly() throws SQLException {
+    return false;
+  }
+
+  public void setCatalog(String catalog) throws SQLException {
+
+  }
+
+  public String getCatalog() throws SQLException {
+    return null;
+  }
+
+  public void setTransactionIsolation(int level) throws SQLException {
+
+  }
+
+  public int getTransactionIsolation() throws SQLException {
+    return 0;
+  }
+
+  public SQLWarning getWarnings() throws SQLException {
+    return null;
+  }
+
+  public void clearWarnings() throws SQLException {
+
+  }
+
+  public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException {
+    return null;
+  }
+
+  public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
+    return null;
+  }
+
+  public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
+    return null;
+  }
+
+  public Map<String, Class<?>> getTypeMap() throws SQLException {
+    return null;
+  }
+
+  public void setTypeMap(Map<String, Class<?>> map) throws SQLException {
+
+  }
+
+  public void setHoldability(int holdability) throws SQLException {
+
+  }
+
+  public int getHoldability() throws SQLException {
+    return 0;
+  }
+
+  public Savepoint setSavepoint() throws SQLException {
+    return null;
+  }
+
+  public Savepoint setSavepoint(String name) throws SQLException {
+    return null;
+  }
+
+  public void rollback(Savepoint savepoint) throws SQLException {
+
+  }
+
+  public void releaseSavepoint(Savepoint savepoint) throws SQLException {
+
+  }
+
+  public Statement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
+    return null;
+  }
+
+  public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
+    return null;
+  }
+
+  public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
+    return null;
+  }
+
+  public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException {
+    return null;
+  }
+
+  public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException {
+    return null;
+  }
+
+  public PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException {
+    return null;
+  }
+
+  public Clob createClob() throws SQLException {
+    return null;
+  }
+
+  public Blob createBlob() throws SQLException {
+    return null;
+  }
+
+  public NClob createNClob() throws SQLException {
+    return null;
+  }
+
+  public SQLXML createSQLXML() throws SQLException {
+    return null;
+  }
+
+  public boolean isValid(int timeout) throws SQLException {
+    return false;
+  }
+
+  public void setClientInfo(String name, String value) throws SQLClientInfoException {
+
+  }
+
+  public void setClientInfo(Properties properties) throws SQLClientInfoException {
+
+  }
+
+  public String getClientInfo(String name) throws SQLException {
+    return null;
+  }
+
+  public Properties getClientInfo() throws SQLException {
+    return null;
+  }
+
+  public Array createArrayOf(String typeName, Object[] elements) throws SQLException {
+    return null;
+  }
+
+  public Struct createStruct(String typeName, Object[] attributes) throws SQLException {
+    return null;
+  }
+
+  public void setSchema(String s) throws SQLException {
+    schema = s;
+  }
+
+  public String getSchema() throws SQLException {
+    return schema;
+  }
+
+  public void abort(Executor executor) throws SQLException {
+
+  }
+
+  public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException {
+
+  }
+
+  public int getNetworkTimeout() throws SQLException {
+    return 0;
+  }
+
+  public CalciteConnectionConfig config() {
+    return null;
+  }
+
+  public <T> T unwrap(Class<T> iface) throws SQLException {
+    return null;
+  }
+
+  public boolean isWrapperFor(Class<?> iface) throws SQLException {
+    return false;
+  }
+
+  public <T> Queryable<T> createQuery(Expression expression, Class<T> aClass) {
+    return null;
+  }
+
+  public <T> Queryable<T> createQuery(Expression expression, Type type) {
+    return null;
+  }
+
+  public <T> T execute(Expression expression, Class<T> aClass) {
+    return null;
+  }
+
+  public <T> T execute(Expression expression, Type type) {
+    return null;
+  }
+
+  public <T> Enumerator<T> executeQuery(Queryable<T> queryable) {
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/995c8cc0/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/GearRelDataTypeSystem.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/GearRelDataTypeSystem.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/GearRelDataTypeSystem.java
new file mode 100644
index 0000000..a640e12
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/GearRelDataTypeSystem.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.planner;
+
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.calcite.rel.type.RelDataTypeSystemImpl;
+
+/**
+ * customized data types.
+ */
+public class GearRelDataTypeSystem extends RelDataTypeSystemImpl {
+
+  public static final RelDataTypeSystem GEAR_REL_DATATYPE_SYSTEM = new GearRelDataTypeSystem();
+
+  @Override
+  public int getMaxNumericScale() {
+    return 38;
+  }
+
+  @Override
+  public int getMaxNumericPrecision() {
+    return 38;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/995c8cc0/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/GearRuleSets.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/GearRuleSets.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/GearRuleSets.java
new file mode 100644
index 0000000..a962ff1
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/GearRuleSets.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.planner;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.tools.RuleSet;
+import org.apache.gearpump.sql.rule.*;
+
+import java.util.Iterator;
+
+public class GearRuleSets {
+  private static final ImmutableSet<RelOptRule> calciteToGearConversionRules = ImmutableSet
+    .<RelOptRule>builder().add(GearIOSourceRule.INSTANCE, GearProjectRule.INSTANCE,
+      GearFilterRule.INSTANCE, GearIOSinkRule.INSTANCE,
+      GearAggregationRule.INSTANCE, GearSortRule.INSTANCE, GearValuesRule.INSTANCE,
+      GearIntersectRule.INSTANCE, GearMinusRule.INSTANCE, GearUnionRule.INSTANCE,
+      GearJoinRule.INSTANCE)
+    .build();
+
+  public static RuleSet[] getRuleSets() {
+    return new RuleSet[]{new GearRuleSet(
+      ImmutableSet.<RelOptRule>builder().addAll(calciteToGearConversionRules).build())};
+  }
+
+  private static class GearRuleSet implements RuleSet {
+    final ImmutableSet<RelOptRule> rules;
+
+    public GearRuleSet(ImmutableSet<RelOptRule> rules) {
+      this.rules = rules;
+    }
+
+    public GearRuleSet(ImmutableList<RelOptRule> rules) {
+      this.rules = ImmutableSet.<RelOptRule>builder().addAll(rules).build();
+    }
+
+    @Override
+    public Iterator<RelOptRule> iterator() {
+      return rules.iterator();
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/995c8cc0/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/LogicalPlan.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/LogicalPlan.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/LogicalPlan.java
new file mode 100644
index 0000000..1448a71
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/LogicalPlan.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.planner;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.parser.SqlParseException;
+import org.apache.calcite.tools.Planner;
+import org.apache.calcite.tools.RelConversionException;
+import org.apache.calcite.tools.ValidationException;
+
+public class LogicalPlan {
+
+  public static RelNode getLogicalPlan(String query, Planner planner) throws ValidationException,
+    RelConversionException {
+    SqlNode sqlNode;
+
+    try {
+      sqlNode = planner.parse(query);
+    } catch (SqlParseException e) {
+      throw new RuntimeException("SQL query parsing error", e);
+    }
+    SqlNode validatedSqlNode = planner.validate(sqlNode);
+
+    return planner.rel(validatedSqlNode).project();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/995c8cc0/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/Query.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/Query.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/Query.java
new file mode 100644
index 0000000..c18b8b5
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/Query.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.planner;
+
+import org.apache.calcite.config.Lex;
+import org.apache.calcite.plan.Contexts;
+import org.apache.calcite.plan.ConventionTraitDef;
+import org.apache.calcite.plan.RelTraitDef;
+import org.apache.calcite.rel.RelCollationTraitDef;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.parser.SqlParseException;
+import org.apache.calcite.sql.parser.SqlParser;
+import org.apache.calcite.tools.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * This Class is intended to test functions of Apache Calcite
+ */
+public class Query {
+
+  private static final Logger LOG = LoggerFactory.getLogger(Query.class);
+  private final Planner queryPlanner;
+
+  public Query(SchemaPlus schema) {
+
+    final List<RelTraitDef> traitDefs = new ArrayList<RelTraitDef>();
+
+    traitDefs.add(ConventionTraitDef.INSTANCE);
+    traitDefs.add(RelCollationTraitDef.INSTANCE);
+
+    FrameworkConfig calciteFrameworkConfig = Frameworks.newConfigBuilder()
+      .parserConfig(SqlParser.configBuilder()
+        .setLex(Lex.MYSQL)
+        .build())
+      .defaultSchema(schema)
+      .traitDefs(traitDefs)
+      .context(Contexts.EMPTY_CONTEXT)
+      .ruleSets(RuleSets.ofList())
+      .costFactory(null)
+      .typeSystem(RelDataTypeSystem.DEFAULT)
+      .build();
+    this.queryPlanner = Frameworks.getPlanner(calciteFrameworkConfig);
+  }
+
+  public RelNode getLogicalPlan(String query) throws ValidationException, RelConversionException {
+    SqlNode sqlNode = null;
+    try {
+      sqlNode = queryPlanner.parse(query);
+    } catch (SqlParseException e) {
+      LOG.error(e.getMessage());
+    }
+
+    SqlNode validatedSqlNode = queryPlanner.validate(sqlNode);
+    return queryPlanner.rel(validatedSqlNode).project();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/995c8cc0/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/StreamQueryPlanner.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/StreamQueryPlanner.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/StreamQueryPlanner.java
new file mode 100644
index 0000000..10f9cbd
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/StreamQueryPlanner.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.planner;
+
+import org.apache.calcite.adapter.java.ReflectiveSchema;
+import org.apache.calcite.jdbc.CalciteConnection;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.tools.*;
+import org.apache.gearpump.sql.utils.CalciteFrameworkConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+
+public class StreamQueryPlanner {
+  private static final Logger LOG = LoggerFactory.getLogger(StreamQueryPlanner.class);
+
+  public static void main(String[] args) throws ClassNotFoundException,
+    SQLException, ValidationException, RelConversionException {
+
+    Class.forName("org.apache.calcite.jdbc.Driver");
+    Connection connection = DriverManager.getConnection("jdbc:calcite:");
+    CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
+    SchemaPlus rootSchema = calciteConnection.getRootSchema();
+    rootSchema.add("t", new ReflectiveSchema(new Transactions()));
+
+    FrameworkConfig frameworkConfig = CalciteFrameworkConfiguration.getDefaultconfig(rootSchema);
+    Planner planner = Frameworks.getPlanner(frameworkConfig);
+
+    String query = "select t.orders.id, name, max(quantity)*0.5 from t.orders, t.products "
+      + "where t.orders.id = t.products.id group by t.orders.id, name "
+      + "having sum(quantity) > 5 order by sum(quantity) ";
+
+    RelNode logicalPlan = LogicalPlan.getLogicalPlan(query, planner);
+    LOG.info("Relational Expression:- \n\n" + RelOptUtil.toString(logicalPlan));
+
+  }
+
+  public static class Transactions {
+
+    public final Order[] orders = {
+      new Order("001", 3),
+      new Order("002", 5),
+      new Order("003", 8),
+      new Order("004", 15),
+    };
+
+    public final Product[] products = {
+      new Product("001", "Book"),
+      new Product("002", "Pen"),
+      new Product("003", "Pencil"),
+      new Product("004", "Ruler"),
+    };
+  }
+
+  public static class Order {
+    public final String id;
+    public final int quantity;
+
+    public Order(String id, int quantity) {
+      this.id = id;
+      this.quantity = quantity;
+    }
+  }
+
+  public static class Product {
+    public final String id;
+    public final String name;
+
+    public Product(String id, String name) {
+      this.id = id;
+      this.name = name;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/995c8cc0/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearAggregationRel.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearAggregationRel.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearAggregationRel.java
new file mode 100644
index 0000000..46dc15b
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearAggregationRel.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.rel;
+
+import org.apache.calcite.linq4j.Ord;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelWriter;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.Util;
+import org.apache.gearpump.sql.table.SampleString;
+import org.apache.gearpump.streaming.dsl.api.functions.MapFunction;
+import org.apache.gearpump.streaming.dsl.api.functions.ReduceFunction;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp;
+import org.apache.gearpump.streaming.dsl.javaapi.functions.GroupByFunction;
+import org.apache.gearpump.streaming.dsl.window.api.Trigger;
+import org.apache.gearpump.streaming.dsl.window.api.WindowFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+
+import java.time.Duration;
+import java.util.List;
+
+public class GearAggregationRel extends Aggregate implements GearRelNode {
+
+  private static final Logger LOG = LoggerFactory.getLogger(GearAggregationRel.class);
+  private int windowFieldIdx = -1;
+  private WindowFunction windowFn;
+  private Trigger trigger;
+  private Duration allowedLatence = Duration.ZERO;
+
+  public GearAggregationRel(RelOptCluster cluster, RelTraitSet traits, RelNode child,
+                            boolean indicator, ImmutableBitSet groupSet,
+                            List<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls) {
+    super(cluster, traits, child, indicator, groupSet, groupSets, aggCalls);
+  }
+
+  @Override
+  public Aggregate copy(RelTraitSet traitSet, RelNode input, boolean indicator,
+                        ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets,
+                        List<AggregateCall> aggCalls) {
+    return null;
+  }
+
+  public RelWriter explainTerms(RelWriter pw) {
+    pw.item("group", groupSet)
+      .itemIf("window", windowFn, windowFn != null)
+      .itemIf("trigger", trigger, trigger != null)
+      .itemIf("event_time", windowFieldIdx, windowFieldIdx != -1)
+      .itemIf("groups", groupSets, getGroupType() != Group.SIMPLE)
+      .itemIf("indicator", indicator, indicator)
+      .itemIf("aggs", aggCalls, pw.nest());
+    if (!pw.nest()) {
+      for (Ord<AggregateCall> ord : Ord.zip(aggCalls)) {
+        pw.item(Util.first(ord.e.name, "agg#" + ord.i), ord.e);
+      }
+    }
+    return pw;
+  }
+
+  @Override
+  public JavaStream<Tuple2<String, Integer>> buildGearPipeline(JavaStreamApp app,
+                                                               JavaStream<Tuple2<String, Integer>> javaStream) throws Exception {
+    LOG.debug("Adding Map");
+    JavaStream<Tuple2<String, Integer>> ones = SampleString.WORDS.map(new Ones(), "map");
+
+    LOG.debug("Adding GroupBy");
+    JavaStream<Tuple2<String, Integer>> groupedOnes = ones.groupBy(new TupleKey(),
+      1, "groupBy");
+//        groupedOnes.log();
+    LOG.debug("Adding Reduce");
+    JavaStream<Tuple2<String, Integer>> wordCount = groupedOnes.reduce(new Count(), "reduce");
+    wordCount.log();
+
+    return wordCount;
+  }
+
+  private static class Ones extends MapFunction<String, Tuple2<String, Integer>> {
+    @Override
+    public Tuple2<String, Integer> map(String s) {
+      return new Tuple2<>(s, 1);
+    }
+  }
+
+  private static class TupleKey extends GroupByFunction<Tuple2<String, Integer>, String> {
+    @Override
+    public String groupBy(Tuple2<String, Integer> tuple) {
+      return tuple._1();
+    }
+  }
+
+  private static class Count extends ReduceFunction<Tuple2<String, Integer>> {
+    @Override
+    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) {
+      return new Tuple2<>(t1._1(), t1._2() + t2._2());
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/995c8cc0/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearFilterRel.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearFilterRel.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearFilterRel.java
new file mode 100644
index 0000000..53a07d9
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearFilterRel.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.rel;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rex.RexNode;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp;
+import scala.Tuple2;
+
+public class GearFilterRel extends Filter implements GearRelNode {
+
+  public GearFilterRel(RelOptCluster cluster, RelTraitSet traits, RelNode child,
+                       RexNode condition) {
+    super(cluster, traits, child, condition);
+  }
+
+  @Override
+  public Filter copy(RelTraitSet traitSet, RelNode input, RexNode condition) {
+    return new GearFilterRel(getCluster(), traitSet, input, condition);
+  }
+
+
+  @Override
+  public JavaStream<Tuple2<String, Integer>> buildGearPipeline(JavaStreamApp app, JavaStream<Tuple2<String, Integer>> javaStream) throws Exception {
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/995c8cc0/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearFlatMapRel.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearFlatMapRel.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearFlatMapRel.java
new file mode 100644
index 0000000..d4c55fb
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearFlatMapRel.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.rel;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rex.RexNode;
+import org.apache.gearpump.DefaultMessage;
+import org.apache.gearpump.Message;
+import org.apache.gearpump.cluster.UserConfig;
+import org.apache.gearpump.sql.table.SampleString;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp;
+import org.apache.gearpump.streaming.dsl.javaapi.functions.FlatMapFunction;
+import org.apache.gearpump.streaming.source.DataSource;
+import org.apache.gearpump.streaming.source.Watermark;
+import org.apache.gearpump.streaming.task.TaskContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.Iterator;
+
+public class GearFlatMapRel extends Filter implements GearRelNode {
+
+  private static final Logger LOG = LoggerFactory.getLogger(GearFlatMapRel.class);
+
+  public GearFlatMapRel(RelOptCluster cluster, RelTraitSet traits, RelNode child, RexNode condition) {
+    super(cluster, traits, child, condition);
+  }
+
+  public GearFlatMapRel() {
+    super(null, null, null, null);
+  }
+
+  @Override
+  public Filter copy(RelTraitSet traitSet, RelNode input, RexNode condition) {
+    return new GearFlatMapRel(getCluster(), traitSet, input, condition);
+  }
+
+  @Override
+  public JavaStream<Tuple2<String, Integer>> buildGearPipeline(JavaStreamApp app,
+                                                               JavaStream<Tuple2<String, Integer>> javaStream) throws Exception {
+    LOG.debug("Adding Source");
+    JavaStream<String> sentence = app.source(new StringSource(SampleString.Stream.getKV()),
+      1, UserConfig.empty(), "source");
+    LOG.debug("Adding flatMap");
+    SampleString.WORDS = sentence.flatMap(new Split(), "flatMap");
+    return null;
+  }
+
+  private static class StringSource implements DataSource {
+    private final String str;
+    private boolean hasNext = true;
+
+    StringSource(String str) {
+      this.str = str;
+    }
+
+    @Override
+    public void open(TaskContext context, Instant startTime) {
+    }
+
+    @Override
+    public Message read() {
+      Message msg = new DefaultMessage(str, Instant.now());
+      hasNext = false;
+      return msg;
+    }
+
+    @Override
+    public void close() {
+    }
+
+    @Override
+    public Instant getWatermark() {
+      if (hasNext) {
+        return Instant.now();
+      } else {
+        return Watermark.MAX();
+      }
+    }
+  }
+
+  private static class Split extends FlatMapFunction<String, String> {
+    @Override
+    public Iterator<String> flatMap(String s) {
+      return Arrays.asList(s.split("\\s+")).iterator();
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/995c8cc0/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearIOSinkRel.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearIOSinkRel.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearIOSinkRel.java
new file mode 100644
index 0000000..1d61baf
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearIOSinkRel.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.rel;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.prepare.Prepare;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.TableModify;
+import org.apache.calcite.rex.RexNode;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp;
+import scala.Tuple2;
+
+import java.util.List;
+
+public class GearIOSinkRel extends TableModify implements GearRelNode {
+  public GearIOSinkRel(RelOptCluster cluster, RelTraitSet traits, RelOptTable table,
+                       Prepare.CatalogReader catalogReader, RelNode child, Operation operation,
+                       List<String> updateColumnList, List<RexNode> sourceExpressionList, boolean flattened) {
+    super(cluster, traits, table, catalogReader, child, operation, updateColumnList,
+      sourceExpressionList, flattened);
+  }
+
+  @Override
+  public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+    return new GearIOSinkRel(getCluster(), traitSet, getTable(), getCatalogReader(), sole(inputs),
+      getOperation(), getUpdateColumnList(), getSourceExpressionList(), isFlattened());
+  }
+
+  @Override
+  public JavaStream<Tuple2<String, Integer>> buildGearPipeline(JavaStreamApp app, JavaStream<Tuple2<String, Integer>> javaStream) throws Exception {
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/995c8cc0/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearIOSourceRel.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearIOSourceRel.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearIOSourceRel.java
new file mode 100644
index 0000000..6641c35
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearIOSourceRel.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.rel;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp;
+import scala.Tuple2;
+
+public class GearIOSourceRel extends TableScan implements GearRelNode {
+
+  public GearIOSourceRel(RelOptCluster cluster, RelTraitSet traitSet, RelOptTable table) {
+    super(cluster, traitSet, table);
+  }
+
+  @Override
+  public JavaStream<Tuple2<String, Integer>> buildGearPipeline(JavaStreamApp app, JavaStream<Tuple2<String, Integer>> javaStream) throws Exception {
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/995c8cc0/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearIntersectRel.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearIntersectRel.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearIntersectRel.java
new file mode 100644
index 0000000..6888a26
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearIntersectRel.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.rel;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Intersect;
+import org.apache.calcite.rel.core.SetOp;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp;
+import scala.Tuple2;
+
+import java.util.List;
+
+public class GearIntersectRel extends Intersect implements GearRelNode {
+  private GearSetOperatorRelBase delegate;
+
+  public GearIntersectRel(
+    RelOptCluster cluster,
+    RelTraitSet traits,
+    List<RelNode> inputs,
+    boolean all) {
+    super(cluster, traits, inputs, all);
+    delegate = new GearSetOperatorRelBase(this,
+      GearSetOperatorRelBase.OpType.INTERSECT, inputs, all);
+  }
+
+  @Override
+  public SetOp copy(RelTraitSet traitSet, List<RelNode> inputs, boolean all) {
+    return new GearIntersectRel(getCluster(), traitSet, inputs, all);
+  }
+
+  @Override
+  public JavaStream<Tuple2<String, Integer>> buildGearPipeline(JavaStreamApp app, JavaStream<Tuple2<String, Integer>> javaStream) throws Exception {
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/995c8cc0/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearJoinRel.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearJoinRel.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearJoinRel.java
new file mode 100644
index 0000000..1b168fb
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearJoinRel.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.rel;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.CorrelationId;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.util.Pair;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp;
+import scala.Tuple2;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+public class GearJoinRel extends Join implements GearRelNode {
+  public GearJoinRel(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right,
+                     RexNode condition, Set<CorrelationId> variablesSet, JoinRelType joinType) {
+    super(cluster, traits, left, right, condition, variablesSet, joinType);
+  }
+
+  @Override
+  public Join copy(RelTraitSet traitSet, RexNode conditionExpr, RelNode left,
+                   RelNode right, JoinRelType joinType, boolean semiJoinDone) {
+    return new GearJoinRel(getCluster(), traitSet, left, right, conditionExpr, variablesSet,
+      joinType);
+  }
+
+  private List<Pair<Integer, Integer>> extractJoinColumns(int leftRowColumnCount) {
+    // it's a CROSS JOIN because: condition == true
+    if (condition instanceof RexLiteral && (Boolean) ((RexLiteral) condition).getValue()) {
+      throw new UnsupportedOperationException("CROSS JOIN is not supported!");
+    }
+
+    RexCall call = (RexCall) condition;
+    List<Pair<Integer, Integer>> pairs = new ArrayList<>();
+    if ("AND".equals(call.getOperator().getName())) {
+      List<RexNode> operands = call.getOperands();
+      for (RexNode rexNode : operands) {
+        Pair<Integer, Integer> pair = extractOneJoinColumn((RexCall) rexNode, leftRowColumnCount);
+        pairs.add(pair);
+      }
+    } else if ("=".equals(call.getOperator().getName())) {
+      pairs.add(extractOneJoinColumn(call, leftRowColumnCount));
+    } else {
+      throw new UnsupportedOperationException(
+        "Operator " + call.getOperator().getName() + " is not supported in join condition");
+    }
+
+    return pairs;
+  }
+
+  private Pair<Integer, Integer> extractOneJoinColumn(RexCall oneCondition,
+                                                      int leftRowColumnCount) {
+    List<RexNode> operands = oneCondition.getOperands();
+    final int leftIndex = Math.min(((RexInputRef) operands.get(0)).getIndex(),
+      ((RexInputRef) operands.get(1)).getIndex());
+
+    final int rightIndex1 = Math.max(((RexInputRef) operands.get(0)).getIndex(),
+      ((RexInputRef) operands.get(1)).getIndex());
+    final int rightIndex = rightIndex1 - leftRowColumnCount;
+
+    return new Pair<>(leftIndex, rightIndex);
+  }
+
+  @Override
+  public JavaStream<Tuple2<String, Integer>> buildGearPipeline(JavaStreamApp app, JavaStream<Tuple2<String, Integer>> javaStream) throws Exception {
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/995c8cc0/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearLogicalConvention.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearLogicalConvention.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearLogicalConvention.java
new file mode 100644
index 0000000..ced38c3
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearLogicalConvention.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.rel;
+
+import org.apache.calcite.plan.*;
+
+public enum GearLogicalConvention implements Convention {
+  INSTANCE;
+
+  @Override
+  public Class getInterface() {
+    return GearRelNode.class;
+  }
+
+  @Override
+  public String getName() {
+    return "GEAR_LOGICAL";
+  }
+
+  @Override
+  public RelTraitDef getTraitDef() {
+    return ConventionTraitDef.INSTANCE;
+  }
+
+  @Override
+  public boolean satisfies(RelTrait trait) {
+    return this == trait;
+  }
+
+  @Override
+  public void register(RelOptPlanner planner) {
+  }
+
+  @Override
+  public String toString() {
+    return getName();
+  }
+
+  @Override
+  public boolean canConvertConvention(Convention toConvention) {
+    return false;
+  }
+
+  @Override
+  public boolean useAbstractConvertersForConversion(RelTraitSet fromTraits, RelTraitSet toTraits) {
+    return false;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/995c8cc0/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearMinusRel.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearMinusRel.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearMinusRel.java
new file mode 100644
index 0000000..1ca972a
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearMinusRel.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.rel;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Minus;
+import org.apache.calcite.rel.core.SetOp;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp;
+import scala.Tuple2;
+
+import java.util.List;
+
+public class GearMinusRel extends Minus implements GearRelNode {
+
+  private GearSetOperatorRelBase delegate;
+
+  public GearMinusRel(RelOptCluster cluster, RelTraitSet traits, List<RelNode> inputs,
+                      boolean all) {
+    super(cluster, traits, inputs, all);
+    delegate = new GearSetOperatorRelBase(this, GearSetOperatorRelBase.OpType.MINUS, inputs, all);
+  }
+
+  @Override
+  public SetOp copy(RelTraitSet traitSet, List<RelNode> inputs, boolean all) {
+    return new GearMinusRel(getCluster(), traitSet, inputs, all);
+  }
+
+  @Override
+  public JavaStream<Tuple2<String, Integer>> buildGearPipeline(JavaStreamApp app, JavaStream<Tuple2<String, Integer>> javaStream) throws Exception {
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/995c8cc0/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearProjectRel.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearProjectRel.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearProjectRel.java
new file mode 100644
index 0000000..f09dc8c
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearProjectRel.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.rel;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexNode;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp;
+import scala.Tuple2;
+
+import java.util.List;
+
+public class GearProjectRel extends Project implements GearRelNode {
+
+  public GearProjectRel(RelOptCluster cluster, RelTraitSet traits, RelNode input,
+                        List<? extends RexNode> projects, RelDataType rowType) {
+    super(cluster, traits, input, projects, rowType);
+  }
+
+  @Override
+  public Project copy(RelTraitSet traitSet, RelNode input, List<RexNode> projects,
+                      RelDataType rowType) {
+    return new GearProjectRel(getCluster(), traitSet, input, projects, rowType);
+  }
+
+  @Override
+  public JavaStream<Tuple2<String, Integer>> buildGearPipeline(JavaStreamApp app, JavaStream<Tuple2<String, Integer>> javaStream) throws Exception {
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/995c8cc0/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearRelNode.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearRelNode.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearRelNode.java
new file mode 100644
index 0000000..042c767
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearRelNode.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.rel;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp;
+import scala.Tuple2;
+
+public interface GearRelNode extends RelNode {
+
+  JavaStream<Tuple2<String, Integer>> buildGearPipeline(JavaStreamApp app, JavaStream<Tuple2<String, Integer>> javaStream) throws Exception;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/995c8cc0/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearSetOperatorRelBase.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearSetOperatorRelBase.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearSetOperatorRelBase.java
new file mode 100644
index 0000000..ee59753
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearSetOperatorRelBase.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.rel;
+
+import org.apache.calcite.rel.RelNode;
+
+import java.io.Serializable;
+import java.util.List;
+
+public class GearSetOperatorRelBase {
+
+  public enum OpType implements Serializable {
+    UNION,
+    INTERSECT,
+    MINUS
+  }
+
+  private GearRelNode gearRelNode;
+  private List<RelNode> inputs;
+  private boolean all;
+  private OpType opType;
+
+  public GearSetOperatorRelBase(GearRelNode gearRelNode, OpType opType,
+                                List<RelNode> inputs, boolean all) {
+    this.gearRelNode = gearRelNode;
+    this.opType = opType;
+    this.inputs = inputs;
+    this.all = all;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/995c8cc0/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearSortRel.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearSortRel.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearSortRel.java
new file mode 100644
index 0000000..f70481b
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearSortRel.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.rel;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelCollationImpl;
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Sort;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp;
+import scala.Tuple2;
+
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.List;
+
+public class GearSortRel extends Sort implements GearRelNode {
+
+  private List<Integer> fieldIndices = new ArrayList<>();
+  private List<Boolean> orientation = new ArrayList<>();
+  private List<Boolean> nullsFirst = new ArrayList<>();
+
+  private int startIndex = 0;
+  private int count;
+
+  public GearSortRel(RelOptCluster cluster, RelTraitSet traits, RelNode child, RelCollation collation,
+                     RexNode offset, RexNode fetch) {
+    super(cluster, traits, child, collation, offset, fetch);
+
+    List<RexNode> fieldExps = getChildExps();
+    RelCollationImpl collationImpl = (RelCollationImpl) collation;
+    List<RelFieldCollation> collations = collationImpl.getFieldCollations();
+    for (int i = 0; i < fieldExps.size(); i++) {
+      RexNode fieldExp = fieldExps.get(i);
+      RexInputRef inputRef = (RexInputRef) fieldExp;
+      fieldIndices.add(inputRef.getIndex());
+      orientation.add(collations.get(i).getDirection() == RelFieldCollation.Direction.ASCENDING);
+
+      RelFieldCollation.NullDirection rawNullDirection = collations.get(i).nullDirection;
+      if (rawNullDirection == RelFieldCollation.NullDirection.UNSPECIFIED) {
+        rawNullDirection = collations.get(i).getDirection().defaultNullDirection();
+      }
+      nullsFirst.add(rawNullDirection == RelFieldCollation.NullDirection.FIRST);
+    }
+
+    if (fetch == null) {
+      throw new UnsupportedOperationException("ORDER BY without a LIMIT is not supported!");
+    }
+
+    RexLiteral fetchLiteral = (RexLiteral) fetch;
+    count = ((BigDecimal) fetchLiteral.getValue()).intValue();
+
+    if (offset != null) {
+      RexLiteral offsetLiteral = (RexLiteral) offset;
+      startIndex = ((BigDecimal) offsetLiteral.getValue()).intValue();
+    }
+  }
+
+  @Override
+  public Sort copy(RelTraitSet traitSet, RelNode newInput, RelCollation newCollation,
+                   RexNode offset, RexNode fetch) {
+    return new GearSortRel(getCluster(), traitSet, newInput, newCollation, offset, fetch);
+  }
+
+  public static <T extends Number & Comparable> int numberCompare(T a, T b) {
+    return a.compareTo(b);
+  }
+
+  @Override
+  public JavaStream<Tuple2<String, Integer>> buildGearPipeline(JavaStreamApp app, JavaStream<Tuple2<String, Integer>> javaStream) throws Exception {
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/995c8cc0/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearSqlRelUtils.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearSqlRelUtils.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearSqlRelUtils.java
new file mode 100644
index 0000000..54a6bbb
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearSqlRelUtils.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.rel;
+
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.plan.volcano.RelSubset;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.sql.SqlExplainLevel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+class GearSqlRelUtils {
+  private static final Logger LOG = LoggerFactory.getLogger(GearSqlRelUtils.class);
+
+  private static final AtomicInteger sequence = new AtomicInteger(0);
+  private static final AtomicInteger classSequence = new AtomicInteger(0);
+
+  public static String getStageName(GearRelNode relNode) {
+    return relNode.getClass().getSimpleName().toUpperCase() + "_" + relNode.getId() + "_"
+      + sequence.getAndIncrement();
+  }
+
+  public static String getClassName(GearRelNode relNode) {
+    return "Generated_" + relNode.getClass().getSimpleName().toUpperCase() + "_" + relNode.getId()
+      + "_" + classSequence.getAndIncrement();
+  }
+
+  public static GearRelNode getGearRelInput(RelNode input) {
+    if (input instanceof RelSubset) {
+      // go with known best input
+      input = ((RelSubset) input).getBest();
+    }
+    return (GearRelNode) input;
+  }
+
+  public static String explain(final RelNode rel) {
+    return explain(rel, SqlExplainLevel.EXPPLAN_ATTRIBUTES);
+  }
+
+  public static String explain(final RelNode rel, SqlExplainLevel detailLevel) {
+    String explain = "";
+    try {
+      explain = RelOptUtil.toString(rel);
+    } catch (StackOverflowError e) {
+      LOG.error("StackOverflowError occurred while extracting plan. "
+        + "Please report it to the dev@ mailing list.");
+      LOG.error("RelNode " + rel + " ExplainLevel " + detailLevel, e);
+      LOG.error("Forcing plan to empty string and continue... "
+        + "SQL Runner may not working properly after.");
+    }
+    return explain;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/995c8cc0/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearUnionRel.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearUnionRel.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearUnionRel.java
new file mode 100644
index 0000000..431368d
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearUnionRel.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.rel;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelInput;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.SetOp;
+import org.apache.calcite.rel.core.Union;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp;
+import scala.Tuple2;
+
+import java.util.List;
+
+public class GearUnionRel extends Union implements GearRelNode {
+
+  private GearSetOperatorRelBase delegate;
+
+  public GearUnionRel(RelOptCluster cluster, RelTraitSet traits, List<RelNode> inputs, boolean all) {
+    super(cluster, traits, inputs, all);
+    this.delegate = new GearSetOperatorRelBase(this, GearSetOperatorRelBase.OpType.UNION, inputs, all);
+  }
+
+  public GearUnionRel(RelInput input) {
+    super(input);
+  }
+
+  @Override
+  public SetOp copy(RelTraitSet traitSet, List<RelNode> inputs, boolean all) {
+    return new GearUnionRel(getCluster(), traitSet, inputs, all);
+  }
+
+  @Override
+  public JavaStream<Tuple2<String, Integer>> buildGearPipeline(JavaStreamApp app, JavaStream<Tuple2<String, Integer>> javaStream) throws Exception {
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/995c8cc0/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearValuesRel.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearValuesRel.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearValuesRel.java
new file mode 100644
index 0000000..6bd9403
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearValuesRel.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.rel;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.core.Values;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp;
+import scala.Tuple2;
+
+public class GearValuesRel extends Values implements GearRelNode {
+
+  public GearValuesRel(RelOptCluster cluster, RelDataType rowType, ImmutableList<ImmutableList<RexLiteral>> tuples,
+                       RelTraitSet traits) {
+    super(cluster, rowType, tuples, traits);
+  }
+
+  @Override
+  public JavaStream<Tuple2<String, Integer>> buildGearPipeline(JavaStreamApp app, JavaStream<Tuple2<String, Integer>> javaStream) throws Exception {
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/995c8cc0/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearAggregationRule.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearAggregationRule.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearAggregationRule.java
new file mode 100644
index 0000000..c1b1602
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearAggregationRule.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.rule;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.plan.*;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.core.RelFactories;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.tools.RelBuilderFactory;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.gearpump.sql.rel.GearAggregationRel;
+import org.apache.gearpump.sql.rel.GearLogicalConvention;
+import org.apache.gearpump.sql.utils.GearConfiguration;
+import org.apache.gearpump.streaming.dsl.window.api.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.GregorianCalendar;
+import java.util.List;
+
+public class GearAggregationRule extends RelOptRule {
+
+  private static final Logger LOG = LoggerFactory.getLogger(GearAggregationRule.class);
+  public static final GearAggregationRule INSTANCE =
+    new GearAggregationRule(Aggregate.class, Project.class, RelFactories.LOGICAL_BUILDER);
+
+  public GearAggregationRule(Class<? extends Aggregate> aggregateClass,
+                             Class<? extends Project> projectClass,
+                             RelBuilderFactory relBuilderFactory) {
+    super(operand(aggregateClass, operand(projectClass, any())), relBuilderFactory, null);
+  }
+
+  public GearAggregationRule(RelOptRuleOperand operand, String description) {
+    super(operand, description);
+  }
+
+  @Override
+  public void onMatch(RelOptRuleCall call) {
+    final Aggregate aggregate = call.rel(0);
+    final Project project = call.rel(1);
+    updateWindowTrigger(call, aggregate, project);
+  }
+
+  private void updateWindowTrigger(RelOptRuleCall call, Aggregate aggregate, Project project) {
+    ImmutableBitSet groupByFields = aggregate.getGroupSet();
+    List<RexNode> projectMapping = project.getProjects();
+
+    WindowFunction windowFn = new GlobalWindowFunction();
+    Trigger triggerFn;
+    int windowFieldIdx = -1;
+    Duration allowedLatence = Duration.ZERO;
+
+    for (int groupField : groupByFields.asList()) {
+      RexNode projNode = projectMapping.get(groupField);
+      if (projNode instanceof RexCall) {
+        SqlOperator op = ((RexCall) projNode).op;
+        ImmutableList<RexNode> parameters = ((RexCall) projNode).operands;
+        String functionName = op.getName();
+        switch (functionName) {
+          case "TUMBLE":
+            windowFieldIdx = groupField;
+            windowFn = (WindowFunction) FixedWindows.apply(Duration.ofMillis(getWindowParameterAsMillis(parameters.get(1))));
+            if (parameters.size() == 3) {
+              GregorianCalendar delayTime = (GregorianCalendar) ((RexLiteral) parameters.get(2))
+                .getValue();
+              triggerFn = createTriggerWithDelay(delayTime);
+              allowedLatence = (Duration.ofMillis(delayTime.getTimeInMillis()));
+            }
+            break;
+          case "HOP":
+            windowFieldIdx = groupField;
+            windowFn = (WindowFunction) SlidingWindows.apply(Duration.ofMillis(getWindowParameterAsMillis(parameters.get(1))), Duration.ofMillis(getWindowParameterAsMillis(parameters.get(2))));
+
+            if (parameters.size() == 4) {
+              GregorianCalendar delayTime = (GregorianCalendar) ((RexLiteral) parameters.get(3))
+                .getValue();
+              triggerFn = createTriggerWithDelay(delayTime);
+              allowedLatence = (Duration.ofMillis(delayTime.getTimeInMillis()));
+            }
+            break;
+          case "SESSION":
+            windowFieldIdx = groupField;
+            windowFn = (WindowFunction) SessionWindows.apply(Duration.ofMillis(getWindowParameterAsMillis(parameters.get(1))));
+            if (parameters.size() == 3) {
+              GregorianCalendar delayTime = (GregorianCalendar) ((RexLiteral) parameters.get(2))
+                .getValue();
+              triggerFn = createTriggerWithDelay(delayTime);
+              allowedLatence = (Duration.ofMillis(delayTime.getTimeInMillis()));
+            }
+            break;
+          default:
+            break;
+        }
+      }
+    }
+
+    try {
+      GearAggregationRel gearRel = new GearAggregationRel(aggregate.getCluster(),
+        aggregate.getTraitSet().replace(GearLogicalConvention.INSTANCE),
+        convert(aggregate.getInput(),
+          aggregate.getInput().getTraitSet().replace(GearLogicalConvention.INSTANCE)),
+        aggregate.indicator,
+        aggregate.getGroupSet(),
+        aggregate.getGroupSets(),
+        aggregate.getAggCallList());
+      gearRel.buildGearPipeline(GearConfiguration.app, null);
+      GearConfiguration.app.submit().waitUntilFinish();
+    } catch (Exception e) {
+      LOG.error(e.getMessage());
+    }
+
+  }
+
+  private Trigger createTriggerWithDelay(GregorianCalendar delayTime) {
+    return null;
+  }
+
+  private long getWindowParameterAsMillis(RexNode parameterNode) {
+    if (parameterNode instanceof RexLiteral) {
+      return RexLiteral.intValue(parameterNode);
+    } else {
+      throw new IllegalArgumentException(String.format("[%s] is not valid.", parameterNode));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/995c8cc0/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearFilterRule.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearFilterRule.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearFilterRule.java
new file mode 100644
index 0000000..4817ee8
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearFilterRule.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.rule;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.logical.LogicalFilter;
+import org.apache.gearpump.sql.rel.GearFilterRel;
+import org.apache.gearpump.sql.rel.GearLogicalConvention;
+
+public class GearFilterRule extends ConverterRule {
+
+  public static final GearFilterRule INSTANCE = new GearFilterRule();
+
+  private GearFilterRule() {
+    super(LogicalFilter.class, Convention.NONE, GearLogicalConvention.INSTANCE, "GearFilterRule");
+  }
+
+  @Override
+  public RelNode convert(RelNode rel) {
+    final Filter filter = (Filter) rel;
+    final RelNode input = filter.getInput();
+
+    GearFilterRel gearRel = new GearFilterRel(filter.getCluster(),
+      filter.getTraitSet().replace(GearLogicalConvention.INSTANCE),
+      convert(input, input.getTraitSet().replace(GearLogicalConvention.INSTANCE)),
+      filter.getCondition());
+    return gearRel;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/995c8cc0/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearFlatMapRule.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearFlatMapRule.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearFlatMapRule.java
new file mode 100644
index 0000000..e81f948
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearFlatMapRule.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.rule;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelTrait;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.gearpump.sql.rel.GearLogicalConvention;
+import org.apache.gearpump.sql.rel.GearFlatMapRel;
+import org.apache.gearpump.sql.utils.GearConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class GearFlatMapRule extends ConverterRule {
+
+  private static final Logger LOG = LoggerFactory.getLogger(GearFlatMapRule.class);
+  public static final GearFlatMapRule INSTANCE = new GearFlatMapRule(Aggregate.class, Convention.NONE);
+
+  public GearFlatMapRule(Class<? extends Aggregate> aggregateClass, RelTrait projectIn) {
+    super(aggregateClass, projectIn, GearLogicalConvention.INSTANCE, "GearFlatMapRule");
+  }
+
+  @Override
+  public RelNode convert(RelNode rel) {
+    try {
+      GearFlatMapRel flatRel = new GearFlatMapRel();
+      flatRel.buildGearPipeline(GearConfiguration.app, null);
+    } catch (Exception e) {
+      LOG.error(e.getMessage());
+    }
+    return null;
+  }
+
+}