You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2017/08/19 00:30:59 UTC

[3/3] incubator-gearpump git commit: [GEARPUMP-217] Add Gearpump rel, rule and examples.

[GEARPUMP-217] Add Gearpump rel, rule and examples.

Author: Buddhi Ayesha <bu...@cse.mrt.ac.lk>
Author: Buddhi Rathnayaka <bu...@cse.mrt.ac.lk>

Closes #217 from buddhiayesha2015/upstream_sql.


Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/54686e0e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/54686e0e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/54686e0e

Branch: refs/heads/sql
Commit: 54686e0e28efcca4901fe06589d2736daad2e606
Parents: e04df0d
Author: Buddhi Ayesha <bu...@cse.mrt.ac.lk>
Authored: Sat Aug 19 08:24:49 2017 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Sat Aug 19 08:25:49 2017 +0800

----------------------------------------------------------------------
 .../examples/wordcountjava/WordCountSpec.scala  |  13 +-
 experiments/sql/README.md                       |  10 +-
 .../main/java/org/apache/calcite/SQLNode.java   | 112 ------
 .../org/apache/calcite/planner/Connection.java  | 294 --------------
 .../org/apache/calcite/planner/LogicalPlan.java |  43 --
 .../java/org/apache/calcite/planner/Query.java  |  79 ----
 .../calcite/planner/StreamQueryPlanner.java     |  91 -----
 .../calcite/table/TransactionsTableFactory.java |  88 ----
 .../utils/CalciteFrameworkConfiguration.java    |  58 ---
 .../calcite/validator/CalciteSqlValidator.java  |  50 ---
 .../java/org/apache/gearpump/sql/SQLNode.java   | 127 ++++++
 .../apache/gearpump/sql/planner/Connection.java | 292 ++++++++++++++
 .../sql/planner/GearRelDataTypeSystem.java      |  41 ++
 .../gearpump/sql/planner/GearRuleSets.java      |  60 +++
 .../gearpump/sql/planner/LogicalPlan.java       |  43 ++
 .../org/apache/gearpump/sql/planner/Query.java  |  80 ++++
 .../sql/planner/StreamQueryPlanner.java         |  96 +++++
 .../gearpump/sql/rel/GearAggregationRel.java    | 120 ++++++
 .../apache/gearpump/sql/rel/GearFilterRel.java  |  47 +++
 .../apache/gearpump/sql/rel/GearFlatMapRel.java | 112 ++++++
 .../apache/gearpump/sql/rel/GearIOSinkRel.java  |  52 +++
 .../gearpump/sql/rel/GearIOSourceRel.java       |  39 ++
 .../gearpump/sql/rel/GearIntersectRel.java      |  54 +++
 .../apache/gearpump/sql/rel/GearJoinRel.java    |  94 +++++
 .../gearpump/sql/rel/GearLogicalConvention.java |  65 +++
 .../apache/gearpump/sql/rel/GearMinusRel.java   |  51 +++
 .../apache/gearpump/sql/rel/GearProjectRel.java |  50 +++
 .../apache/gearpump/sql/rel/GearRelNode.java    |  30 ++
 .../sql/rel/GearSetOperatorRelBase.java         |  47 +++
 .../apache/gearpump/sql/rel/GearSortRel.java    |  95 +++++
 .../gearpump/sql/rel/GearSqlRelUtils.java       |  71 ++++
 .../apache/gearpump/sql/rel/GearUnionRel.java   |  55 +++
 .../apache/gearpump/sql/rel/GearValuesRel.java  |  42 ++
 .../gearpump/sql/rule/GearAggregationRule.java  | 147 +++++++
 .../gearpump/sql/rule/GearFilterRule.java       |  48 +++
 .../gearpump/sql/rule/GearFlatMapRule.java      |  52 +++
 .../gearpump/sql/rule/GearIOSinkRule.java       |  79 ++++
 .../gearpump/sql/rule/GearIOSourceRule.java     |  46 +++
 .../gearpump/sql/rule/GearIntersectRule.java    |  51 +++
 .../apache/gearpump/sql/rule/GearJoinRule.java  |  53 +++
 .../apache/gearpump/sql/rule/GearMinusRule.java |  51 +++
 .../gearpump/sql/rule/GearProjectRule.java      |  48 +++
 .../apache/gearpump/sql/rule/GearSortRule.java  |  51 +++
 .../apache/gearpump/sql/rule/GearUnionRule.java |  49 +++
 .../gearpump/sql/rule/GearValuesRule.java       |  48 +++
 .../apache/gearpump/sql/table/SampleString.java |  45 +++
 .../gearpump/sql/table/SampleTransactions.java  |  60 +++
 .../sql/table/TransactionsTableFactory.java     |  88 ++++
 .../utils/CalciteFrameworkConfiguration.java    |  58 +++
 .../gearpump/sql/utils/GearConfiguration.java   |  49 +++
 .../sql/validator/CalciteSqlValidator.java      |  50 +++
 .../gearpump/experiments/sql/Connection.scala   |   3 -
 .../org/apache/calcite/planner/CalciteTest.java | 323 ---------------
 .../org/apache/calcite/planner/QueryTest.java   |  64 ---
 .../gearpump/sql/example/SqlWordCountTest.java  | 125 ++++++
 .../gearpump/sql/planner/CalciteTest.java       | 397 +++++++++++++++++++
 .../apache/gearpump/sql/planner/QueryTest.java  |  65 +++
 experiments/sql/src/test/resources/model.json   |   2 +-
 58 files changed, 3438 insertions(+), 1215 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/examples/streaming/wordcount-java/src/test/scala/org/apache/gearpump/streaming/examples/wordcountjava/WordCountSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/wordcount-java/src/test/scala/org/apache/gearpump/streaming/examples/wordcountjava/WordCountSpec.scala b/examples/streaming/wordcount-java/src/test/scala/org/apache/gearpump/streaming/examples/wordcountjava/WordCountSpec.scala
index 3736c86..7df8651 100644
--- a/examples/streaming/wordcount-java/src/test/scala/org/apache/gearpump/streaming/examples/wordcountjava/WordCountSpec.scala
+++ b/examples/streaming/wordcount-java/src/test/scala/org/apache/gearpump/streaming/examples/wordcountjava/WordCountSpec.scala
@@ -18,16 +18,15 @@
 
 package org.apache.gearpump.streaming.examples.wordcountjava
 
-import scala.concurrent.Future
-import scala.util.Success
-
-import org.scalatest.prop.PropertyChecks
-import org.scalatest.{BeforeAndAfter, Matchers, PropSpec}
-
 import org.apache.gearpump.cluster.ClientToMaster.SubmitApplication
 import org.apache.gearpump.cluster.MasterToClient.SubmitApplicationResult
 import org.apache.gearpump.cluster.{MasterHarness, TestUtil}
-import org.apache.gearpump.streaming.examples.wordcountjava.WordCount
+import org.apache.gearpump.streaming.examples.wordcountjava.dsl.WordCount
+import org.scalatest.prop.PropertyChecks
+import org.scalatest.{BeforeAndAfter, Matchers, PropSpec}
+
+import scala.concurrent.Future
+import scala.util.Success
 
 class WordCountSpec
   extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter with MasterHarness {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/README.md
----------------------------------------------------------------------
diff --git a/experiments/sql/README.md b/experiments/sql/README.md
index 6e9d490..9880dff 100644
--- a/experiments/sql/README.md
+++ b/experiments/sql/README.md
@@ -1,2 +1,8 @@
-SQL Support
-===========
\ No newline at end of file
+# SQL Support
+This project is about building a SQL layer with Apache Calcite to help those who are unfamiliar with Scala/Java to use Gearpump.
+
+## Build
+- Build [GearPump SQL](https://github.com/buddhiayesha2015/incubator-gearpump/tree/sql/experiments/sql)
+
+## Test
+- Run [SQL WordCount example](https://github.com/buddhiayesha2015/incubator-gearpump/blob/sql/experiments/sql/src/test/java/org/apache/gearpump/sql/example/SqlWordCountTest.java)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/calcite/SQLNode.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/calcite/SQLNode.java b/experiments/sql/src/main/java/org/apache/calcite/SQLNode.java
deleted file mode 100644
index c092bf0..0000000
--- a/experiments/sql/src/main/java/org/apache/calcite/SQLNode.java
+++ /dev/null
@@ -1,112 +0,0 @@
-package org.apache.calcite;
-
-import org.apache.calcite.avatica.util.Casing;
-import org.apache.calcite.avatica.util.Quoting;
-import org.apache.calcite.sql.SqlCall;
-import org.apache.calcite.sql.SqlNode;
-import org.apache.calcite.sql.SqlNodeList;
-import org.apache.calcite.sql.fun.SqlCase;
-import org.apache.calcite.sql.parser.SqlParseException;
-import org.apache.calcite.sql.parser.SqlParser;
-import org.apache.calcite.sql.parser.impl.SqlParserImpl;
-import org.apache.calcite.sql.validate.SqlConformanceEnum;
-import org.apache.calcite.sql.validate.SqlValidator;
-import org.apache.calcite.sql.validate.SqlValidatorScope;
-import org.apache.calcite.util.Util;
-import org.junit.ComparisonFailure;
-
-import java.util.regex.Pattern;
-
-/**
- * Created by Buddhi on 6/8/2017.
- */
-public class SQLNode {
-
-    private static final Pattern LINE_BREAK_PATTERN = Pattern.compile("\r\n|\r|\n");
-
-    private static final Pattern TAB_PATTERN = Pattern.compile("\t");
-
-    private static final String LINE_BREAK = "\\\\n\"" + Util.LINE_SEPARATOR + " + \"";
-
-    private static final ThreadLocal<boolean[]> LINUXIFY = new ThreadLocal<boolean[]>() {
-        @Override
-        protected boolean[] initialValue() {
-            return new boolean[]{true};
-        }
-    };
-
-
-    protected SqlParser getSqlParser(String sql) {
-        return SqlParser.create(sql,
-                SqlParser.configBuilder()
-                        .setParserFactory(SqlParserImpl.FACTORY)
-                        .setQuoting(Quoting.DOUBLE_QUOTE)
-                        .setUnquotedCasing(Casing.TO_UPPER)
-                        .setQuotedCasing(Casing.UNCHANGED)
-                        .setConformance(SqlConformanceEnum.DEFAULT)
-                        .build());
-    }
-
-    public static String toJavaString(String s) {
-        s = Util.replace(s, "\"", "\\\"");
-        s = LINE_BREAK_PATTERN.matcher(s).replaceAll(LINE_BREAK);
-        s = TAB_PATTERN.matcher(s).replaceAll("\\\\t");
-        s = "\"" + s + "\"";
-        String spurious = "\n \\+ \"\"";
-        if (s.endsWith(spurious)) {
-            s = s.substring(0, s.length() - spurious.length());
-        }
-        return s;
-    }
-
-    public static void assertEqualsVerbose(String expected, String actual) {
-        if (actual == null) {
-            if (expected == null) {
-                return;
-            } else {
-                String message = "Expected:\n" + expected + "\nActual: null";
-                throw new ComparisonFailure(message, expected, null);
-            }
-        }
-        if ((expected != null) && expected.equals(actual)) {
-            return;
-        }
-        String s = toJavaString(actual);
-        String message = "Expected:\n" + expected + "\nActual:\n" +
-                actual + "\nActual java:\n" + s + '\n';
-
-        throw new ComparisonFailure(message, expected, actual);
-    }
-
-    public void check(String sql, String expected) {
-        final SqlNode sqlNode;
-        try {
-            sqlNode = getSqlParser(sql).parseStmt();
-        } catch (SqlParseException e) {
-            throw new RuntimeException("Error while parsing SQL: " + sql, e);
-        }
-
-        String actual = sqlNode.toSqlString(null, true).getSql();
-        if (LINUXIFY.get()[0]) {
-            actual = Util.toLinux(actual);
-        }
-        assertEqualsVerbose(expected, actual);
-    }
-
-    public void validateCall(SqlCall call, SqlValidator validator, SqlValidatorScope operandScope) {
-        SqlCase sqlCase = (SqlCase) call;
-        SqlNodeList whenOperands = sqlCase.getWhenOperands();
-        SqlNodeList thenOperands = sqlCase.getThenOperands();
-        SqlNode elseOperand = sqlCase.getElseOperand();
-        for (SqlNode operand : whenOperands) {
-            operand.validateExpr(validator, operandScope);
-        }
-        for (SqlNode operand : thenOperands) {
-            operand.validateExpr(validator, operandScope);
-        }
-        if (elseOperand != null) {
-            elseOperand.validateExpr(validator, operandScope);
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/calcite/planner/Connection.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/calcite/planner/Connection.java b/experiments/sql/src/main/java/org/apache/calcite/planner/Connection.java
deleted file mode 100644
index 1b8648d..0000000
--- a/experiments/sql/src/main/java/org/apache/calcite/planner/Connection.java
+++ /dev/null
@@ -1,294 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.calcite.planner;
-
-import org.apache.calcite.adapter.java.JavaTypeFactory;
-import org.apache.calcite.config.CalciteConnectionConfig;
-import org.apache.calcite.jdbc.CalciteConnection;
-import org.apache.calcite.linq4j.Enumerator;
-import org.apache.calcite.linq4j.Queryable;
-import org.apache.calcite.linq4j.tree.Expression;
-import org.apache.calcite.schema.SchemaPlus;
-import org.apache.calcite.tools.Frameworks;
-import org.apache.log4j.Logger;
-
-import java.lang.reflect.Type;
-import java.sql.*;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.Executor;
-
-public class Connection implements CalciteConnection {
-
-    private final static Logger logger = Logger.getLogger(Connection.class);
-    private final SchemaPlus rootSchema = Frameworks.createRootSchema(true);
-    private String schema = null;
-
-    public SchemaPlus getRootSchema() {
-        return rootSchema;
-    }
-
-    public JavaTypeFactory getTypeFactory() {
-        return null;
-    }
-
-    public Properties getProperties() {
-        return null;
-    }
-
-    public Statement createStatement() throws SQLException {
-        return null;
-    }
-
-    public PreparedStatement prepareStatement(String sql) throws SQLException {
-        return null;
-    }
-
-    public CallableStatement prepareCall(String sql) throws SQLException {
-        return null;
-    }
-
-    public String nativeSQL(String sql) throws SQLException {
-        return null;
-    }
-
-    public void setAutoCommit(boolean autoCommit) throws SQLException {
-
-    }
-
-    public boolean getAutoCommit() throws SQLException {
-        return false;
-    }
-
-    public void commit() throws SQLException {
-
-    }
-
-    public void rollback() throws SQLException {
-
-    }
-
-    public void close() throws SQLException {
-
-    }
-
-    public boolean isClosed() throws SQLException {
-        return false;
-    }
-
-    public DatabaseMetaData getMetaData() throws SQLException {
-        return null;
-    }
-
-    public void setReadOnly(boolean readOnly) throws SQLException {
-
-    }
-
-    public boolean isReadOnly() throws SQLException {
-        return false;
-    }
-
-    public void setCatalog(String catalog) throws SQLException {
-
-    }
-
-    public String getCatalog() throws SQLException {
-        return null;
-    }
-
-    public void setTransactionIsolation(int level) throws SQLException {
-
-    }
-
-    public int getTransactionIsolation() throws SQLException {
-        return 0;
-    }
-
-    public SQLWarning getWarnings() throws SQLException {
-        return null;
-    }
-
-    public void clearWarnings() throws SQLException {
-
-    }
-
-    public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException {
-        return null;
-    }
-
-    public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
-        return null;
-    }
-
-    public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
-        return null;
-    }
-
-    public Map<String, Class<?>> getTypeMap() throws SQLException {
-        return null;
-    }
-
-    public void setTypeMap(Map<String, Class<?>> map) throws SQLException {
-
-    }
-
-    public void setHoldability(int holdability) throws SQLException {
-
-    }
-
-    public int getHoldability() throws SQLException {
-        return 0;
-    }
-
-    public Savepoint setSavepoint() throws SQLException {
-        return null;
-    }
-
-    public Savepoint setSavepoint(String name) throws SQLException {
-        return null;
-    }
-
-    public void rollback(Savepoint savepoint) throws SQLException {
-
-    }
-
-    public void releaseSavepoint(Savepoint savepoint) throws SQLException {
-
-    }
-
-    public Statement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
-        return null;
-    }
-
-    public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
-        return null;
-    }
-
-    public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
-        return null;
-    }
-
-    public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException {
-        return null;
-    }
-
-    public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException {
-        return null;
-    }
-
-    public PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException {
-        return null;
-    }
-
-    public Clob createClob() throws SQLException {
-        return null;
-    }
-
-    public Blob createBlob() throws SQLException {
-        return null;
-    }
-
-    public NClob createNClob() throws SQLException {
-        return null;
-    }
-
-    public SQLXML createSQLXML() throws SQLException {
-        return null;
-    }
-
-    public boolean isValid(int timeout) throws SQLException {
-        return false;
-    }
-
-    public void setClientInfo(String name, String value) throws SQLClientInfoException {
-
-    }
-
-    public void setClientInfo(Properties properties) throws SQLClientInfoException {
-
-    }
-
-    public String getClientInfo(String name) throws SQLException {
-        return null;
-    }
-
-    public Properties getClientInfo() throws SQLException {
-        return null;
-    }
-
-    public Array createArrayOf(String typeName, Object[] elements) throws SQLException {
-        return null;
-    }
-
-    public Struct createStruct(String typeName, Object[] attributes) throws SQLException {
-        return null;
-    }
-
-    public void setSchema(String s) throws SQLException {
-        schema = s;
-    }
-
-    public String getSchema() throws SQLException {
-        return schema;
-    }
-
-    public void abort(Executor executor) throws SQLException {
-
-    }
-
-    public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException {
-
-    }
-
-    public int getNetworkTimeout() throws SQLException {
-        return 0;
-    }
-
-    public CalciteConnectionConfig config() {
-        return null;
-    }
-
-    public <T> T unwrap(Class<T> iface) throws SQLException {
-        return null;
-    }
-
-    public boolean isWrapperFor(Class<?> iface) throws SQLException {
-        return false;
-    }
-
-    public <T> Queryable<T> createQuery(Expression expression, Class<T> aClass) {
-        return null;
-    }
-
-    public <T> Queryable<T> createQuery(Expression expression, Type type) {
-        return null;
-    }
-
-    public <T> T execute(Expression expression, Class<T> aClass) {
-        return null;
-    }
-
-    public <T> T execute(Expression expression, Type type) {
-        return null;
-    }
-
-    public <T> Enumerator<T> executeQuery(Queryable<T> queryable) {
-        return null;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/calcite/planner/LogicalPlan.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/calcite/planner/LogicalPlan.java b/experiments/sql/src/main/java/org/apache/calcite/planner/LogicalPlan.java
deleted file mode 100644
index 516ebe1..0000000
--- a/experiments/sql/src/main/java/org/apache/calcite/planner/LogicalPlan.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.calcite.planner;
-
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.sql.SqlNode;
-import org.apache.calcite.sql.parser.SqlParseException;
-import org.apache.calcite.tools.Planner;
-import org.apache.calcite.tools.RelConversionException;
-import org.apache.calcite.tools.ValidationException;
-
-public class LogicalPlan {
-
-    public static RelNode getLogicalPlan(String query, Planner planner) throws ValidationException,
-            RelConversionException {
-        SqlNode sqlNode;
-
-        try {
-            sqlNode = planner.parse(query);
-        } catch (SqlParseException e) {
-            throw new RuntimeException("SQL query parsing error", e);
-        }
-        SqlNode validatedSqlNode = planner.validate(sqlNode);
-
-        return planner.rel(validatedSqlNode).project();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/calcite/planner/Query.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/calcite/planner/Query.java b/experiments/sql/src/main/java/org/apache/calcite/planner/Query.java
deleted file mode 100644
index 9008a16..0000000
--- a/experiments/sql/src/main/java/org/apache/calcite/planner/Query.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.calcite.planner;
-
-import org.apache.calcite.config.Lex;
-import org.apache.calcite.plan.Contexts;
-import org.apache.calcite.plan.ConventionTraitDef;
-import org.apache.calcite.plan.RelTraitDef;
-import org.apache.calcite.rel.RelCollationTraitDef;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.type.RelDataTypeSystem;
-import org.apache.calcite.schema.SchemaPlus;
-import org.apache.calcite.sql.SqlNode;
-import org.apache.calcite.sql.parser.SqlParseException;
-import org.apache.calcite.sql.parser.SqlParser;
-import org.apache.calcite.tools.*;
-import org.apache.log4j.Logger;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * This Class is intended to test functions of Apache Calcite
- */
-public class Query {
-
-    private final static Logger logger = Logger.getLogger(Query.class);
-    private final Planner queryPlanner;
-
-    public Query(SchemaPlus schema) {
-
-        final List<RelTraitDef> traitDefs = new ArrayList<RelTraitDef>();
-
-        traitDefs.add(ConventionTraitDef.INSTANCE);
-        traitDefs.add(RelCollationTraitDef.INSTANCE);
-
-        FrameworkConfig calciteFrameworkConfig = Frameworks.newConfigBuilder()
-                .parserConfig(SqlParser.configBuilder()
-                        .setLex(Lex.MYSQL)
-                        .build())
-                .defaultSchema(schema)
-                .traitDefs(traitDefs)
-                .context(Contexts.EMPTY_CONTEXT)
-                .ruleSets(RuleSets.ofList())
-                .costFactory(null)
-                .typeSystem(RelDataTypeSystem.DEFAULT)
-                .build();
-        this.queryPlanner = Frameworks.getPlanner(calciteFrameworkConfig);
-    }
-
-    public RelNode getLogicalPlan(String query) throws ValidationException, RelConversionException {
-        SqlNode sqlNode = null;
-        try {
-            sqlNode = queryPlanner.parse(query);
-        } catch (SqlParseException e) {
-            logger.error("SQL Parse Exception", e);
-        }
-
-        SqlNode validatedSqlNode = queryPlanner.validate(sqlNode);
-        return queryPlanner.rel(validatedSqlNode).project();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/calcite/planner/StreamQueryPlanner.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/calcite/planner/StreamQueryPlanner.java b/experiments/sql/src/main/java/org/apache/calcite/planner/StreamQueryPlanner.java
deleted file mode 100644
index 0603827..0000000
--- a/experiments/sql/src/main/java/org/apache/calcite/planner/StreamQueryPlanner.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.calcite.planner;
-
-import org.apache.calcite.adapter.java.ReflectiveSchema;
-import org.apache.calcite.jdbc.CalciteConnection;
-import org.apache.calcite.plan.RelOptUtil;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.schema.SchemaPlus;
-import org.apache.calcite.tools.*;
-import org.apache.calcite.utils.CalciteFrameworkConfiguration;
-
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.SQLException;
-
-public class StreamQueryPlanner {
-
-    public static void main(String[] args) throws ClassNotFoundException, SQLException, ValidationException, RelConversionException {
-
-        Class.forName("org.apache.calcite.jdbc.Driver");
-        Connection connection = DriverManager.getConnection("jdbc:calcite:");
-        CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
-        SchemaPlus rootSchema = calciteConnection.getRootSchema();
-        rootSchema.add("t", new ReflectiveSchema(new Transactions()));
-
-        FrameworkConfig frameworkConfig = CalciteFrameworkConfiguration.getDefaultconfig(rootSchema);
-        Planner planner = Frameworks.getPlanner(frameworkConfig);
-
-        String query = "select t.orders.id, name, max(quantity)*0.5 from t.orders, t.products "
-                + "where t.orders.id = t.products.id group by t.orders.id, name "
-                + "having sum(quantity) > 5 order by sum(quantity) ";
-
-        RelNode logicalPlan = LogicalPlan.getLogicalPlan(query, planner);
-        System.out.println(RelOptUtil.toString(logicalPlan));
-    }
-
-    public static class Transactions {
-
-        public final Order[] orders = {
-                new Order("001", 3),
-                new Order("002", 5),
-                new Order("003", 8),
-                new Order("004", 15),
-        };
-
-        public final Product[] products = {
-                new Product("001", "Book"),
-                new Product("002", "Pen"),
-                new Product("003", "Pencil"),
-                new Product("004", "Ruler"),
-        };
-    }
-
-    public static class Order {
-        public final String id;
-        public final int quantity;
-
-        public Order(String id, int quantity) {
-            this.id = id;
-            this.quantity = quantity;
-        }
-    }
-
-    public static class Product {
-        public final String id;
-        public final String name;
-
-        public Product(String id, String name) {
-            this.id = id;
-            this.name = name;
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/calcite/table/TransactionsTableFactory.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/calcite/table/TransactionsTableFactory.java b/experiments/sql/src/main/java/org/apache/calcite/table/TransactionsTableFactory.java
deleted file mode 100644
index 42c2f92..0000000
--- a/experiments/sql/src/main/java/org/apache/calcite/table/TransactionsTableFactory.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.calcite.table;
-
-import com.google.common.collect.ImmutableList;
-import org.apache.calcite.DataContext;
-import org.apache.calcite.linq4j.Enumerable;
-import org.apache.calcite.linq4j.Linq4j;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeFactory;
-import org.apache.calcite.rel.type.RelProtoDataType;
-import org.apache.calcite.schema.*;
-import org.apache.calcite.sql.type.SqlTypeName;
-
-import java.util.Map;
-
-public class TransactionsTableFactory implements TableFactory<Table> {
-
-    @Override
-    public Table create(SchemaPlus schema, String name, Map<String, Object> operand, RelDataType rowType) {
-        final Object[][] rows = {
-                {100, "I001", "item1", 3},
-                {101, "I002", "item2", 5},
-                {102, "I003", "item3", 8},
-                {103, "I004", "item4", 33},
-                {104, "I005", "item5", 23}
-        };
-
-        return new TransactionsTable(ImmutableList.copyOf(rows));
-    }
-
-    public static class TransactionsTable implements ScannableTable {
-
-        protected final RelProtoDataType protoRowType = new RelProtoDataType() {
-            public RelDataType apply(RelDataTypeFactory a0) {
-                return a0.builder()
-                        .add("timeStamp", SqlTypeName.TIMESTAMP)
-                        .add("id", SqlTypeName.VARCHAR, 10)
-                        .add("item", SqlTypeName.VARCHAR, 50)
-                        .add("quantity", SqlTypeName.INTEGER)
-                        .build();
-            }
-        };
-
-        private final ImmutableList<Object[]> rows;
-
-        public TransactionsTable(ImmutableList<Object[]> rows) {
-            this.rows = rows;
-        }
-
-        public Enumerable<Object[]> scan(DataContext root) {
-            return Linq4j.asEnumerable(rows);
-        }
-
-        @Override
-        public RelDataType getRowType(RelDataTypeFactory typeFactory) {
-            return protoRowType.apply(typeFactory);
-        }
-
-        @Override
-        public Statistic getStatistic() {
-            return Statistics.UNKNOWN;
-        }
-
-        @Override
-        public Schema.TableType getJdbcTableType() {
-            return Schema.TableType.TABLE;
-        }
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/calcite/utils/CalciteFrameworkConfiguration.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/calcite/utils/CalciteFrameworkConfiguration.java b/experiments/sql/src/main/java/org/apache/calcite/utils/CalciteFrameworkConfiguration.java
deleted file mode 100644
index e6be58a..0000000
--- a/experiments/sql/src/main/java/org/apache/calcite/utils/CalciteFrameworkConfiguration.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.calcite.utils;
-
-import org.apache.calcite.config.Lex;
-import org.apache.calcite.plan.Contexts;
-import org.apache.calcite.plan.ConventionTraitDef;
-import org.apache.calcite.plan.RelTraitDef;
-import org.apache.calcite.rel.RelCollationTraitDef;
-import org.apache.calcite.rel.type.RelDataTypeSystem;
-import org.apache.calcite.schema.SchemaPlus;
-import org.apache.calcite.sql.parser.SqlParser;
-import org.apache.calcite.tools.FrameworkConfig;
-import org.apache.calcite.tools.Frameworks;
-import org.apache.calcite.tools.RuleSets;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public class CalciteFrameworkConfiguration {
-
-    public static FrameworkConfig getDefaultconfig(SchemaPlus schema) {
-        final List<RelTraitDef> traitDefs = new ArrayList<RelTraitDef>();
-
-        traitDefs.add(ConventionTraitDef.INSTANCE);
-        traitDefs.add(RelCollationTraitDef.INSTANCE);
-
-        FrameworkConfig frameworkConfiguration = Frameworks.newConfigBuilder()
-                .parserConfig(SqlParser.configBuilder()
-                        .setLex(Lex.JAVA)
-                        .build())
-                .defaultSchema(schema)
-                .traitDefs(traitDefs)
-                .context(Contexts.EMPTY_CONTEXT)
-                .ruleSets(RuleSets.ofList())
-                .costFactory(null)
-                .typeSystem(RelDataTypeSystem.DEFAULT)
-                .build();
-
-        return frameworkConfiguration;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/calcite/validator/CalciteSqlValidator.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/calcite/validator/CalciteSqlValidator.java b/experiments/sql/src/main/java/org/apache/calcite/validator/CalciteSqlValidator.java
deleted file mode 100644
index fb132e2..0000000
--- a/experiments/sql/src/main/java/org/apache/calcite/validator/CalciteSqlValidator.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.calcite.validator;
-
-import org.apache.calcite.adapter.java.JavaTypeFactory;
-import org.apache.calcite.prepare.CalciteCatalogReader;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.sql.SqlInsert;
-import org.apache.calcite.sql.SqlOperatorTable;
-import org.apache.calcite.sql.validate.SqlConformance;
-import org.apache.calcite.sql.validate.SqlValidatorImpl;
-
-public class CalciteSqlValidator extends SqlValidatorImpl {
-    public CalciteSqlValidator(SqlOperatorTable opTab,
-                               CalciteCatalogReader catalogReader, JavaTypeFactory typeFactory,
-                               SqlConformance conformance) {
-        super(opTab, catalogReader, typeFactory, conformance);
-    }
-
-    @Override
-    protected RelDataType getLogicalSourceRowType(
-            RelDataType sourceRowType, SqlInsert insert) {
-        final RelDataType superType =
-                super.getLogicalSourceRowType(sourceRowType, insert);
-        return ((JavaTypeFactory) typeFactory).toSql(superType);
-    }
-
-    @Override
-    protected RelDataType getLogicalTargetRowType(
-            RelDataType targetRowType, SqlInsert insert) {
-        final RelDataType superType =
-                super.getLogicalTargetRowType(targetRowType, insert);
-        return ((JavaTypeFactory) typeFactory).toSql(superType);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/SQLNode.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/SQLNode.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/SQLNode.java
new file mode 100644
index 0000000..7fcc3a1
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/SQLNode.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql;
+
+import org.apache.calcite.avatica.util.Casing;
+import org.apache.calcite.avatica.util.Quoting;
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.fun.SqlCase;
+import org.apache.calcite.sql.parser.SqlParseException;
+import org.apache.calcite.sql.parser.SqlParser;
+import org.apache.calcite.sql.parser.impl.SqlParserImpl;
+import org.apache.calcite.sql.validate.SqlConformanceEnum;
+import org.apache.calcite.sql.validate.SqlValidator;
+import org.apache.calcite.sql.validate.SqlValidatorScope;
+import org.apache.calcite.util.Util;
+import org.junit.ComparisonFailure;
+
+import java.util.regex.Pattern;
+
+public class SQLNode {
+
+  private static final Pattern LINE_BREAK_PATTERN = Pattern.compile("\r\n|\r|\n");
+
+  private static final Pattern TAB_PATTERN = Pattern.compile("\t");
+
+  private static final String LINE_BREAK = "\\\\n\"" + Util.LINE_SEPARATOR + " + \"";
+
+  private static final ThreadLocal<boolean[]> LINUXIFY = new ThreadLocal<boolean[]>() {
+    @Override
+    protected boolean[] initialValue() {
+      return new boolean[]{true};
+    }
+  };
+
+
+  protected SqlParser getSqlParser(String sql) {
+    return SqlParser.create(sql,
+      SqlParser.configBuilder()
+        .setParserFactory(SqlParserImpl.FACTORY)
+        .setQuoting(Quoting.DOUBLE_QUOTE)
+        .setUnquotedCasing(Casing.TO_UPPER)
+        .setQuotedCasing(Casing.UNCHANGED)
+        .setConformance(SqlConformanceEnum.DEFAULT)
+        .build());
+  }
+
+  public static String toJavaString(String s) {
+    s = Util.replace(s, "\"", "\\\"");
+    s = LINE_BREAK_PATTERN.matcher(s).replaceAll(LINE_BREAK);
+    s = TAB_PATTERN.matcher(s).replaceAll("\\\\t");
+    s = "\"" + s + "\"";
+    String spurious = "\n \\+ \"\"";
+    if (s.endsWith(spurious)) {
+      s = s.substring(0, s.length() - spurious.length());
+    }
+    return s;
+  }
+
+  public static void assertEqualsVerbose(String expected, String actual) {
+    if (actual == null) {
+      if (expected == null) {
+        return;
+      } else {
+        String message = "Expected:\n" + expected + "\nActual: null";
+        throw new ComparisonFailure(message, expected, null);
+      }
+    }
+    if ((expected != null) && expected.equals(actual)) {
+      return;
+    }
+    String s = toJavaString(actual);
+    String message = "Expected:\n" + expected + "\nActual:\n" +
+      actual + "\nActual java:\n" + s + '\n';
+
+    throw new ComparisonFailure(message, expected, actual);
+  }
+
+  public void check(String sql, String expected) {
+    final SqlNode sqlNode;
+    try {
+      sqlNode = getSqlParser(sql).parseStmt();
+    } catch (SqlParseException e) {
+      throw new RuntimeException("Error while parsing SQL: " + sql, e);
+    }
+
+    String actual = sqlNode.toSqlString(null, true).getSql();
+    if (LINUXIFY.get()[0]) {
+      actual = Util.toLinux(actual);
+    }
+    assertEqualsVerbose(expected, actual);
+  }
+
+  public void validateCall(SqlCall call, SqlValidator validator, SqlValidatorScope operandScope) {
+    SqlCase sqlCase = (SqlCase) call;
+    SqlNodeList whenOperands = sqlCase.getWhenOperands();
+    SqlNodeList thenOperands = sqlCase.getThenOperands();
+    SqlNode elseOperand = sqlCase.getElseOperand();
+    for (SqlNode operand : whenOperands) {
+      operand.validateExpr(validator, operandScope);
+    }
+    for (SqlNode operand : thenOperands) {
+      operand.validateExpr(validator, operandScope);
+    }
+    if (elseOperand != null) {
+      elseOperand.validateExpr(validator, operandScope);
+    }
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/Connection.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/Connection.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/Connection.java
new file mode 100644
index 0000000..e5954f0
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/Connection.java
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.planner;
+
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.config.CalciteConnectionConfig;
+import org.apache.calcite.jdbc.CalciteConnection;
+import org.apache.calcite.linq4j.Enumerator;
+import org.apache.calcite.linq4j.Queryable;
+import org.apache.calcite.linq4j.tree.Expression;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.tools.Frameworks;
+
+import java.lang.reflect.Type;
+import java.sql.*;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Executor;
+
+public class Connection implements CalciteConnection {
+
+  private final SchemaPlus rootSchema = Frameworks.createRootSchema(true);
+  private String schema = null;
+
+  public SchemaPlus getRootSchema() {
+    return rootSchema;
+  }
+
+  public JavaTypeFactory getTypeFactory() {
+    return null;
+  }
+
+  public Properties getProperties() {
+    return null;
+  }
+
+  public Statement createStatement() throws SQLException {
+    return null;
+  }
+
+  public PreparedStatement prepareStatement(String sql) throws SQLException {
+    return null;
+  }
+
+  public CallableStatement prepareCall(String sql) throws SQLException {
+    return null;
+  }
+
+  public String nativeSQL(String sql) throws SQLException {
+    return null;
+  }
+
+  public void setAutoCommit(boolean autoCommit) throws SQLException {
+
+  }
+
+  public boolean getAutoCommit() throws SQLException {
+    return false;
+  }
+
+  public void commit() throws SQLException {
+
+  }
+
+  public void rollback() throws SQLException {
+
+  }
+
+  public void close() throws SQLException {
+
+  }
+
+  public boolean isClosed() throws SQLException {
+    return false;
+  }
+
+  public DatabaseMetaData getMetaData() throws SQLException {
+    return null;
+  }
+
+  public void setReadOnly(boolean readOnly) throws SQLException {
+
+  }
+
+  public boolean isReadOnly() throws SQLException {
+    return false;
+  }
+
+  public void setCatalog(String catalog) throws SQLException {
+
+  }
+
+  public String getCatalog() throws SQLException {
+    return null;
+  }
+
+  public void setTransactionIsolation(int level) throws SQLException {
+
+  }
+
+  public int getTransactionIsolation() throws SQLException {
+    return 0;
+  }
+
+  public SQLWarning getWarnings() throws SQLException {
+    return null;
+  }
+
+  public void clearWarnings() throws SQLException {
+
+  }
+
+  public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException {
+    return null;
+  }
+
+  public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
+    return null;
+  }
+
+  public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
+    return null;
+  }
+
+  public Map<String, Class<?>> getTypeMap() throws SQLException {
+    return null;
+  }
+
+  public void setTypeMap(Map<String, Class<?>> map) throws SQLException {
+
+  }
+
+  public void setHoldability(int holdability) throws SQLException {
+
+  }
+
+  public int getHoldability() throws SQLException {
+    return 0;
+  }
+
+  public Savepoint setSavepoint() throws SQLException {
+    return null;
+  }
+
+  public Savepoint setSavepoint(String name) throws SQLException {
+    return null;
+  }
+
+  public void rollback(Savepoint savepoint) throws SQLException {
+
+  }
+
+  public void releaseSavepoint(Savepoint savepoint) throws SQLException {
+
+  }
+
+  public Statement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
+    return null;
+  }
+
+  public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
+    return null;
+  }
+
+  public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
+    return null;
+  }
+
+  public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException {
+    return null;
+  }
+
+  public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException {
+    return null;
+  }
+
+  public PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException {
+    return null;
+  }
+
+  public Clob createClob() throws SQLException {
+    return null;
+  }
+
+  public Blob createBlob() throws SQLException {
+    return null;
+  }
+
+  public NClob createNClob() throws SQLException {
+    return null;
+  }
+
+  public SQLXML createSQLXML() throws SQLException {
+    return null;
+  }
+
+  public boolean isValid(int timeout) throws SQLException {
+    return false;
+  }
+
+  public void setClientInfo(String name, String value) throws SQLClientInfoException {
+
+  }
+
+  public void setClientInfo(Properties properties) throws SQLClientInfoException {
+
+  }
+
+  public String getClientInfo(String name) throws SQLException {
+    return null;
+  }
+
+  public Properties getClientInfo() throws SQLException {
+    return null;
+  }
+
+  public Array createArrayOf(String typeName, Object[] elements) throws SQLException {
+    return null;
+  }
+
+  public Struct createStruct(String typeName, Object[] attributes) throws SQLException {
+    return null;
+  }
+
+  public void setSchema(String s) throws SQLException {
+    schema = s;
+  }
+
+  public String getSchema() throws SQLException {
+    return schema;
+  }
+
+  public void abort(Executor executor) throws SQLException {
+
+  }
+
+  public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException {
+
+  }
+
+  public int getNetworkTimeout() throws SQLException {
+    return 0;
+  }
+
+  public CalciteConnectionConfig config() {
+    return null;
+  }
+
+  public <T> T unwrap(Class<T> iface) throws SQLException {
+    return null;
+  }
+
+  public boolean isWrapperFor(Class<?> iface) throws SQLException {
+    return false;
+  }
+
+  public <T> Queryable<T> createQuery(Expression expression, Class<T> aClass) {
+    return null;
+  }
+
+  public <T> Queryable<T> createQuery(Expression expression, Type type) {
+    return null;
+  }
+
+  public <T> T execute(Expression expression, Class<T> aClass) {
+    return null;
+  }
+
+  public <T> T execute(Expression expression, Type type) {
+    return null;
+  }
+
+  public <T> Enumerator<T> executeQuery(Queryable<T> queryable) {
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/GearRelDataTypeSystem.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/GearRelDataTypeSystem.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/GearRelDataTypeSystem.java
new file mode 100644
index 0000000..a640e12
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/GearRelDataTypeSystem.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.planner;
+
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.calcite.rel.type.RelDataTypeSystemImpl;
+
+/**
+ * customized data types.
+ */
+public class GearRelDataTypeSystem extends RelDataTypeSystemImpl {
+
+  public static final RelDataTypeSystem GEAR_REL_DATATYPE_SYSTEM = new GearRelDataTypeSystem();
+
+  @Override
+  public int getMaxNumericScale() {
+    return 38;
+  }
+
+  @Override
+  public int getMaxNumericPrecision() {
+    return 38;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/GearRuleSets.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/GearRuleSets.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/GearRuleSets.java
new file mode 100644
index 0000000..a962ff1
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/GearRuleSets.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.planner;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.tools.RuleSet;
+import org.apache.gearpump.sql.rule.*;
+
+import java.util.Iterator;
+
+public class GearRuleSets {
+  private static final ImmutableSet<RelOptRule> calciteToGearConversionRules = ImmutableSet
+    .<RelOptRule>builder().add(GearIOSourceRule.INSTANCE, GearProjectRule.INSTANCE,
+      GearFilterRule.INSTANCE, GearIOSinkRule.INSTANCE,
+      GearAggregationRule.INSTANCE, GearSortRule.INSTANCE, GearValuesRule.INSTANCE,
+      GearIntersectRule.INSTANCE, GearMinusRule.INSTANCE, GearUnionRule.INSTANCE,
+      GearJoinRule.INSTANCE)
+    .build();
+
+  public static RuleSet[] getRuleSets() {
+    return new RuleSet[]{new GearRuleSet(
+      ImmutableSet.<RelOptRule>builder().addAll(calciteToGearConversionRules).build())};
+  }
+
+  private static class GearRuleSet implements RuleSet {
+    final ImmutableSet<RelOptRule> rules;
+
+    public GearRuleSet(ImmutableSet<RelOptRule> rules) {
+      this.rules = rules;
+    }
+
+    public GearRuleSet(ImmutableList<RelOptRule> rules) {
+      this.rules = ImmutableSet.<RelOptRule>builder().addAll(rules).build();
+    }
+
+    @Override
+    public Iterator<RelOptRule> iterator() {
+      return rules.iterator();
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/LogicalPlan.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/LogicalPlan.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/LogicalPlan.java
new file mode 100644
index 0000000..1448a71
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/LogicalPlan.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.planner;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.parser.SqlParseException;
+import org.apache.calcite.tools.Planner;
+import org.apache.calcite.tools.RelConversionException;
+import org.apache.calcite.tools.ValidationException;
+
+public class LogicalPlan {
+
+  public static RelNode getLogicalPlan(String query, Planner planner) throws ValidationException,
+    RelConversionException {
+    SqlNode sqlNode;
+
+    try {
+      sqlNode = planner.parse(query);
+    } catch (SqlParseException e) {
+      throw new RuntimeException("SQL query parsing error", e);
+    }
+    SqlNode validatedSqlNode = planner.validate(sqlNode);
+
+    return planner.rel(validatedSqlNode).project();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/Query.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/Query.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/Query.java
new file mode 100644
index 0000000..c18b8b5
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/Query.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.planner;
+
+import org.apache.calcite.config.Lex;
+import org.apache.calcite.plan.Contexts;
+import org.apache.calcite.plan.ConventionTraitDef;
+import org.apache.calcite.plan.RelTraitDef;
+import org.apache.calcite.rel.RelCollationTraitDef;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.parser.SqlParseException;
+import org.apache.calcite.sql.parser.SqlParser;
+import org.apache.calcite.tools.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * This Class is intended to test functions of Apache Calcite
+ */
+public class Query {
+
+  private static final Logger LOG = LoggerFactory.getLogger(Query.class);
+  private final Planner queryPlanner;
+
+  public Query(SchemaPlus schema) {
+
+    final List<RelTraitDef> traitDefs = new ArrayList<RelTraitDef>();
+
+    traitDefs.add(ConventionTraitDef.INSTANCE);
+    traitDefs.add(RelCollationTraitDef.INSTANCE);
+
+    FrameworkConfig calciteFrameworkConfig = Frameworks.newConfigBuilder()
+      .parserConfig(SqlParser.configBuilder()
+        .setLex(Lex.MYSQL)
+        .build())
+      .defaultSchema(schema)
+      .traitDefs(traitDefs)
+      .context(Contexts.EMPTY_CONTEXT)
+      .ruleSets(RuleSets.ofList())
+      .costFactory(null)
+      .typeSystem(RelDataTypeSystem.DEFAULT)
+      .build();
+    this.queryPlanner = Frameworks.getPlanner(calciteFrameworkConfig);
+  }
+
+  public RelNode getLogicalPlan(String query) throws ValidationException, RelConversionException {
+    SqlNode sqlNode = null;
+    try {
+      sqlNode = queryPlanner.parse(query);
+    } catch (SqlParseException e) {
+      LOG.error(e.getMessage());
+    }
+
+    SqlNode validatedSqlNode = queryPlanner.validate(sqlNode);
+    return queryPlanner.rel(validatedSqlNode).project();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/StreamQueryPlanner.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/StreamQueryPlanner.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/StreamQueryPlanner.java
new file mode 100644
index 0000000..10f9cbd
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/StreamQueryPlanner.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.planner;
+
+import org.apache.calcite.adapter.java.ReflectiveSchema;
+import org.apache.calcite.jdbc.CalciteConnection;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.tools.*;
+import org.apache.gearpump.sql.utils.CalciteFrameworkConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+
+public class StreamQueryPlanner {
+  private static final Logger LOG = LoggerFactory.getLogger(StreamQueryPlanner.class);
+
+  public static void main(String[] args) throws ClassNotFoundException,
+    SQLException, ValidationException, RelConversionException {
+
+    Class.forName("org.apache.calcite.jdbc.Driver");
+    Connection connection = DriverManager.getConnection("jdbc:calcite:");
+    CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
+    SchemaPlus rootSchema = calciteConnection.getRootSchema();
+    rootSchema.add("t", new ReflectiveSchema(new Transactions()));
+
+    FrameworkConfig frameworkConfig = CalciteFrameworkConfiguration.getDefaultconfig(rootSchema);
+    Planner planner = Frameworks.getPlanner(frameworkConfig);
+
+    String query = "select t.orders.id, name, max(quantity)*0.5 from t.orders, t.products "
+      + "where t.orders.id = t.products.id group by t.orders.id, name "
+      + "having sum(quantity) > 5 order by sum(quantity) ";
+
+    RelNode logicalPlan = LogicalPlan.getLogicalPlan(query, planner);
+    LOG.info("Relational Expression:- \n\n" + RelOptUtil.toString(logicalPlan));
+
+  }
+
+  public static class Transactions {
+
+    public final Order[] orders = {
+      new Order("001", 3),
+      new Order("002", 5),
+      new Order("003", 8),
+      new Order("004", 15),
+    };
+
+    public final Product[] products = {
+      new Product("001", "Book"),
+      new Product("002", "Pen"),
+      new Product("003", "Pencil"),
+      new Product("004", "Ruler"),
+    };
+  }
+
+  public static class Order {
+    public final String id;
+    public final int quantity;
+
+    public Order(String id, int quantity) {
+      this.id = id;
+      this.quantity = quantity;
+    }
+  }
+
+  public static class Product {
+    public final String id;
+    public final String name;
+
+    public Product(String id, String name) {
+      this.id = id;
+      this.name = name;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearAggregationRel.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearAggregationRel.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearAggregationRel.java
new file mode 100644
index 0000000..46dc15b
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearAggregationRel.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.rel;
+
+import org.apache.calcite.linq4j.Ord;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelWriter;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.Util;
+import org.apache.gearpump.sql.table.SampleString;
+import org.apache.gearpump.streaming.dsl.api.functions.MapFunction;
+import org.apache.gearpump.streaming.dsl.api.functions.ReduceFunction;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp;
+import org.apache.gearpump.streaming.dsl.javaapi.functions.GroupByFunction;
+import org.apache.gearpump.streaming.dsl.window.api.Trigger;
+import org.apache.gearpump.streaming.dsl.window.api.WindowFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+
+import java.time.Duration;
+import java.util.List;
+
+public class GearAggregationRel extends Aggregate implements GearRelNode {
+
+  private static final Logger LOG = LoggerFactory.getLogger(GearAggregationRel.class);
+  private int windowFieldIdx = -1;
+  private WindowFunction windowFn;
+  private Trigger trigger;
+  private Duration allowedLatence = Duration.ZERO;
+
+  public GearAggregationRel(RelOptCluster cluster, RelTraitSet traits, RelNode child,
+                            boolean indicator, ImmutableBitSet groupSet,
+                            List<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls) {
+    super(cluster, traits, child, indicator, groupSet, groupSets, aggCalls);
+  }
+
+  @Override
+  public Aggregate copy(RelTraitSet traitSet, RelNode input, boolean indicator,
+                        ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets,
+                        List<AggregateCall> aggCalls) {
+    return null;
+  }
+
+  public RelWriter explainTerms(RelWriter pw) {
+    pw.item("group", groupSet)
+      .itemIf("window", windowFn, windowFn != null)
+      .itemIf("trigger", trigger, trigger != null)
+      .itemIf("event_time", windowFieldIdx, windowFieldIdx != -1)
+      .itemIf("groups", groupSets, getGroupType() != Group.SIMPLE)
+      .itemIf("indicator", indicator, indicator)
+      .itemIf("aggs", aggCalls, pw.nest());
+    if (!pw.nest()) {
+      for (Ord<AggregateCall> ord : Ord.zip(aggCalls)) {
+        pw.item(Util.first(ord.e.name, "agg#" + ord.i), ord.e);
+      }
+    }
+    return pw;
+  }
+
+  @Override
+  public JavaStream<Tuple2<String, Integer>> buildGearPipeline(JavaStreamApp app,
+                                                               JavaStream<Tuple2<String, Integer>> javaStream) throws Exception {
+    LOG.debug("Adding Map");
+    JavaStream<Tuple2<String, Integer>> ones = SampleString.WORDS.map(new Ones(), "map");
+
+    LOG.debug("Adding GroupBy");
+    JavaStream<Tuple2<String, Integer>> groupedOnes = ones.groupBy(new TupleKey(),
+      1, "groupBy");
+//        groupedOnes.log();
+    LOG.debug("Adding Reduce");
+    JavaStream<Tuple2<String, Integer>> wordCount = groupedOnes.reduce(new Count(), "reduce");
+    wordCount.log();
+
+    return wordCount;
+  }
+
+  private static class Ones extends MapFunction<String, Tuple2<String, Integer>> {
+    @Override
+    public Tuple2<String, Integer> map(String s) {
+      return new Tuple2<>(s, 1);
+    }
+  }
+
+  private static class TupleKey extends GroupByFunction<Tuple2<String, Integer>, String> {
+    @Override
+    public String groupBy(Tuple2<String, Integer> tuple) {
+      return tuple._1();
+    }
+  }
+
+  private static class Count extends ReduceFunction<Tuple2<String, Integer>> {
+    @Override
+    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) {
+      return new Tuple2<>(t1._1(), t1._2() + t2._2());
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearFilterRel.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearFilterRel.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearFilterRel.java
new file mode 100644
index 0000000..53a07d9
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearFilterRel.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.rel;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rex.RexNode;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp;
+import scala.Tuple2;
+
+public class GearFilterRel extends Filter implements GearRelNode {
+
+  public GearFilterRel(RelOptCluster cluster, RelTraitSet traits, RelNode child,
+                       RexNode condition) {
+    super(cluster, traits, child, condition);
+  }
+
+  @Override
+  public Filter copy(RelTraitSet traitSet, RelNode input, RexNode condition) {
+    return new GearFilterRel(getCluster(), traitSet, input, condition);
+  }
+
+
+  @Override
+  public JavaStream<Tuple2<String, Integer>> buildGearPipeline(JavaStreamApp app, JavaStream<Tuple2<String, Integer>> javaStream) throws Exception {
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearFlatMapRel.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearFlatMapRel.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearFlatMapRel.java
new file mode 100644
index 0000000..d4c55fb
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearFlatMapRel.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.rel;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rex.RexNode;
+import org.apache.gearpump.DefaultMessage;
+import org.apache.gearpump.Message;
+import org.apache.gearpump.cluster.UserConfig;
+import org.apache.gearpump.sql.table.SampleString;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp;
+import org.apache.gearpump.streaming.dsl.javaapi.functions.FlatMapFunction;
+import org.apache.gearpump.streaming.source.DataSource;
+import org.apache.gearpump.streaming.source.Watermark;
+import org.apache.gearpump.streaming.task.TaskContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.Iterator;
+
+public class GearFlatMapRel extends Filter implements GearRelNode {
+
+  private static final Logger LOG = LoggerFactory.getLogger(GearFlatMapRel.class);
+
+  public GearFlatMapRel(RelOptCluster cluster, RelTraitSet traits, RelNode child, RexNode condition) {
+    super(cluster, traits, child, condition);
+  }
+
+  public GearFlatMapRel() {
+    super(null, null, null, null);
+  }
+
+  @Override
+  public Filter copy(RelTraitSet traitSet, RelNode input, RexNode condition) {
+    return new GearFlatMapRel(getCluster(), traitSet, input, condition);
+  }
+
+  @Override
+  public JavaStream<Tuple2<String, Integer>> buildGearPipeline(JavaStreamApp app,
+                                                               JavaStream<Tuple2<String, Integer>> javaStream) throws Exception {
+    LOG.debug("Adding Source");
+    JavaStream<String> sentence = app.source(new StringSource(SampleString.Stream.getKV()),
+      1, UserConfig.empty(), "source");
+    LOG.debug("Adding flatMap");
+    SampleString.WORDS = sentence.flatMap(new Split(), "flatMap");
+    return null;
+  }
+
+  private static class StringSource implements DataSource {
+    private final String str;
+    private boolean hasNext = true;
+
+    StringSource(String str) {
+      this.str = str;
+    }
+
+    @Override
+    public void open(TaskContext context, Instant startTime) {
+    }
+
+    @Override
+    public Message read() {
+      Message msg = new DefaultMessage(str, Instant.now());
+      hasNext = false;
+      return msg;
+    }
+
+    @Override
+    public void close() {
+    }
+
+    @Override
+    public Instant getWatermark() {
+      if (hasNext) {
+        return Instant.now();
+      } else {
+        return Watermark.MAX();
+      }
+    }
+  }
+
+  private static class Split extends FlatMapFunction<String, String> {
+    @Override
+    public Iterator<String> flatMap(String s) {
+      return Arrays.asList(s.split("\\s+")).iterator();
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearIOSinkRel.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearIOSinkRel.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearIOSinkRel.java
new file mode 100644
index 0000000..1d61baf
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearIOSinkRel.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.rel;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.prepare.Prepare;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.TableModify;
+import org.apache.calcite.rex.RexNode;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp;
+import scala.Tuple2;
+
+import java.util.List;
+
+public class GearIOSinkRel extends TableModify implements GearRelNode {
+  public GearIOSinkRel(RelOptCluster cluster, RelTraitSet traits, RelOptTable table,
+                       Prepare.CatalogReader catalogReader, RelNode child, Operation operation,
+                       List<String> updateColumnList, List<RexNode> sourceExpressionList, boolean flattened) {
+    super(cluster, traits, table, catalogReader, child, operation, updateColumnList,
+      sourceExpressionList, flattened);
+  }
+
+  @Override
+  public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+    return new GearIOSinkRel(getCluster(), traitSet, getTable(), getCatalogReader(), sole(inputs),
+      getOperation(), getUpdateColumnList(), getSourceExpressionList(), isFlattened());
+  }
+
+  @Override
+  public JavaStream<Tuple2<String, Integer>> buildGearPipeline(JavaStreamApp app, JavaStream<Tuple2<String, Integer>> javaStream) throws Exception {
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearIOSourceRel.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearIOSourceRel.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearIOSourceRel.java
new file mode 100644
index 0000000..6641c35
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearIOSourceRel.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.rel;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp;
+import scala.Tuple2;
+
+public class GearIOSourceRel extends TableScan implements GearRelNode {
+
+  public GearIOSourceRel(RelOptCluster cluster, RelTraitSet traitSet, RelOptTable table) {
+    super(cluster, traitSet, table);
+  }
+
+  @Override
+  public JavaStream<Tuple2<String, Integer>> buildGearPipeline(JavaStreamApp app, JavaStream<Tuple2<String, Integer>> javaStream) throws Exception {
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearIntersectRel.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearIntersectRel.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearIntersectRel.java
new file mode 100644
index 0000000..6888a26
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearIntersectRel.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.rel;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Intersect;
+import org.apache.calcite.rel.core.SetOp;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp;
+import scala.Tuple2;
+
+import java.util.List;
+
+public class GearIntersectRel extends Intersect implements GearRelNode {
+  private GearSetOperatorRelBase delegate;
+
+  public GearIntersectRel(
+    RelOptCluster cluster,
+    RelTraitSet traits,
+    List<RelNode> inputs,
+    boolean all) {
+    super(cluster, traits, inputs, all);
+    delegate = new GearSetOperatorRelBase(this,
+      GearSetOperatorRelBase.OpType.INTERSECT, inputs, all);
+  }
+
+  @Override
+  public SetOp copy(RelTraitSet traitSet, List<RelNode> inputs, boolean all) {
+    return new GearIntersectRel(getCluster(), traitSet, inputs, all);
+  }
+
+  @Override
+  public JavaStream<Tuple2<String, Integer>> buildGearPipeline(JavaStreamApp app, JavaStream<Tuple2<String, Integer>> javaStream) throws Exception {
+    return null;
+  }
+}