You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2017/08/19 00:30:57 UTC
[1/3] incubator-gearpump git commit: [GEARPUMP-217] Add Gearpump rel,
rule and examples.
Repository: incubator-gearpump
Updated Branches:
refs/heads/sql e04df0ddd -> 54686e0e2
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/test/java/org/apache/calcite/planner/CalciteTest.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/test/java/org/apache/calcite/planner/CalciteTest.java b/experiments/sql/src/test/java/org/apache/calcite/planner/CalciteTest.java
deleted file mode 100644
index 4f0247a..0000000
--- a/experiments/sql/src/test/java/org/apache/calcite/planner/CalciteTest.java
+++ /dev/null
@@ -1,323 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.calcite.planner;
-
-
-import com.google.common.collect.ImmutableList;
-import org.apache.calcite.adapter.enumerable.EnumerableConvention;
-import org.apache.calcite.adapter.java.JavaTypeFactory;
-import org.apache.calcite.adapter.java.ReflectiveSchema;
-import org.apache.calcite.config.CalciteConnectionConfig;
-import org.apache.calcite.config.Lex;
-import org.apache.calcite.jdbc.CalciteConnection;
-import org.apache.calcite.jdbc.CalciteSchema;
-import org.apache.calcite.plan.*;
-import org.apache.calcite.plan.RelOptTable.ViewExpander;
-import org.apache.calcite.plan.volcano.VolcanoPlanner;
-import org.apache.calcite.prepare.CalciteCatalogReader;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.RelRoot;
-import org.apache.calcite.rel.rules.*;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rex.RexBuilder;
-import org.apache.calcite.rex.RexExecutor;
-import org.apache.calcite.schema.SchemaPlus;
-import org.apache.calcite.sql.SqlNode;
-import org.apache.calcite.sql.SqlOperatorTable;
-import org.apache.calcite.sql.fun.SqlStdOperatorTable;
-import org.apache.calcite.sql.parser.SqlParseException;
-import org.apache.calcite.sql.parser.SqlParser;
-import org.apache.calcite.sql.validate.SqlConformance;
-import org.apache.calcite.sql.validate.SqlConformanceEnum;
-import org.apache.calcite.sql.validate.SqlValidator;
-import org.apache.calcite.sql.validate.SqlValidatorUtil;
-import org.apache.calcite.sql2rel.RelDecorrelator;
-import org.apache.calcite.sql2rel.SqlRexConvertletTable;
-import org.apache.calcite.sql2rel.SqlToRelConverter;
-import org.apache.calcite.tools.FrameworkConfig;
-import org.apache.calcite.tools.Frameworks;
-import org.apache.calcite.tools.Program;
-import org.apache.calcite.tools.Programs;
-import org.apache.calcite.util.Util;
-import org.apache.calcite.utils.CalciteFrameworkConfiguration;
-import org.apache.calcite.validator.CalciteSqlValidator;
-
-import java.sql.DriverManager;
-import java.sql.SQLException;
-import java.util.List;
-
-public class CalciteTest {
-
- private final SqlOperatorTable operatorTable;
- private final FrameworkConfig config;
- private final ImmutableList<RelTraitDef> traitDefs;
- private final SqlParser.Config parserConfig;
- private final SqlRexConvertletTable convertletTable;
- private State state;
- private SchemaPlus defaultSchema;
- private JavaTypeFactory typeFactory;
- private RelOptPlanner planner;
- private RexExecutor executor;
- private RelRoot root;
-
- public CalciteTest(FrameworkConfig config) {
- this.config = config;
- this.defaultSchema = config.getDefaultSchema();
- this.operatorTable = config.getOperatorTable();
- this.parserConfig = config.getParserConfig();
- this.state = State.STATE_0_CLOSED;
- this.traitDefs = config.getTraitDefs();
- this.convertletTable = config.getConvertletTable();
- this.executor = config.getExecutor();
- reset();
- }
-
- private void ensure(State state) {
- if (state == this.state) {
- return;
- }
- if (state.ordinal() < this.state.ordinal()) {
- throw new IllegalArgumentException("cannot move to " + state + " from "
- + this.state);
- }
- state.from(this);
- }
-
- public void close() {
- typeFactory = null;
- state = State.STATE_0_CLOSED;
- }
-
- public void reset() {
- ensure(State.STATE_0_CLOSED);
- state = State.STATE_1_RESET;
- }
-
- private void ready() {
- switch (state) {
- case STATE_0_CLOSED:
- reset();
- }
- ensure(State.STATE_1_RESET);
- Frameworks.withPlanner(
- new Frameworks.PlannerAction<Void>() {
- public Void apply(RelOptCluster cluster, RelOptSchema relOptSchema,
- SchemaPlus rootSchema) {
- Util.discard(rootSchema); // use our own defaultSchema
- typeFactory = (JavaTypeFactory) cluster.getTypeFactory();
- planner = cluster.getPlanner();
- planner.setExecutor(executor);
- return null;
- }
- },
- config);
-
- state = State.STATE_2_READY;
-
- // If user specify own traitDef, instead of default default trait,
- // first, clear the default trait def registered with planner
- // then, register the trait def specified in traitDefs.
- if (this.traitDefs != null) {
- planner.clearRelTraitDefs();
- for (RelTraitDef def : this.traitDefs) {
- planner.addRelTraitDef(def);
- }
- }
- }
-
- private static SchemaPlus rootSchema(SchemaPlus schema) {
- for (; ; ) {
- if (schema.getParentSchema() == null) {
- return schema;
- }
- schema = schema.getParentSchema();
- }
- }
-
- private CalciteCatalogReader createCatalogReader() {
- SchemaPlus rootSchema = rootSchema(defaultSchema);
- return new CalciteCatalogReader(
- CalciteSchema.from(rootSchema),
- parserConfig.caseSensitive(),
- CalciteSchema.from(defaultSchema).path(null),
- typeFactory);
- }
-
- private RexBuilder createRexBuilder() {
- return new RexBuilder(typeFactory);
- }
-
- private SqlConformance conformance() {
- final Context context = config.getContext();
- if (context != null) {
- final CalciteConnectionConfig connectionConfig =
- context.unwrap(CalciteConnectionConfig.class);
- if (connectionConfig != null) {
- return connectionConfig.conformance();
- }
- }
- return SqlConformanceEnum.DEFAULT;
- }
-
- /**
- * Implements {@link org.apache.calcite.plan.RelOptTable.ViewExpander}
- * interface for {@link org.apache.calcite.tools.Planner}.
- */
- public class ViewExpanderImpl implements ViewExpander {
- @Override
- public RelRoot expandView(RelDataType rowType, String queryString,
- List<String> schemaPath, List<String> viewPath) {
- SqlParser parser = SqlParser.create(queryString, parserConfig);
- SqlNode sqlNode;
- try {
- sqlNode = parser.parseQuery();
- } catch (SqlParseException e) {
- throw new RuntimeException("parse failed", e);
- }
-
- final SqlConformance conformance = conformance();
- final CalciteCatalogReader catalogReader =
- createCatalogReader().withSchemaPath(schemaPath);
- final SqlValidator validator =
- new CalciteSqlValidator(operatorTable, catalogReader, typeFactory,
- conformance);
- validator.setIdentifierExpansion(true);
- final SqlNode validatedSqlNode = validator.validate(sqlNode);
-
- final RexBuilder rexBuilder = createRexBuilder();
- final RelOptCluster cluster = RelOptCluster.create(planner, rexBuilder);
- final SqlToRelConverter.Config config = SqlToRelConverter.configBuilder()
- .withTrimUnusedFields(false).withConvertTableAccess(false).build();
- final SqlToRelConverter sqlToRelConverter =
- new SqlToRelConverter(new ViewExpanderImpl(), validator,
- catalogReader, cluster, convertletTable, config);
-
- root = sqlToRelConverter.convertQuery(validatedSqlNode, true, false);
- root = root.withRel(sqlToRelConverter.flattenTypes(root.rel, true));
- root = root.withRel(RelDecorrelator.decorrelateQuery(root.rel));
-
- return CalciteTest.this.root;
- }
- }
-
- private enum State {
- STATE_0_CLOSED {
- @Override
- void from(CalciteTest planner) {
- planner.close();
- }
- },
- STATE_1_RESET {
- @Override
- void from(CalciteTest planner) {
- planner.ensure(STATE_0_CLOSED);
- planner.reset();
- }
- },
- STATE_2_READY {
- @Override
- void from(CalciteTest planner) {
- STATE_1_RESET.from(planner);
- planner.ready();
- }
- },
- STATE_3_PARSED,
- STATE_4_VALIDATED,
- STATE_5_CONVERTED;
-
- /**
- * Moves planner's state to this state. This must be a higher state.
- */
- void from(CalciteTest planner) {
- throw new IllegalArgumentException("cannot move from " + planner.state
- + " to " + this);
- }
- }
-
-
- void calTest() throws SqlParseException {
-
-// String sql = "select t.orders.id from t.orders";
-//
-// String sql = "select t.products.id "
-// + "from t.orders, t.products "
-// + "where t.orders.id = t.products.id and quantity>2 ";
-
- String sql = "SELECT t.products.id AS product_id, t.products.name "
- + "AS product_name, t.orders.id AS order_id "
- + "FROM t.products JOIN t.orders ON t.products.id = t.orders.id WHERE quantity > 2";
-
- final SqlParser.Config parserConfig = SqlParser.configBuilder().setLex(Lex.MYSQL).build();
-
- // Parse the query
- SqlParser parser = SqlParser.create(sql, parserConfig);
- SqlNode sqlNode = parser.parseStmt();
-
- // Validate the query
- CalciteCatalogReader catalogReader = createCatalogReader();
- SqlValidator validator = SqlValidatorUtil.newValidator(
- SqlStdOperatorTable.instance(), catalogReader, typeFactory, SqlConformance.DEFAULT);
- SqlNode validatedSqlNode = validator.validate(sqlNode);
-
- // Convert SqlNode to RelNode
- RexBuilder rexBuilder = createRexBuilder();
- RelOptCluster cluster = RelOptCluster.create(planner, rexBuilder);
- SqlToRelConverter sqlToRelConverter = new SqlToRelConverter(
- new ViewExpanderImpl(),
- validator,
- createCatalogReader(),
- cluster,
- convertletTable);
- RelRoot root = sqlToRelConverter.convertQuery(validatedSqlNode, false, true);
- System.out.println(RelOptUtil.toString(root.rel));
-
- // Optimize the plan
- RelOptPlanner planner = new VolcanoPlanner();
-
- // Create a set of rules to apply
- Program program = Programs.ofRules(
- FilterProjectTransposeRule.INSTANCE,
- ProjectMergeRule.INSTANCE,
- FilterMergeRule.INSTANCE,
- FilterJoinRule.JOIN,
- LoptOptimizeJoinRule.INSTANCE);
-
- RelTraitSet traitSet = planner.emptyTraitSet().replace(EnumerableConvention.INSTANCE);
-
- // Execute the program
- RelNode optimized = program.run(planner, root.rel, traitSet, ImmutableList.<RelOptMaterialization>of(), ImmutableList.<RelOptLattice>of());
- System.out.println(RelOptUtil.toString(optimized));
-
- }
-
- public static void main(String[] args) throws ClassNotFoundException, SQLException, SqlParseException {
-
- Class.forName("org.apache.calcite.jdbc.Driver");
- java.sql.Connection connection = DriverManager.getConnection("jdbc:calcite:");
- CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
- SchemaPlus rootSchema = calciteConnection.getRootSchema();
- rootSchema.add("t", new ReflectiveSchema(new StreamQueryPlanner.Transactions()));
-
- FrameworkConfig frameworkConfig = CalciteFrameworkConfiguration.getDefaultconfig(rootSchema);
- CalciteTest ct = new CalciteTest(frameworkConfig);
- ct.ready();
- ct.calTest();
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/test/java/org/apache/calcite/planner/QueryTest.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/test/java/org/apache/calcite/planner/QueryTest.java b/experiments/sql/src/test/java/org/apache/calcite/planner/QueryTest.java
deleted file mode 100644
index 1cc7102..0000000
--- a/experiments/sql/src/test/java/org/apache/calcite/planner/QueryTest.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.calcite.planner;
-
-import com.google.common.io.Resources;
-import org.apache.calcite.jdbc.CalciteConnection;
-import org.apache.calcite.model.ModelHandler;
-import org.apache.calcite.plan.RelOptUtil;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.tools.RelConversionException;
-import org.apache.calcite.tools.ValidationException;
-import org.apache.log4j.Logger;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.nio.charset.Charset;
-import java.sql.SQLException;
-
-public class QueryTest {
-
- private final static Logger logger = Logger.getLogger(QueryTest.class);
-
- @Test
- public void testLogicalPlan() {
-
- try {
- CalciteConnection connection = new Connection();
- String salesSchema = Resources.toString(Query.class.getResource("/model.json"), Charset.defaultCharset());
- new ModelHandler(connection, "inline:" + salesSchema);
-
- Query queryPlanner = new Query(connection.getRootSchema().getSubSchema(connection.getSchema()));
- RelNode logicalPlan = queryPlanner.getLogicalPlan("SELECT item FROM transactions");
-
- logger.info("Getting Logical Plan...");
- System.out.println(RelOptUtil.toString(logicalPlan));
-
- } catch (IOException e) {
- e.printStackTrace();
- } catch (RelConversionException e) {
- e.printStackTrace();
- } catch (ValidationException e) {
- e.printStackTrace();
- } catch (SQLException e) {
- e.printStackTrace();
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/test/java/org/apache/gearpump/sql/example/SqlWordCountTest.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/test/java/org/apache/gearpump/sql/example/SqlWordCountTest.java b/experiments/sql/src/test/java/org/apache/gearpump/sql/example/SqlWordCountTest.java
new file mode 100644
index 0000000..64c5af7
--- /dev/null
+++ b/experiments/sql/src/test/java/org/apache/gearpump/sql/example/SqlWordCountTest.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.example;
+
+import org.apache.calcite.adapter.java.ReflectiveSchema;
+import org.apache.calcite.jdbc.CalciteConnection;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.plan.RelTraitDef;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.parser.SqlParseException;
+import org.apache.calcite.sql.parser.SqlParser;
+import org.apache.calcite.tools.*;
+import org.apache.gearpump.sql.rel.GearLogicalConvention;
+import org.apache.gearpump.sql.rule.GearAggregationRule;
+import org.apache.gearpump.sql.rule.GearFlatMapRule;
+import org.apache.gearpump.sql.table.SampleString;
+import org.apache.gearpump.sql.utils.GearConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.List;
+
+public class SqlWordCountTest {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SqlWordCountTest.class);
+
+ private Planner getPlanner(List<RelTraitDef> traitDefs, Program... programs) {
+ try {
+ return getPlanner(traitDefs, SqlParser.Config.DEFAULT, programs);
+ } catch (ClassNotFoundException e) {
+ LOG.error(e.getMessage());
+ } catch (SQLException e) {
+ LOG.error(e.getMessage());
+ }
+ return null;
+ }
+
+ private Planner getPlanner(List<RelTraitDef> traitDefs,
+ SqlParser.Config parserConfig,
+ Program... programs) throws ClassNotFoundException, SQLException {
+
+ Class.forName("org.apache.calcite.jdbc.Driver");
+ java.sql.Connection connection = DriverManager.getConnection("jdbc:calcite:");
+ CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
+ SchemaPlus rootSchema = calciteConnection.getRootSchema();
+ rootSchema.add("STR", new ReflectiveSchema(new SampleString.Stream()));
+
+ final FrameworkConfig config = Frameworks.newConfigBuilder()
+ .parserConfig(parserConfig)
+ .defaultSchema(rootSchema)
+ .traitDefs(traitDefs)
+ .programs(programs)
+ .build();
+ return Frameworks.getPlanner(config);
+ }
+
+ void wordCountTest(GearConfiguration gearConfig) throws SqlParseException,
+ ValidationException, RelConversionException {
+
+ RuleSet ruleSet = RuleSets.ofList(
+ GearFlatMapRule.INSTANCE,
+ GearAggregationRule.INSTANCE);
+
+ Planner planner = getPlanner(null, Programs.of(ruleSet));
+
+ String sql = "SELECT COUNT(*) FROM str.kv GROUP BY str.kv.word";
+ System.out.println("SQL Query:-\t" + sql + "\n");
+
+ SqlNode parse = planner.parse(sql);
+ System.out.println("SQL Parse Tree:- \n" + parse.toString() + "\n\n");
+
+ SqlNode validate = planner.validate(parse);
+ RelNode convert = planner.rel(validate).project();
+ System.out.println("Relational Expression:- \n" + RelOptUtil.toString(convert) + "\n");
+
+ gearConfig.defaultConfiguration();
+ gearConfig.ConfigJavaStreamApp();
+
+ RelTraitSet traitSet = convert.getTraitSet().replace(GearLogicalConvention.INSTANCE);
+ try {
+ RelNode transform = planner.transform(0, traitSet, convert);
+ System.out.println(RelOptUtil.toString(transform));
+ } catch (Exception e) {
+ }
+
+ }
+
+
+ public static void main(String[] args) throws ClassNotFoundException,
+ SQLException, SqlParseException {
+
+ SqlWordCountTest gearSqlWordCount = new SqlWordCountTest();
+
+ try {
+ GearConfiguration gearConfig = new GearConfiguration();
+ gearSqlWordCount.wordCountTest(gearConfig);
+ } catch (ValidationException e) {
+ LOG.error(e.getMessage());
+ } catch (RelConversionException e) {
+ LOG.error(e.getMessage());
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/test/java/org/apache/gearpump/sql/planner/CalciteTest.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/test/java/org/apache/gearpump/sql/planner/CalciteTest.java b/experiments/sql/src/test/java/org/apache/gearpump/sql/planner/CalciteTest.java
new file mode 100644
index 0000000..2f21531
--- /dev/null
+++ b/experiments/sql/src/test/java/org/apache/gearpump/sql/planner/CalciteTest.java
@@ -0,0 +1,397 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.planner;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.adapter.enumerable.EnumerableConvention;
+import org.apache.calcite.adapter.enumerable.EnumerableRules;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.adapter.java.ReflectiveSchema;
+import org.apache.calcite.config.CalciteConnectionConfig;
+import org.apache.calcite.config.Lex;
+import org.apache.calcite.jdbc.CalciteConnection;
+import org.apache.calcite.jdbc.CalciteSchema;
+import org.apache.calcite.plan.*;
+import org.apache.calcite.plan.RelOptTable.ViewExpander;
+import org.apache.calcite.plan.volcano.VolcanoPlanner;
+import org.apache.calcite.prepare.CalciteCatalogReader;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelRoot;
+import org.apache.calcite.rel.rules.LoptOptimizeJoinRule;
+import org.apache.calcite.rel.rules.SortRemoveRule;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexExecutor;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperatorTable;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.parser.SqlParseException;
+import org.apache.calcite.sql.parser.SqlParser;
+import org.apache.calcite.sql.validate.SqlConformance;
+import org.apache.calcite.sql.validate.SqlConformanceEnum;
+import org.apache.calcite.sql.validate.SqlValidator;
+import org.apache.calcite.sql.validate.SqlValidatorUtil;
+import org.apache.calcite.sql2rel.RelDecorrelator;
+import org.apache.calcite.sql2rel.SqlRexConvertletTable;
+import org.apache.calcite.sql2rel.SqlToRelConverter;
+import org.apache.calcite.tools.*;
+import org.apache.calcite.util.Util;
+import org.apache.gearpump.sql.rule.GearFilterRule;
+import org.apache.gearpump.sql.table.SampleTransactions;
+import org.apache.gearpump.sql.utils.CalciteFrameworkConfiguration;
+import org.apache.gearpump.sql.validator.CalciteSqlValidator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.List;
+
+public class CalciteTest {
+
+ private static final Logger LOG = LoggerFactory.getLogger(CalciteTest.class);
+
+ private SqlOperatorTable operatorTable;
+ private FrameworkConfig config;
+ private ImmutableList<RelTraitDef> traitDefs;
+ private SqlParser.Config parserConfig;
+ private SqlRexConvertletTable convertletTable;
+ private State state;
+ private SchemaPlus defaultSchema;
+ private JavaTypeFactory typeFactory;
+ private RelOptPlanner planner;
+ private RexExecutor executor;
+ private RelRoot root;
+
+ public CalciteTest(FrameworkConfig config) {
+ this.config = config;
+ this.defaultSchema = config.getDefaultSchema();
+ this.operatorTable = config.getOperatorTable();
+ this.parserConfig = config.getParserConfig();
+ this.state = State.STATE_0_CLOSED;
+ this.traitDefs = config.getTraitDefs();
+ this.convertletTable = config.getConvertletTable();
+ this.executor = config.getExecutor();
+ reset();
+ }
+
+ public CalciteTest() {
+ }
+
+ private void ensure(State state) {
+ if (state == this.state) {
+ return;
+ }
+ if (state.ordinal() < this.state.ordinal()) {
+ throw new IllegalArgumentException("cannot move to " + state + " from "
+ + this.state);
+ }
+ state.from(this);
+ }
+
+ public void close() {
+ typeFactory = null;
+ state = State.STATE_0_CLOSED;
+ }
+
+ public void reset() {
+ ensure(State.STATE_0_CLOSED);
+ state = State.STATE_1_RESET;
+ }
+
+ private void ready() {
+ switch (state) {
+ case STATE_0_CLOSED:
+ reset();
+ }
+ ensure(State.STATE_1_RESET);
+ Frameworks.withPlanner(
+ new Frameworks.PlannerAction<Void>() {
+ public Void apply(RelOptCluster cluster, RelOptSchema relOptSchema,
+ SchemaPlus rootSchema) {
+ Util.discard(rootSchema); // use our own defaultSchema
+ typeFactory = (JavaTypeFactory) cluster.getTypeFactory();
+ planner = cluster.getPlanner();
+ planner.setExecutor(executor);
+ return null;
+ }
+ },
+ config);
+
+ state = State.STATE_2_READY;
+
+ // If user specify own traitDef, instead of default default trait,
+ // first, clear the default trait def registered with planner
+ // then, register the trait def specified in traitDefs.
+ if (this.traitDefs != null) {
+ planner.clearRelTraitDefs();
+ for (RelTraitDef def : this.traitDefs) {
+ planner.addRelTraitDef(def);
+ }
+ }
+ }
+
+ private static SchemaPlus rootSchema(SchemaPlus schema) {
+ for (; ; ) {
+ if (schema.getParentSchema() == null) {
+ return schema;
+ }
+ schema = schema.getParentSchema();
+ }
+ }
+
+ private CalciteCatalogReader createCatalogReader() {
+ SchemaPlus rootSchema = rootSchema(defaultSchema);
+ return new CalciteCatalogReader(
+ CalciteSchema.from(rootSchema),
+ parserConfig.caseSensitive(),
+ CalciteSchema.from(defaultSchema).path(null),
+ typeFactory);
+ }
+
+ private RexBuilder createRexBuilder() {
+ return new RexBuilder(typeFactory);
+ }
+
+ private SqlConformance conformance() {
+ final Context context = config.getContext();
+ if (context != null) {
+ final CalciteConnectionConfig connectionConfig =
+ context.unwrap(CalciteConnectionConfig.class);
+ if (connectionConfig != null) {
+ return connectionConfig.conformance();
+ }
+ }
+ return SqlConformanceEnum.DEFAULT;
+ }
+
+ /**
+ * Implements {@link org.apache.calcite.plan.RelOptTable.ViewExpander}
+ * interface for {@link org.apache.calcite.tools.Planner}.
+ */
+ public class ViewExpanderImpl implements ViewExpander {
+ @Override
+ public RelRoot expandView(RelDataType rowType, String queryString,
+ List<String> schemaPath, List<String> viewPath) {
+ SqlParser parser = SqlParser.create(queryString, parserConfig);
+ SqlNode sqlNode;
+ try {
+ sqlNode = parser.parseQuery();
+ } catch (SqlParseException e) {
+ throw new RuntimeException("parse failed", e);
+ }
+
+ final SqlConformance conformance = conformance();
+ final CalciteCatalogReader catalogReader =
+ createCatalogReader().withSchemaPath(schemaPath);
+ final SqlValidator validator =
+ new CalciteSqlValidator(operatorTable, catalogReader, typeFactory,
+ conformance);
+ validator.setIdentifierExpansion(true);
+ final SqlNode validatedSqlNode = validator.validate(sqlNode);
+
+ final RexBuilder rexBuilder = createRexBuilder();
+ final RelOptCluster cluster = RelOptCluster.create(planner, rexBuilder);
+ final SqlToRelConverter.Config config = SqlToRelConverter.configBuilder()
+ .withTrimUnusedFields(false).withConvertTableAccess(false).build();
+ final SqlToRelConverter sqlToRelConverter =
+ new SqlToRelConverter(new ViewExpanderImpl(), validator,
+ catalogReader, cluster, convertletTable, config);
+
+ root = sqlToRelConverter.convertQuery(validatedSqlNode, true, false);
+ root = root.withRel(sqlToRelConverter.flattenTypes(root.rel, true));
+ root = root.withRel(RelDecorrelator.decorrelateQuery(root.rel));
+
+ return CalciteTest.this.root;
+ }
+ }
+
+ private enum State {
+ STATE_0_CLOSED {
+ @Override
+ void from(CalciteTest planner) {
+ planner.close();
+ }
+ },
+ STATE_1_RESET {
+ @Override
+ void from(CalciteTest planner) {
+ planner.ensure(STATE_0_CLOSED);
+ planner.reset();
+ }
+ },
+ STATE_2_READY {
+ @Override
+ void from(CalciteTest planner) {
+ STATE_1_RESET.from(planner);
+ planner.ready();
+ }
+ },
+ STATE_3_PARSED,
+ STATE_4_VALIDATED,
+ STATE_5_CONVERTED;
+
+ /**
+ * Moves planner's state to this state. This must be a higher state.
+ */
+ void from(CalciteTest planner) {
+ throw new IllegalArgumentException("cannot move from " + planner.state
+ + " to " + this);
+ }
+ }
+
+ void calTest() throws SqlParseException {
+
+// String sql = "select t.orders.id from t.orders";
+//
+// String sql = "select t.products.id "
+// + "from t.orders, t.products "
+// + "where t.orders.id = t.products.id and quantity>2 ";
+
+ String sql = "SELECT t.products.id AS product_id, t.products.name "
+ + "AS product_name, t.orders.id AS order_id "
+ + "FROM t.products JOIN t.orders ON t.products.id = t.orders.id WHERE quantity > 2";
+
+ final SqlParser.Config parserConfig = SqlParser.configBuilder().setLex(Lex.MYSQL).build();
+
+ // Parse the query
+ SqlParser parser = SqlParser.create(sql, parserConfig);
+ SqlNode sqlNode = parser.parseStmt();
+
+ // Validate the query
+ CalciteCatalogReader catalogReader = createCatalogReader();
+ SqlValidator validator = SqlValidatorUtil.newValidator(
+ SqlStdOperatorTable.instance(), catalogReader, typeFactory, SqlConformance.DEFAULT);
+ SqlNode validatedSqlNode = validator.validate(sqlNode);
+
+ // Convert SqlNode to RelNode
+ RexBuilder rexBuilder = createRexBuilder();
+ RelOptCluster cluster = RelOptCluster.create(planner, rexBuilder);
+ SqlToRelConverter sqlToRelConverter = new SqlToRelConverter(
+ new ViewExpanderImpl(),
+ validator,
+ createCatalogReader(),
+ cluster,
+ convertletTable);
+ RelRoot root = sqlToRelConverter.convertQuery(validatedSqlNode, false, true);
+ System.out.println(RelOptUtil.toString(root.rel));
+
+ // Optimize the plan
+ RelOptPlanner planner = new VolcanoPlanner();
+
+ // Create a set of rules to apply
+ Program program = Programs.ofRules(
+// FilterProjectTransposeRule.INSTANCE,
+// ProjectMergeRule.INSTANCE,
+// FilterMergeRule.INSTANCE,
+// FilterJoinRule.JOIN,
+ GearFilterRule.INSTANCE,
+ LoptOptimizeJoinRule.INSTANCE);
+
+ RelTraitSet traitSet = planner.emptyTraitSet().replace(EnumerableConvention.INSTANCE);
+
+ // Execute the program
+// RelNode optimized = program.run(planner, root.rel, traitSet,
+// ImmutableList.<RelOptMaterialization>of(), ImmutableList.<RelOptLattice>of());
+// logger.info(RelOptUtil.toString(optimized));
+
+ }
+
+ // new test -------------------------
+ private Planner getPlanner(List<RelTraitDef> traitDefs, Program... programs) {
+ try {
+ return getPlanner(traitDefs, SqlParser.Config.DEFAULT, programs);
+ } catch (ClassNotFoundException e) {
+ LOG.error(e.getMessage());
+ } catch (SQLException e) {
+ LOG.error(e.getMessage());
+ }
+ return null;
+ }
+
+ private Planner getPlanner(List<RelTraitDef> traitDefs,
+ SqlParser.Config parserConfig,
+ Program... programs) throws ClassNotFoundException, SQLException {
+
+ Class.forName("org.apache.calcite.jdbc.Driver");
+ java.sql.Connection connection = DriverManager.getConnection("jdbc:calcite:");
+ CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
+ SchemaPlus rootSchema = calciteConnection.getRootSchema();
+ rootSchema.add("T", new ReflectiveSchema(new SampleTransactions.Transactions()));
+
+ final FrameworkConfig config = Frameworks.newConfigBuilder()
+ .parserConfig(parserConfig)
+ .defaultSchema(rootSchema)
+ .traitDefs(traitDefs)
+ .programs(programs)
+ .build();
+ return Frameworks.getPlanner(config);
+ }
+
+ void calTest2() throws SqlParseException, ValidationException, RelConversionException {
+
+ RuleSet ruleSet = RuleSets.ofList(
+ SortRemoveRule.INSTANCE,
+ EnumerableRules.ENUMERABLE_PROJECT_RULE,
+ EnumerableRules.ENUMERABLE_SORT_RULE);
+
+ Planner planner = getPlanner(null, Programs.of(ruleSet));
+
+ String sql = "SELECT * FROM t.products ORDER BY t.products.id";
+
+ SqlNode parse = planner.parse(sql);
+ System.out.println("SQL Parse Tree:- \n" + parse.toString());
+
+ SqlNode validate = planner.validate(parse);
+ RelNode convert = planner.rel(validate).project();
+ RelTraitSet traitSet = convert.getTraitSet().replace(EnumerableConvention.INSTANCE);
+ RelNode transform = planner.transform(0, traitSet, convert);
+ System.out.println("\n\nRelational Expression:- \n" + RelOptUtil.toString(transform));
+
+ }
+
+
+ public static void main(String[] args) throws ClassNotFoundException,
+ SQLException, SqlParseException {
+
+ // calTest()
+// Class.forName("org.apache.calcite.jdbc.Driver");
+// java.sql.Connection connection = DriverManager.getConnection("jdbc:calcite:");
+// CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
+// SchemaPlus rootSchema = calciteConnection.getRootSchema();
+// rootSchema.add("t", new ReflectiveSchema(new StreamQueryPlanner.Transactions()));
+//
+// FrameworkConfig frameworkConfig = CalciteFrameworkConfiguration.getDefaultconfig(rootSchema);
+// CalciteTest ct = new CalciteTest(frameworkConfig);
+// ct.ready();
+// ct.calTest();
+
+ // calTest2()
+ CalciteTest calTest = new CalciteTest();
+ try {
+ calTest.calTest2();
+ } catch (ValidationException e) {
+ LOG.error(e.getMessage());
+ } catch (RelConversionException e) {
+ LOG.error(e.getMessage());
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/test/java/org/apache/gearpump/sql/planner/QueryTest.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/test/java/org/apache/gearpump/sql/planner/QueryTest.java b/experiments/sql/src/test/java/org/apache/gearpump/sql/planner/QueryTest.java
new file mode 100644
index 0000000..551beff
--- /dev/null
+++ b/experiments/sql/src/test/java/org/apache/gearpump/sql/planner/QueryTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.planner;
+
+import com.google.common.io.Resources;
+import org.apache.calcite.jdbc.CalciteConnection;
+import org.apache.calcite.model.ModelHandler;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.tools.RelConversionException;
+import org.apache.calcite.tools.ValidationException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.sql.SQLException;
+
+public class QueryTest {
+
+ private static final Logger LOG = LoggerFactory.getLogger(QueryTest.class);
+
+ @Test
+ public void testLogicalPlan() {
+
+ try {
+ CalciteConnection connection = new Connection();
+ String salesSchema = Resources.toString(Query.class.getResource("/model.json"),
+ Charset.defaultCharset());
+ new ModelHandler(connection, "inline:" + salesSchema);
+
+ Query queryPlanner = new Query(connection.getRootSchema().getSubSchema(connection.getSchema()));
+ RelNode logicalPlan = queryPlanner.getLogicalPlan("SELECT item FROM transactions");
+
+ System.out.println("Getting Logical Plan...\n" + RelOptUtil.toString(logicalPlan));
+
+ } catch (IOException e) {
+ LOG.error(e.getMessage());
+ } catch (RelConversionException e) {
+ LOG.error(e.getMessage());
+ } catch (ValidationException e) {
+ LOG.error(e.getMessage());
+ } catch (SQLException e) {
+ LOG.error(e.getMessage());
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/test/resources/model.json
----------------------------------------------------------------------
diff --git a/experiments/sql/src/test/resources/model.json b/experiments/sql/src/test/resources/model.json
index 5dfbf53..bfb62ed 100644
--- a/experiments/sql/src/test/resources/model.json
+++ b/experiments/sql/src/test/resources/model.json
@@ -27,7 +27,7 @@
{
"name": "Transactions",
"type": "custom",
- "factory": "org.apache.calcite.table.TransactionsTableFactory",
+ "factory": "org.apache.gearpump.sql.table.TransactionsTableFactory",
"operand": {
"file": "resources/sales/Transactions.csv",
"flavor": "scannable"
[3/3] incubator-gearpump git commit: [GEARPUMP-217] Add Gearpump rel,
rule and examples.
Posted by ma...@apache.org.
[GEARPUMP-217] Add Gearpump rel, rule and examples.
Author: Buddhi Ayesha <bu...@cse.mrt.ac.lk>
Author: Buddhi Rathnayaka <bu...@cse.mrt.ac.lk>
Closes #217 from buddhiayesha2015/upstream_sql.
Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/54686e0e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/54686e0e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/54686e0e
Branch: refs/heads/sql
Commit: 54686e0e28efcca4901fe06589d2736daad2e606
Parents: e04df0d
Author: Buddhi Ayesha <bu...@cse.mrt.ac.lk>
Authored: Sat Aug 19 08:24:49 2017 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Sat Aug 19 08:25:49 2017 +0800
----------------------------------------------------------------------
.../examples/wordcountjava/WordCountSpec.scala | 13 +-
experiments/sql/README.md | 10 +-
.../main/java/org/apache/calcite/SQLNode.java | 112 ------
.../org/apache/calcite/planner/Connection.java | 294 --------------
.../org/apache/calcite/planner/LogicalPlan.java | 43 --
.../java/org/apache/calcite/planner/Query.java | 79 ----
.../calcite/planner/StreamQueryPlanner.java | 91 -----
.../calcite/table/TransactionsTableFactory.java | 88 ----
.../utils/CalciteFrameworkConfiguration.java | 58 ---
.../calcite/validator/CalciteSqlValidator.java | 50 ---
.../java/org/apache/gearpump/sql/SQLNode.java | 127 ++++++
.../apache/gearpump/sql/planner/Connection.java | 292 ++++++++++++++
.../sql/planner/GearRelDataTypeSystem.java | 41 ++
.../gearpump/sql/planner/GearRuleSets.java | 60 +++
.../gearpump/sql/planner/LogicalPlan.java | 43 ++
.../org/apache/gearpump/sql/planner/Query.java | 80 ++++
.../sql/planner/StreamQueryPlanner.java | 96 +++++
.../gearpump/sql/rel/GearAggregationRel.java | 120 ++++++
.../apache/gearpump/sql/rel/GearFilterRel.java | 47 +++
.../apache/gearpump/sql/rel/GearFlatMapRel.java | 112 ++++++
.../apache/gearpump/sql/rel/GearIOSinkRel.java | 52 +++
.../gearpump/sql/rel/GearIOSourceRel.java | 39 ++
.../gearpump/sql/rel/GearIntersectRel.java | 54 +++
.../apache/gearpump/sql/rel/GearJoinRel.java | 94 +++++
.../gearpump/sql/rel/GearLogicalConvention.java | 65 +++
.../apache/gearpump/sql/rel/GearMinusRel.java | 51 +++
.../apache/gearpump/sql/rel/GearProjectRel.java | 50 +++
.../apache/gearpump/sql/rel/GearRelNode.java | 30 ++
.../sql/rel/GearSetOperatorRelBase.java | 47 +++
.../apache/gearpump/sql/rel/GearSortRel.java | 95 +++++
.../gearpump/sql/rel/GearSqlRelUtils.java | 71 ++++
.../apache/gearpump/sql/rel/GearUnionRel.java | 55 +++
.../apache/gearpump/sql/rel/GearValuesRel.java | 42 ++
.../gearpump/sql/rule/GearAggregationRule.java | 147 +++++++
.../gearpump/sql/rule/GearFilterRule.java | 48 +++
.../gearpump/sql/rule/GearFlatMapRule.java | 52 +++
.../gearpump/sql/rule/GearIOSinkRule.java | 79 ++++
.../gearpump/sql/rule/GearIOSourceRule.java | 46 +++
.../gearpump/sql/rule/GearIntersectRule.java | 51 +++
.../apache/gearpump/sql/rule/GearJoinRule.java | 53 +++
.../apache/gearpump/sql/rule/GearMinusRule.java | 51 +++
.../gearpump/sql/rule/GearProjectRule.java | 48 +++
.../apache/gearpump/sql/rule/GearSortRule.java | 51 +++
.../apache/gearpump/sql/rule/GearUnionRule.java | 49 +++
.../gearpump/sql/rule/GearValuesRule.java | 48 +++
.../apache/gearpump/sql/table/SampleString.java | 45 +++
.../gearpump/sql/table/SampleTransactions.java | 60 +++
.../sql/table/TransactionsTableFactory.java | 88 ++++
.../utils/CalciteFrameworkConfiguration.java | 58 +++
.../gearpump/sql/utils/GearConfiguration.java | 49 +++
.../sql/validator/CalciteSqlValidator.java | 50 +++
.../gearpump/experiments/sql/Connection.scala | 3 -
.../org/apache/calcite/planner/CalciteTest.java | 323 ---------------
.../org/apache/calcite/planner/QueryTest.java | 64 ---
.../gearpump/sql/example/SqlWordCountTest.java | 125 ++++++
.../gearpump/sql/planner/CalciteTest.java | 397 +++++++++++++++++++
.../apache/gearpump/sql/planner/QueryTest.java | 65 +++
experiments/sql/src/test/resources/model.json | 2 +-
58 files changed, 3438 insertions(+), 1215 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/examples/streaming/wordcount-java/src/test/scala/org/apache/gearpump/streaming/examples/wordcountjava/WordCountSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/wordcount-java/src/test/scala/org/apache/gearpump/streaming/examples/wordcountjava/WordCountSpec.scala b/examples/streaming/wordcount-java/src/test/scala/org/apache/gearpump/streaming/examples/wordcountjava/WordCountSpec.scala
index 3736c86..7df8651 100644
--- a/examples/streaming/wordcount-java/src/test/scala/org/apache/gearpump/streaming/examples/wordcountjava/WordCountSpec.scala
+++ b/examples/streaming/wordcount-java/src/test/scala/org/apache/gearpump/streaming/examples/wordcountjava/WordCountSpec.scala
@@ -18,16 +18,15 @@
package org.apache.gearpump.streaming.examples.wordcountjava
-import scala.concurrent.Future
-import scala.util.Success
-
-import org.scalatest.prop.PropertyChecks
-import org.scalatest.{BeforeAndAfter, Matchers, PropSpec}
-
import org.apache.gearpump.cluster.ClientToMaster.SubmitApplication
import org.apache.gearpump.cluster.MasterToClient.SubmitApplicationResult
import org.apache.gearpump.cluster.{MasterHarness, TestUtil}
-import org.apache.gearpump.streaming.examples.wordcountjava.WordCount
+import org.apache.gearpump.streaming.examples.wordcountjava.dsl.WordCount
+import org.scalatest.prop.PropertyChecks
+import org.scalatest.{BeforeAndAfter, Matchers, PropSpec}
+
+import scala.concurrent.Future
+import scala.util.Success
class WordCountSpec
extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter with MasterHarness {
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/README.md
----------------------------------------------------------------------
diff --git a/experiments/sql/README.md b/experiments/sql/README.md
index 6e9d490..9880dff 100644
--- a/experiments/sql/README.md
+++ b/experiments/sql/README.md
@@ -1,2 +1,8 @@
-SQL Support
-===========
\ No newline at end of file
+# SQL Support
+This project is about building a SQL layer with Apache Calcite to help those who are unfamiliar with Scala/Java to use Gearpump.
+
+## Build
+- Build [GearPump SQL](https://github.com/buddhiayesha2015/incubator-gearpump/tree/sql/experiments/sql)
+
+## Test
+- Run [SQL WordCount example](https://github.com/buddhiayesha2015/incubator-gearpump/blob/sql/experiments/sql/src/test/java/org/apache/gearpump/sql/example/SqlWordCountTest.java)
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/calcite/SQLNode.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/calcite/SQLNode.java b/experiments/sql/src/main/java/org/apache/calcite/SQLNode.java
deleted file mode 100644
index c092bf0..0000000
--- a/experiments/sql/src/main/java/org/apache/calcite/SQLNode.java
+++ /dev/null
@@ -1,112 +0,0 @@
-package org.apache.calcite;
-
-import org.apache.calcite.avatica.util.Casing;
-import org.apache.calcite.avatica.util.Quoting;
-import org.apache.calcite.sql.SqlCall;
-import org.apache.calcite.sql.SqlNode;
-import org.apache.calcite.sql.SqlNodeList;
-import org.apache.calcite.sql.fun.SqlCase;
-import org.apache.calcite.sql.parser.SqlParseException;
-import org.apache.calcite.sql.parser.SqlParser;
-import org.apache.calcite.sql.parser.impl.SqlParserImpl;
-import org.apache.calcite.sql.validate.SqlConformanceEnum;
-import org.apache.calcite.sql.validate.SqlValidator;
-import org.apache.calcite.sql.validate.SqlValidatorScope;
-import org.apache.calcite.util.Util;
-import org.junit.ComparisonFailure;
-
-import java.util.regex.Pattern;
-
-/**
- * Created by Buddhi on 6/8/2017.
- */
-public class SQLNode {
-
- private static final Pattern LINE_BREAK_PATTERN = Pattern.compile("\r\n|\r|\n");
-
- private static final Pattern TAB_PATTERN = Pattern.compile("\t");
-
- private static final String LINE_BREAK = "\\\\n\"" + Util.LINE_SEPARATOR + " + \"";
-
- private static final ThreadLocal<boolean[]> LINUXIFY = new ThreadLocal<boolean[]>() {
- @Override
- protected boolean[] initialValue() {
- return new boolean[]{true};
- }
- };
-
-
- protected SqlParser getSqlParser(String sql) {
- return SqlParser.create(sql,
- SqlParser.configBuilder()
- .setParserFactory(SqlParserImpl.FACTORY)
- .setQuoting(Quoting.DOUBLE_QUOTE)
- .setUnquotedCasing(Casing.TO_UPPER)
- .setQuotedCasing(Casing.UNCHANGED)
- .setConformance(SqlConformanceEnum.DEFAULT)
- .build());
- }
-
- public static String toJavaString(String s) {
- s = Util.replace(s, "\"", "\\\"");
- s = LINE_BREAK_PATTERN.matcher(s).replaceAll(LINE_BREAK);
- s = TAB_PATTERN.matcher(s).replaceAll("\\\\t");
- s = "\"" + s + "\"";
- String spurious = "\n \\+ \"\"";
- if (s.endsWith(spurious)) {
- s = s.substring(0, s.length() - spurious.length());
- }
- return s;
- }
-
- public static void assertEqualsVerbose(String expected, String actual) {
- if (actual == null) {
- if (expected == null) {
- return;
- } else {
- String message = "Expected:\n" + expected + "\nActual: null";
- throw new ComparisonFailure(message, expected, null);
- }
- }
- if ((expected != null) && expected.equals(actual)) {
- return;
- }
- String s = toJavaString(actual);
- String message = "Expected:\n" + expected + "\nActual:\n" +
- actual + "\nActual java:\n" + s + '\n';
-
- throw new ComparisonFailure(message, expected, actual);
- }
-
- public void check(String sql, String expected) {
- final SqlNode sqlNode;
- try {
- sqlNode = getSqlParser(sql).parseStmt();
- } catch (SqlParseException e) {
- throw new RuntimeException("Error while parsing SQL: " + sql, e);
- }
-
- String actual = sqlNode.toSqlString(null, true).getSql();
- if (LINUXIFY.get()[0]) {
- actual = Util.toLinux(actual);
- }
- assertEqualsVerbose(expected, actual);
- }
-
- public void validateCall(SqlCall call, SqlValidator validator, SqlValidatorScope operandScope) {
- SqlCase sqlCase = (SqlCase) call;
- SqlNodeList whenOperands = sqlCase.getWhenOperands();
- SqlNodeList thenOperands = sqlCase.getThenOperands();
- SqlNode elseOperand = sqlCase.getElseOperand();
- for (SqlNode operand : whenOperands) {
- operand.validateExpr(validator, operandScope);
- }
- for (SqlNode operand : thenOperands) {
- operand.validateExpr(validator, operandScope);
- }
- if (elseOperand != null) {
- elseOperand.validateExpr(validator, operandScope);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/calcite/planner/Connection.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/calcite/planner/Connection.java b/experiments/sql/src/main/java/org/apache/calcite/planner/Connection.java
deleted file mode 100644
index 1b8648d..0000000
--- a/experiments/sql/src/main/java/org/apache/calcite/planner/Connection.java
+++ /dev/null
@@ -1,294 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.calcite.planner;
-
-import org.apache.calcite.adapter.java.JavaTypeFactory;
-import org.apache.calcite.config.CalciteConnectionConfig;
-import org.apache.calcite.jdbc.CalciteConnection;
-import org.apache.calcite.linq4j.Enumerator;
-import org.apache.calcite.linq4j.Queryable;
-import org.apache.calcite.linq4j.tree.Expression;
-import org.apache.calcite.schema.SchemaPlus;
-import org.apache.calcite.tools.Frameworks;
-import org.apache.log4j.Logger;
-
-import java.lang.reflect.Type;
-import java.sql.*;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.Executor;
-
-public class Connection implements CalciteConnection {
-
- private final static Logger logger = Logger.getLogger(Connection.class);
- private final SchemaPlus rootSchema = Frameworks.createRootSchema(true);
- private String schema = null;
-
- public SchemaPlus getRootSchema() {
- return rootSchema;
- }
-
- public JavaTypeFactory getTypeFactory() {
- return null;
- }
-
- public Properties getProperties() {
- return null;
- }
-
- public Statement createStatement() throws SQLException {
- return null;
- }
-
- public PreparedStatement prepareStatement(String sql) throws SQLException {
- return null;
- }
-
- public CallableStatement prepareCall(String sql) throws SQLException {
- return null;
- }
-
- public String nativeSQL(String sql) throws SQLException {
- return null;
- }
-
- public void setAutoCommit(boolean autoCommit) throws SQLException {
-
- }
-
- public boolean getAutoCommit() throws SQLException {
- return false;
- }
-
- public void commit() throws SQLException {
-
- }
-
- public void rollback() throws SQLException {
-
- }
-
- public void close() throws SQLException {
-
- }
-
- public boolean isClosed() throws SQLException {
- return false;
- }
-
- public DatabaseMetaData getMetaData() throws SQLException {
- return null;
- }
-
- public void setReadOnly(boolean readOnly) throws SQLException {
-
- }
-
- public boolean isReadOnly() throws SQLException {
- return false;
- }
-
- public void setCatalog(String catalog) throws SQLException {
-
- }
-
- public String getCatalog() throws SQLException {
- return null;
- }
-
- public void setTransactionIsolation(int level) throws SQLException {
-
- }
-
- public int getTransactionIsolation() throws SQLException {
- return 0;
- }
-
- public SQLWarning getWarnings() throws SQLException {
- return null;
- }
-
- public void clearWarnings() throws SQLException {
-
- }
-
- public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException {
- return null;
- }
-
- public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
- return null;
- }
-
- public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
- return null;
- }
-
- public Map<String, Class<?>> getTypeMap() throws SQLException {
- return null;
- }
-
- public void setTypeMap(Map<String, Class<?>> map) throws SQLException {
-
- }
-
- public void setHoldability(int holdability) throws SQLException {
-
- }
-
- public int getHoldability() throws SQLException {
- return 0;
- }
-
- public Savepoint setSavepoint() throws SQLException {
- return null;
- }
-
- public Savepoint setSavepoint(String name) throws SQLException {
- return null;
- }
-
- public void rollback(Savepoint savepoint) throws SQLException {
-
- }
-
- public void releaseSavepoint(Savepoint savepoint) throws SQLException {
-
- }
-
- public Statement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
- return null;
- }
-
- public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
- return null;
- }
-
- public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
- return null;
- }
-
- public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException {
- return null;
- }
-
- public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException {
- return null;
- }
-
- public PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException {
- return null;
- }
-
- public Clob createClob() throws SQLException {
- return null;
- }
-
- public Blob createBlob() throws SQLException {
- return null;
- }
-
- public NClob createNClob() throws SQLException {
- return null;
- }
-
- public SQLXML createSQLXML() throws SQLException {
- return null;
- }
-
- public boolean isValid(int timeout) throws SQLException {
- return false;
- }
-
- public void setClientInfo(String name, String value) throws SQLClientInfoException {
-
- }
-
- public void setClientInfo(Properties properties) throws SQLClientInfoException {
-
- }
-
- public String getClientInfo(String name) throws SQLException {
- return null;
- }
-
- public Properties getClientInfo() throws SQLException {
- return null;
- }
-
- public Array createArrayOf(String typeName, Object[] elements) throws SQLException {
- return null;
- }
-
- public Struct createStruct(String typeName, Object[] attributes) throws SQLException {
- return null;
- }
-
- public void setSchema(String s) throws SQLException {
- schema = s;
- }
-
- public String getSchema() throws SQLException {
- return schema;
- }
-
- public void abort(Executor executor) throws SQLException {
-
- }
-
- public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException {
-
- }
-
- public int getNetworkTimeout() throws SQLException {
- return 0;
- }
-
- public CalciteConnectionConfig config() {
- return null;
- }
-
- public <T> T unwrap(Class<T> iface) throws SQLException {
- return null;
- }
-
- public boolean isWrapperFor(Class<?> iface) throws SQLException {
- return false;
- }
-
- public <T> Queryable<T> createQuery(Expression expression, Class<T> aClass) {
- return null;
- }
-
- public <T> Queryable<T> createQuery(Expression expression, Type type) {
- return null;
- }
-
- public <T> T execute(Expression expression, Class<T> aClass) {
- return null;
- }
-
- public <T> T execute(Expression expression, Type type) {
- return null;
- }
-
- public <T> Enumerator<T> executeQuery(Queryable<T> queryable) {
- return null;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/calcite/planner/LogicalPlan.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/calcite/planner/LogicalPlan.java b/experiments/sql/src/main/java/org/apache/calcite/planner/LogicalPlan.java
deleted file mode 100644
index 516ebe1..0000000
--- a/experiments/sql/src/main/java/org/apache/calcite/planner/LogicalPlan.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.calcite.planner;
-
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.sql.SqlNode;
-import org.apache.calcite.sql.parser.SqlParseException;
-import org.apache.calcite.tools.Planner;
-import org.apache.calcite.tools.RelConversionException;
-import org.apache.calcite.tools.ValidationException;
-
-public class LogicalPlan {
-
- public static RelNode getLogicalPlan(String query, Planner planner) throws ValidationException,
- RelConversionException {
- SqlNode sqlNode;
-
- try {
- sqlNode = planner.parse(query);
- } catch (SqlParseException e) {
- throw new RuntimeException("SQL query parsing error", e);
- }
- SqlNode validatedSqlNode = planner.validate(sqlNode);
-
- return planner.rel(validatedSqlNode).project();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/calcite/planner/Query.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/calcite/planner/Query.java b/experiments/sql/src/main/java/org/apache/calcite/planner/Query.java
deleted file mode 100644
index 9008a16..0000000
--- a/experiments/sql/src/main/java/org/apache/calcite/planner/Query.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.calcite.planner;
-
-import org.apache.calcite.config.Lex;
-import org.apache.calcite.plan.Contexts;
-import org.apache.calcite.plan.ConventionTraitDef;
-import org.apache.calcite.plan.RelTraitDef;
-import org.apache.calcite.rel.RelCollationTraitDef;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.type.RelDataTypeSystem;
-import org.apache.calcite.schema.SchemaPlus;
-import org.apache.calcite.sql.SqlNode;
-import org.apache.calcite.sql.parser.SqlParseException;
-import org.apache.calcite.sql.parser.SqlParser;
-import org.apache.calcite.tools.*;
-import org.apache.log4j.Logger;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * This Class is intended to test functions of Apache Calcite
- */
-public class Query {
-
- private final static Logger logger = Logger.getLogger(Query.class);
- private final Planner queryPlanner;
-
- public Query(SchemaPlus schema) {
-
- final List<RelTraitDef> traitDefs = new ArrayList<RelTraitDef>();
-
- traitDefs.add(ConventionTraitDef.INSTANCE);
- traitDefs.add(RelCollationTraitDef.INSTANCE);
-
- FrameworkConfig calciteFrameworkConfig = Frameworks.newConfigBuilder()
- .parserConfig(SqlParser.configBuilder()
- .setLex(Lex.MYSQL)
- .build())
- .defaultSchema(schema)
- .traitDefs(traitDefs)
- .context(Contexts.EMPTY_CONTEXT)
- .ruleSets(RuleSets.ofList())
- .costFactory(null)
- .typeSystem(RelDataTypeSystem.DEFAULT)
- .build();
- this.queryPlanner = Frameworks.getPlanner(calciteFrameworkConfig);
- }
-
- public RelNode getLogicalPlan(String query) throws ValidationException, RelConversionException {
- SqlNode sqlNode = null;
- try {
- sqlNode = queryPlanner.parse(query);
- } catch (SqlParseException e) {
- logger.error("SQL Parse Exception", e);
- }
-
- SqlNode validatedSqlNode = queryPlanner.validate(sqlNode);
- return queryPlanner.rel(validatedSqlNode).project();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/calcite/planner/StreamQueryPlanner.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/calcite/planner/StreamQueryPlanner.java b/experiments/sql/src/main/java/org/apache/calcite/planner/StreamQueryPlanner.java
deleted file mode 100644
index 0603827..0000000
--- a/experiments/sql/src/main/java/org/apache/calcite/planner/StreamQueryPlanner.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.calcite.planner;
-
-import org.apache.calcite.adapter.java.ReflectiveSchema;
-import org.apache.calcite.jdbc.CalciteConnection;
-import org.apache.calcite.plan.RelOptUtil;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.schema.SchemaPlus;
-import org.apache.calcite.tools.*;
-import org.apache.calcite.utils.CalciteFrameworkConfiguration;
-
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.SQLException;
-
-public class StreamQueryPlanner {
-
- public static void main(String[] args) throws ClassNotFoundException, SQLException, ValidationException, RelConversionException {
-
- Class.forName("org.apache.calcite.jdbc.Driver");
- Connection connection = DriverManager.getConnection("jdbc:calcite:");
- CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
- SchemaPlus rootSchema = calciteConnection.getRootSchema();
- rootSchema.add("t", new ReflectiveSchema(new Transactions()));
-
- FrameworkConfig frameworkConfig = CalciteFrameworkConfiguration.getDefaultconfig(rootSchema);
- Planner planner = Frameworks.getPlanner(frameworkConfig);
-
- String query = "select t.orders.id, name, max(quantity)*0.5 from t.orders, t.products "
- + "where t.orders.id = t.products.id group by t.orders.id, name "
- + "having sum(quantity) > 5 order by sum(quantity) ";
-
- RelNode logicalPlan = LogicalPlan.getLogicalPlan(query, planner);
- System.out.println(RelOptUtil.toString(logicalPlan));
- }
-
- public static class Transactions {
-
- public final Order[] orders = {
- new Order("001", 3),
- new Order("002", 5),
- new Order("003", 8),
- new Order("004", 15),
- };
-
- public final Product[] products = {
- new Product("001", "Book"),
- new Product("002", "Pen"),
- new Product("003", "Pencil"),
- new Product("004", "Ruler"),
- };
- }
-
- public static class Order {
- public final String id;
- public final int quantity;
-
- public Order(String id, int quantity) {
- this.id = id;
- this.quantity = quantity;
- }
- }
-
- public static class Product {
- public final String id;
- public final String name;
-
- public Product(String id, String name) {
- this.id = id;
- this.name = name;
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/calcite/table/TransactionsTableFactory.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/calcite/table/TransactionsTableFactory.java b/experiments/sql/src/main/java/org/apache/calcite/table/TransactionsTableFactory.java
deleted file mode 100644
index 42c2f92..0000000
--- a/experiments/sql/src/main/java/org/apache/calcite/table/TransactionsTableFactory.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.calcite.table;
-
-import com.google.common.collect.ImmutableList;
-import org.apache.calcite.DataContext;
-import org.apache.calcite.linq4j.Enumerable;
-import org.apache.calcite.linq4j.Linq4j;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeFactory;
-import org.apache.calcite.rel.type.RelProtoDataType;
-import org.apache.calcite.schema.*;
-import org.apache.calcite.sql.type.SqlTypeName;
-
-import java.util.Map;
-
-public class TransactionsTableFactory implements TableFactory<Table> {
-
- @Override
- public Table create(SchemaPlus schema, String name, Map<String, Object> operand, RelDataType rowType) {
- final Object[][] rows = {
- {100, "I001", "item1", 3},
- {101, "I002", "item2", 5},
- {102, "I003", "item3", 8},
- {103, "I004", "item4", 33},
- {104, "I005", "item5", 23}
- };
-
- return new TransactionsTable(ImmutableList.copyOf(rows));
- }
-
- public static class TransactionsTable implements ScannableTable {
-
- protected final RelProtoDataType protoRowType = new RelProtoDataType() {
- public RelDataType apply(RelDataTypeFactory a0) {
- return a0.builder()
- .add("timeStamp", SqlTypeName.TIMESTAMP)
- .add("id", SqlTypeName.VARCHAR, 10)
- .add("item", SqlTypeName.VARCHAR, 50)
- .add("quantity", SqlTypeName.INTEGER)
- .build();
- }
- };
-
- private final ImmutableList<Object[]> rows;
-
- public TransactionsTable(ImmutableList<Object[]> rows) {
- this.rows = rows;
- }
-
- public Enumerable<Object[]> scan(DataContext root) {
- return Linq4j.asEnumerable(rows);
- }
-
- @Override
- public RelDataType getRowType(RelDataTypeFactory typeFactory) {
- return protoRowType.apply(typeFactory);
- }
-
- @Override
- public Statistic getStatistic() {
- return Statistics.UNKNOWN;
- }
-
- @Override
- public Schema.TableType getJdbcTableType() {
- return Schema.TableType.TABLE;
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/calcite/utils/CalciteFrameworkConfiguration.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/calcite/utils/CalciteFrameworkConfiguration.java b/experiments/sql/src/main/java/org/apache/calcite/utils/CalciteFrameworkConfiguration.java
deleted file mode 100644
index e6be58a..0000000
--- a/experiments/sql/src/main/java/org/apache/calcite/utils/CalciteFrameworkConfiguration.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.calcite.utils;
-
-import org.apache.calcite.config.Lex;
-import org.apache.calcite.plan.Contexts;
-import org.apache.calcite.plan.ConventionTraitDef;
-import org.apache.calcite.plan.RelTraitDef;
-import org.apache.calcite.rel.RelCollationTraitDef;
-import org.apache.calcite.rel.type.RelDataTypeSystem;
-import org.apache.calcite.schema.SchemaPlus;
-import org.apache.calcite.sql.parser.SqlParser;
-import org.apache.calcite.tools.FrameworkConfig;
-import org.apache.calcite.tools.Frameworks;
-import org.apache.calcite.tools.RuleSets;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public class CalciteFrameworkConfiguration {
-
- public static FrameworkConfig getDefaultconfig(SchemaPlus schema) {
- final List<RelTraitDef> traitDefs = new ArrayList<RelTraitDef>();
-
- traitDefs.add(ConventionTraitDef.INSTANCE);
- traitDefs.add(RelCollationTraitDef.INSTANCE);
-
- FrameworkConfig frameworkConfiguration = Frameworks.newConfigBuilder()
- .parserConfig(SqlParser.configBuilder()
- .setLex(Lex.JAVA)
- .build())
- .defaultSchema(schema)
- .traitDefs(traitDefs)
- .context(Contexts.EMPTY_CONTEXT)
- .ruleSets(RuleSets.ofList())
- .costFactory(null)
- .typeSystem(RelDataTypeSystem.DEFAULT)
- .build();
-
- return frameworkConfiguration;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/calcite/validator/CalciteSqlValidator.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/calcite/validator/CalciteSqlValidator.java b/experiments/sql/src/main/java/org/apache/calcite/validator/CalciteSqlValidator.java
deleted file mode 100644
index fb132e2..0000000
--- a/experiments/sql/src/main/java/org/apache/calcite/validator/CalciteSqlValidator.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.calcite.validator;
-
-import org.apache.calcite.adapter.java.JavaTypeFactory;
-import org.apache.calcite.prepare.CalciteCatalogReader;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.sql.SqlInsert;
-import org.apache.calcite.sql.SqlOperatorTable;
-import org.apache.calcite.sql.validate.SqlConformance;
-import org.apache.calcite.sql.validate.SqlValidatorImpl;
-
-public class CalciteSqlValidator extends SqlValidatorImpl {
- public CalciteSqlValidator(SqlOperatorTable opTab,
- CalciteCatalogReader catalogReader, JavaTypeFactory typeFactory,
- SqlConformance conformance) {
- super(opTab, catalogReader, typeFactory, conformance);
- }
-
- @Override
- protected RelDataType getLogicalSourceRowType(
- RelDataType sourceRowType, SqlInsert insert) {
- final RelDataType superType =
- super.getLogicalSourceRowType(sourceRowType, insert);
- return ((JavaTypeFactory) typeFactory).toSql(superType);
- }
-
- @Override
- protected RelDataType getLogicalTargetRowType(
- RelDataType targetRowType, SqlInsert insert) {
- final RelDataType superType =
- super.getLogicalTargetRowType(targetRowType, insert);
- return ((JavaTypeFactory) typeFactory).toSql(superType);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/SQLNode.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/SQLNode.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/SQLNode.java
new file mode 100644
index 0000000..7fcc3a1
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/SQLNode.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql;
+
+import org.apache.calcite.avatica.util.Casing;
+import org.apache.calcite.avatica.util.Quoting;
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.fun.SqlCase;
+import org.apache.calcite.sql.parser.SqlParseException;
+import org.apache.calcite.sql.parser.SqlParser;
+import org.apache.calcite.sql.parser.impl.SqlParserImpl;
+import org.apache.calcite.sql.validate.SqlConformanceEnum;
+import org.apache.calcite.sql.validate.SqlValidator;
+import org.apache.calcite.sql.validate.SqlValidatorScope;
+import org.apache.calcite.util.Util;
+import org.junit.ComparisonFailure;
+
+import java.util.regex.Pattern;
+
+public class SQLNode {
+
+ private static final Pattern LINE_BREAK_PATTERN = Pattern.compile("\r\n|\r|\n");
+
+ private static final Pattern TAB_PATTERN = Pattern.compile("\t");
+
+ private static final String LINE_BREAK = "\\\\n\"" + Util.LINE_SEPARATOR + " + \"";
+
+ private static final ThreadLocal<boolean[]> LINUXIFY = new ThreadLocal<boolean[]>() {
+ @Override
+ protected boolean[] initialValue() {
+ return new boolean[]{true};
+ }
+ };
+
+
+ protected SqlParser getSqlParser(String sql) {
+ return SqlParser.create(sql,
+ SqlParser.configBuilder()
+ .setParserFactory(SqlParserImpl.FACTORY)
+ .setQuoting(Quoting.DOUBLE_QUOTE)
+ .setUnquotedCasing(Casing.TO_UPPER)
+ .setQuotedCasing(Casing.UNCHANGED)
+ .setConformance(SqlConformanceEnum.DEFAULT)
+ .build());
+ }
+
+ public static String toJavaString(String s) {
+ s = Util.replace(s, "\"", "\\\"");
+ s = LINE_BREAK_PATTERN.matcher(s).replaceAll(LINE_BREAK);
+ s = TAB_PATTERN.matcher(s).replaceAll("\\\\t");
+ s = "\"" + s + "\"";
+ String spurious = "\n \\+ \"\"";
+ if (s.endsWith(spurious)) {
+ s = s.substring(0, s.length() - spurious.length());
+ }
+ return s;
+ }
+
+ public static void assertEqualsVerbose(String expected, String actual) {
+ if (actual == null) {
+ if (expected == null) {
+ return;
+ } else {
+ String message = "Expected:\n" + expected + "\nActual: null";
+ throw new ComparisonFailure(message, expected, null);
+ }
+ }
+ if ((expected != null) && expected.equals(actual)) {
+ return;
+ }
+ String s = toJavaString(actual);
+ String message = "Expected:\n" + expected + "\nActual:\n" +
+ actual + "\nActual java:\n" + s + '\n';
+
+ throw new ComparisonFailure(message, expected, actual);
+ }
+
+ public void check(String sql, String expected) {
+ final SqlNode sqlNode;
+ try {
+ sqlNode = getSqlParser(sql).parseStmt();
+ } catch (SqlParseException e) {
+ throw new RuntimeException("Error while parsing SQL: " + sql, e);
+ }
+
+ String actual = sqlNode.toSqlString(null, true).getSql();
+ if (LINUXIFY.get()[0]) {
+ actual = Util.toLinux(actual);
+ }
+ assertEqualsVerbose(expected, actual);
+ }
+
+ public void validateCall(SqlCall call, SqlValidator validator, SqlValidatorScope operandScope) {
+ SqlCase sqlCase = (SqlCase) call;
+ SqlNodeList whenOperands = sqlCase.getWhenOperands();
+ SqlNodeList thenOperands = sqlCase.getThenOperands();
+ SqlNode elseOperand = sqlCase.getElseOperand();
+ for (SqlNode operand : whenOperands) {
+ operand.validateExpr(validator, operandScope);
+ }
+ for (SqlNode operand : thenOperands) {
+ operand.validateExpr(validator, operandScope);
+ }
+ if (elseOperand != null) {
+ elseOperand.validateExpr(validator, operandScope);
+ }
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/Connection.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/Connection.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/Connection.java
new file mode 100644
index 0000000..e5954f0
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/Connection.java
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.planner;
+
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.config.CalciteConnectionConfig;
+import org.apache.calcite.jdbc.CalciteConnection;
+import org.apache.calcite.linq4j.Enumerator;
+import org.apache.calcite.linq4j.Queryable;
+import org.apache.calcite.linq4j.tree.Expression;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.tools.Frameworks;
+
+import java.lang.reflect.Type;
+import java.sql.*;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Executor;
+
+public class Connection implements CalciteConnection {
+
+ private final SchemaPlus rootSchema = Frameworks.createRootSchema(true);
+ private String schema = null;
+
+ public SchemaPlus getRootSchema() {
+ return rootSchema;
+ }
+
+ public JavaTypeFactory getTypeFactory() {
+ return null;
+ }
+
+ public Properties getProperties() {
+ return null;
+ }
+
+ public Statement createStatement() throws SQLException {
+ return null;
+ }
+
+ public PreparedStatement prepareStatement(String sql) throws SQLException {
+ return null;
+ }
+
+ public CallableStatement prepareCall(String sql) throws SQLException {
+ return null;
+ }
+
+ public String nativeSQL(String sql) throws SQLException {
+ return null;
+ }
+
+ public void setAutoCommit(boolean autoCommit) throws SQLException {
+
+ }
+
+ public boolean getAutoCommit() throws SQLException {
+ return false;
+ }
+
+ public void commit() throws SQLException {
+
+ }
+
+ public void rollback() throws SQLException {
+
+ }
+
+ public void close() throws SQLException {
+
+ }
+
+ public boolean isClosed() throws SQLException {
+ return false;
+ }
+
+ public DatabaseMetaData getMetaData() throws SQLException {
+ return null;
+ }
+
+ public void setReadOnly(boolean readOnly) throws SQLException {
+
+ }
+
+ public boolean isReadOnly() throws SQLException {
+ return false;
+ }
+
+ public void setCatalog(String catalog) throws SQLException {
+
+ }
+
+ public String getCatalog() throws SQLException {
+ return null;
+ }
+
+ public void setTransactionIsolation(int level) throws SQLException {
+
+ }
+
+ public int getTransactionIsolation() throws SQLException {
+ return 0;
+ }
+
+ public SQLWarning getWarnings() throws SQLException {
+ return null;
+ }
+
+ public void clearWarnings() throws SQLException {
+
+ }
+
+ public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException {
+ return null;
+ }
+
+ public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
+ return null;
+ }
+
+ public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
+ return null;
+ }
+
+ public Map<String, Class<?>> getTypeMap() throws SQLException {
+ return null;
+ }
+
+ public void setTypeMap(Map<String, Class<?>> map) throws SQLException {
+
+ }
+
+ public void setHoldability(int holdability) throws SQLException {
+
+ }
+
+ public int getHoldability() throws SQLException {
+ return 0;
+ }
+
+ public Savepoint setSavepoint() throws SQLException {
+ return null;
+ }
+
+ public Savepoint setSavepoint(String name) throws SQLException {
+ return null;
+ }
+
+ public void rollback(Savepoint savepoint) throws SQLException {
+
+ }
+
+ public void releaseSavepoint(Savepoint savepoint) throws SQLException {
+
+ }
+
+ public Statement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
+ return null;
+ }
+
+ public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
+ return null;
+ }
+
+ public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
+ return null;
+ }
+
+ public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException {
+ return null;
+ }
+
+ public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException {
+ return null;
+ }
+
+ public PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException {
+ return null;
+ }
+
+ public Clob createClob() throws SQLException {
+ return null;
+ }
+
+ public Blob createBlob() throws SQLException {
+ return null;
+ }
+
+ public NClob createNClob() throws SQLException {
+ return null;
+ }
+
+ public SQLXML createSQLXML() throws SQLException {
+ return null;
+ }
+
+ public boolean isValid(int timeout) throws SQLException {
+ return false;
+ }
+
+ public void setClientInfo(String name, String value) throws SQLClientInfoException {
+
+ }
+
+ public void setClientInfo(Properties properties) throws SQLClientInfoException {
+
+ }
+
+ public String getClientInfo(String name) throws SQLException {
+ return null;
+ }
+
+ public Properties getClientInfo() throws SQLException {
+ return null;
+ }
+
+ public Array createArrayOf(String typeName, Object[] elements) throws SQLException {
+ return null;
+ }
+
+ public Struct createStruct(String typeName, Object[] attributes) throws SQLException {
+ return null;
+ }
+
+ public void setSchema(String s) throws SQLException {
+ schema = s;
+ }
+
+ public String getSchema() throws SQLException {
+ return schema;
+ }
+
+ public void abort(Executor executor) throws SQLException {
+
+ }
+
+ public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException {
+
+ }
+
+ public int getNetworkTimeout() throws SQLException {
+ return 0;
+ }
+
+ public CalciteConnectionConfig config() {
+ return null;
+ }
+
+ public <T> T unwrap(Class<T> iface) throws SQLException {
+ return null;
+ }
+
+ public boolean isWrapperFor(Class<?> iface) throws SQLException {
+ return false;
+ }
+
+ public <T> Queryable<T> createQuery(Expression expression, Class<T> aClass) {
+ return null;
+ }
+
+ public <T> Queryable<T> createQuery(Expression expression, Type type) {
+ return null;
+ }
+
+ public <T> T execute(Expression expression, Class<T> aClass) {
+ return null;
+ }
+
+ public <T> T execute(Expression expression, Type type) {
+ return null;
+ }
+
+ public <T> Enumerator<T> executeQuery(Queryable<T> queryable) {
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/GearRelDataTypeSystem.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/GearRelDataTypeSystem.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/GearRelDataTypeSystem.java
new file mode 100644
index 0000000..a640e12
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/GearRelDataTypeSystem.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.planner;
+
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.calcite.rel.type.RelDataTypeSystemImpl;
+
+/**
+ * customized data types.
+ */
+public class GearRelDataTypeSystem extends RelDataTypeSystemImpl {
+
+ public static final RelDataTypeSystem GEAR_REL_DATATYPE_SYSTEM = new GearRelDataTypeSystem();
+
+ @Override
+ public int getMaxNumericScale() {
+ return 38;
+ }
+
+ @Override
+ public int getMaxNumericPrecision() {
+ return 38;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/GearRuleSets.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/GearRuleSets.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/GearRuleSets.java
new file mode 100644
index 0000000..a962ff1
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/GearRuleSets.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.planner;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.tools.RuleSet;
+import org.apache.gearpump.sql.rule.*;
+
+import java.util.Iterator;
+
+public class GearRuleSets {
+ private static final ImmutableSet<RelOptRule> calciteToGearConversionRules = ImmutableSet
+ .<RelOptRule>builder().add(GearIOSourceRule.INSTANCE, GearProjectRule.INSTANCE,
+ GearFilterRule.INSTANCE, GearIOSinkRule.INSTANCE,
+ GearAggregationRule.INSTANCE, GearSortRule.INSTANCE, GearValuesRule.INSTANCE,
+ GearIntersectRule.INSTANCE, GearMinusRule.INSTANCE, GearUnionRule.INSTANCE,
+ GearJoinRule.INSTANCE)
+ .build();
+
+ public static RuleSet[] getRuleSets() {
+ return new RuleSet[]{new GearRuleSet(
+ ImmutableSet.<RelOptRule>builder().addAll(calciteToGearConversionRules).build())};
+ }
+
+ private static class GearRuleSet implements RuleSet {
+ final ImmutableSet<RelOptRule> rules;
+
+ public GearRuleSet(ImmutableSet<RelOptRule> rules) {
+ this.rules = rules;
+ }
+
+ public GearRuleSet(ImmutableList<RelOptRule> rules) {
+ this.rules = ImmutableSet.<RelOptRule>builder().addAll(rules).build();
+ }
+
+ @Override
+ public Iterator<RelOptRule> iterator() {
+ return rules.iterator();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/LogicalPlan.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/LogicalPlan.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/LogicalPlan.java
new file mode 100644
index 0000000..1448a71
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/LogicalPlan.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.planner;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.parser.SqlParseException;
+import org.apache.calcite.tools.Planner;
+import org.apache.calcite.tools.RelConversionException;
+import org.apache.calcite.tools.ValidationException;
+
+public class LogicalPlan {
+
+ public static RelNode getLogicalPlan(String query, Planner planner) throws ValidationException,
+ RelConversionException {
+ SqlNode sqlNode;
+
+ try {
+ sqlNode = planner.parse(query);
+ } catch (SqlParseException e) {
+ throw new RuntimeException("SQL query parsing error", e);
+ }
+ SqlNode validatedSqlNode = planner.validate(sqlNode);
+
+ return planner.rel(validatedSqlNode).project();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/Query.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/Query.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/Query.java
new file mode 100644
index 0000000..c18b8b5
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/Query.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.planner;
+
+import org.apache.calcite.config.Lex;
+import org.apache.calcite.plan.Contexts;
+import org.apache.calcite.plan.ConventionTraitDef;
+import org.apache.calcite.plan.RelTraitDef;
+import org.apache.calcite.rel.RelCollationTraitDef;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.parser.SqlParseException;
+import org.apache.calcite.sql.parser.SqlParser;
+import org.apache.calcite.tools.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * This Class is intended to test functions of Apache Calcite
+ */
+public class Query {
+
+ private static final Logger LOG = LoggerFactory.getLogger(Query.class);
+ private final Planner queryPlanner;
+
+ public Query(SchemaPlus schema) {
+
+ final List<RelTraitDef> traitDefs = new ArrayList<RelTraitDef>();
+
+ traitDefs.add(ConventionTraitDef.INSTANCE);
+ traitDefs.add(RelCollationTraitDef.INSTANCE);
+
+ FrameworkConfig calciteFrameworkConfig = Frameworks.newConfigBuilder()
+ .parserConfig(SqlParser.configBuilder()
+ .setLex(Lex.MYSQL)
+ .build())
+ .defaultSchema(schema)
+ .traitDefs(traitDefs)
+ .context(Contexts.EMPTY_CONTEXT)
+ .ruleSets(RuleSets.ofList())
+ .costFactory(null)
+ .typeSystem(RelDataTypeSystem.DEFAULT)
+ .build();
+ this.queryPlanner = Frameworks.getPlanner(calciteFrameworkConfig);
+ }
+
+ public RelNode getLogicalPlan(String query) throws ValidationException, RelConversionException {
+ SqlNode sqlNode = null;
+ try {
+ sqlNode = queryPlanner.parse(query);
+ } catch (SqlParseException e) {
+ LOG.error(e.getMessage());
+ }
+
+ SqlNode validatedSqlNode = queryPlanner.validate(sqlNode);
+ return queryPlanner.rel(validatedSqlNode).project();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/StreamQueryPlanner.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/StreamQueryPlanner.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/StreamQueryPlanner.java
new file mode 100644
index 0000000..10f9cbd
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/planner/StreamQueryPlanner.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.planner;
+
+import org.apache.calcite.adapter.java.ReflectiveSchema;
+import org.apache.calcite.jdbc.CalciteConnection;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.tools.*;
+import org.apache.gearpump.sql.utils.CalciteFrameworkConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+
+public class StreamQueryPlanner {
+ private static final Logger LOG = LoggerFactory.getLogger(StreamQueryPlanner.class);
+
+ public static void main(String[] args) throws ClassNotFoundException,
+ SQLException, ValidationException, RelConversionException {
+
+ Class.forName("org.apache.calcite.jdbc.Driver");
+ Connection connection = DriverManager.getConnection("jdbc:calcite:");
+ CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
+ SchemaPlus rootSchema = calciteConnection.getRootSchema();
+ rootSchema.add("t", new ReflectiveSchema(new Transactions()));
+
+ FrameworkConfig frameworkConfig = CalciteFrameworkConfiguration.getDefaultconfig(rootSchema);
+ Planner planner = Frameworks.getPlanner(frameworkConfig);
+
+ String query = "select t.orders.id, name, max(quantity)*0.5 from t.orders, t.products "
+ + "where t.orders.id = t.products.id group by t.orders.id, name "
+ + "having sum(quantity) > 5 order by sum(quantity) ";
+
+ RelNode logicalPlan = LogicalPlan.getLogicalPlan(query, planner);
+ LOG.info("Relational Expression:- \n\n" + RelOptUtil.toString(logicalPlan));
+
+ }
+
+ public static class Transactions {
+
+ public final Order[] orders = {
+ new Order("001", 3),
+ new Order("002", 5),
+ new Order("003", 8),
+ new Order("004", 15),
+ };
+
+ public final Product[] products = {
+ new Product("001", "Book"),
+ new Product("002", "Pen"),
+ new Product("003", "Pencil"),
+ new Product("004", "Ruler"),
+ };
+ }
+
+ public static class Order {
+ public final String id;
+ public final int quantity;
+
+ public Order(String id, int quantity) {
+ this.id = id;
+ this.quantity = quantity;
+ }
+ }
+
+ public static class Product {
+ public final String id;
+ public final String name;
+
+ public Product(String id, String name) {
+ this.id = id;
+ this.name = name;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearAggregationRel.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearAggregationRel.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearAggregationRel.java
new file mode 100644
index 0000000..46dc15b
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearAggregationRel.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.rel;
+
+import org.apache.calcite.linq4j.Ord;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelWriter;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.Util;
+import org.apache.gearpump.sql.table.SampleString;
+import org.apache.gearpump.streaming.dsl.api.functions.MapFunction;
+import org.apache.gearpump.streaming.dsl.api.functions.ReduceFunction;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp;
+import org.apache.gearpump.streaming.dsl.javaapi.functions.GroupByFunction;
+import org.apache.gearpump.streaming.dsl.window.api.Trigger;
+import org.apache.gearpump.streaming.dsl.window.api.WindowFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+
+import java.time.Duration;
+import java.util.List;
+
+public class GearAggregationRel extends Aggregate implements GearRelNode {
+
+ private static final Logger LOG = LoggerFactory.getLogger(GearAggregationRel.class);
+ private int windowFieldIdx = -1;
+ private WindowFunction windowFn;
+ private Trigger trigger;
+ private Duration allowedLatence = Duration.ZERO;
+
+ public GearAggregationRel(RelOptCluster cluster, RelTraitSet traits, RelNode child,
+ boolean indicator, ImmutableBitSet groupSet,
+ List<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls) {
+ super(cluster, traits, child, indicator, groupSet, groupSets, aggCalls);
+ }
+
+ @Override
+ public Aggregate copy(RelTraitSet traitSet, RelNode input, boolean indicator,
+ ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets,
+ List<AggregateCall> aggCalls) {
+ return null;
+ }
+
+ public RelWriter explainTerms(RelWriter pw) {
+ pw.item("group", groupSet)
+ .itemIf("window", windowFn, windowFn != null)
+ .itemIf("trigger", trigger, trigger != null)
+ .itemIf("event_time", windowFieldIdx, windowFieldIdx != -1)
+ .itemIf("groups", groupSets, getGroupType() != Group.SIMPLE)
+ .itemIf("indicator", indicator, indicator)
+ .itemIf("aggs", aggCalls, pw.nest());
+ if (!pw.nest()) {
+ for (Ord<AggregateCall> ord : Ord.zip(aggCalls)) {
+ pw.item(Util.first(ord.e.name, "agg#" + ord.i), ord.e);
+ }
+ }
+ return pw;
+ }
+
+ @Override
+ public JavaStream<Tuple2<String, Integer>> buildGearPipeline(JavaStreamApp app,
+ JavaStream<Tuple2<String, Integer>> javaStream) throws Exception {
+ LOG.debug("Adding Map");
+ JavaStream<Tuple2<String, Integer>> ones = SampleString.WORDS.map(new Ones(), "map");
+
+ LOG.debug("Adding GroupBy");
+ JavaStream<Tuple2<String, Integer>> groupedOnes = ones.groupBy(new TupleKey(),
+ 1, "groupBy");
+// groupedOnes.log();
+ LOG.debug("Adding Reduce");
+ JavaStream<Tuple2<String, Integer>> wordCount = groupedOnes.reduce(new Count(), "reduce");
+ wordCount.log();
+
+ return wordCount;
+ }
+
+ private static class Ones extends MapFunction<String, Tuple2<String, Integer>> {
+ @Override
+ public Tuple2<String, Integer> map(String s) {
+ return new Tuple2<>(s, 1);
+ }
+ }
+
+ private static class TupleKey extends GroupByFunction<Tuple2<String, Integer>, String> {
+ @Override
+ public String groupBy(Tuple2<String, Integer> tuple) {
+ return tuple._1();
+ }
+ }
+
+ private static class Count extends ReduceFunction<Tuple2<String, Integer>> {
+ @Override
+ public Tuple2<String, Integer> reduce(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) {
+ return new Tuple2<>(t1._1(), t1._2() + t2._2());
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearFilterRel.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearFilterRel.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearFilterRel.java
new file mode 100644
index 0000000..53a07d9
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearFilterRel.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.rel;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rex.RexNode;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp;
+import scala.Tuple2;
+
+public class GearFilterRel extends Filter implements GearRelNode {
+
+ public GearFilterRel(RelOptCluster cluster, RelTraitSet traits, RelNode child,
+ RexNode condition) {
+ super(cluster, traits, child, condition);
+ }
+
+ @Override
+ public Filter copy(RelTraitSet traitSet, RelNode input, RexNode condition) {
+ return new GearFilterRel(getCluster(), traitSet, input, condition);
+ }
+
+
+ @Override
+ public JavaStream<Tuple2<String, Integer>> buildGearPipeline(JavaStreamApp app, JavaStream<Tuple2<String, Integer>> javaStream) throws Exception {
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearFlatMapRel.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearFlatMapRel.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearFlatMapRel.java
new file mode 100644
index 0000000..d4c55fb
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearFlatMapRel.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.rel;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rex.RexNode;
+import org.apache.gearpump.DefaultMessage;
+import org.apache.gearpump.Message;
+import org.apache.gearpump.cluster.UserConfig;
+import org.apache.gearpump.sql.table.SampleString;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp;
+import org.apache.gearpump.streaming.dsl.javaapi.functions.FlatMapFunction;
+import org.apache.gearpump.streaming.source.DataSource;
+import org.apache.gearpump.streaming.source.Watermark;
+import org.apache.gearpump.streaming.task.TaskContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.Iterator;
+
+public class GearFlatMapRel extends Filter implements GearRelNode {
+
+ private static final Logger LOG = LoggerFactory.getLogger(GearFlatMapRel.class);
+
+ public GearFlatMapRel(RelOptCluster cluster, RelTraitSet traits, RelNode child, RexNode condition) {
+ super(cluster, traits, child, condition);
+ }
+
+ public GearFlatMapRel() {
+ super(null, null, null, null);
+ }
+
+ @Override
+ public Filter copy(RelTraitSet traitSet, RelNode input, RexNode condition) {
+ return new GearFlatMapRel(getCluster(), traitSet, input, condition);
+ }
+
+ @Override
+ public JavaStream<Tuple2<String, Integer>> buildGearPipeline(JavaStreamApp app,
+ JavaStream<Tuple2<String, Integer>> javaStream) throws Exception {
+ LOG.debug("Adding Source");
+ JavaStream<String> sentence = app.source(new StringSource(SampleString.Stream.getKV()),
+ 1, UserConfig.empty(), "source");
+ LOG.debug("Adding flatMap");
+ SampleString.WORDS = sentence.flatMap(new Split(), "flatMap");
+ return null;
+ }
+
+ private static class StringSource implements DataSource {
+ private final String str;
+ private boolean hasNext = true;
+
+ StringSource(String str) {
+ this.str = str;
+ }
+
+ @Override
+ public void open(TaskContext context, Instant startTime) {
+ }
+
+ @Override
+ public Message read() {
+ Message msg = new DefaultMessage(str, Instant.now());
+ hasNext = false;
+ return msg;
+ }
+
+ @Override
+ public void close() {
+ }
+
+ @Override
+ public Instant getWatermark() {
+ if (hasNext) {
+ return Instant.now();
+ } else {
+ return Watermark.MAX();
+ }
+ }
+ }
+
+ private static class Split extends FlatMapFunction<String, String> {
+ @Override
+ public Iterator<String> flatMap(String s) {
+ return Arrays.asList(s.split("\\s+")).iterator();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearIOSinkRel.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearIOSinkRel.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearIOSinkRel.java
new file mode 100644
index 0000000..1d61baf
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearIOSinkRel.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.rel;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.prepare.Prepare;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.TableModify;
+import org.apache.calcite.rex.RexNode;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp;
+import scala.Tuple2;
+
+import java.util.List;
+
+public class GearIOSinkRel extends TableModify implements GearRelNode {
+ public GearIOSinkRel(RelOptCluster cluster, RelTraitSet traits, RelOptTable table,
+ Prepare.CatalogReader catalogReader, RelNode child, Operation operation,
+ List<String> updateColumnList, List<RexNode> sourceExpressionList, boolean flattened) {
+ super(cluster, traits, table, catalogReader, child, operation, updateColumnList,
+ sourceExpressionList, flattened);
+ }
+
+ @Override
+ public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+ return new GearIOSinkRel(getCluster(), traitSet, getTable(), getCatalogReader(), sole(inputs),
+ getOperation(), getUpdateColumnList(), getSourceExpressionList(), isFlattened());
+ }
+
+ @Override
+ public JavaStream<Tuple2<String, Integer>> buildGearPipeline(JavaStreamApp app, JavaStream<Tuple2<String, Integer>> javaStream) throws Exception {
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearIOSourceRel.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearIOSourceRel.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearIOSourceRel.java
new file mode 100644
index 0000000..6641c35
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearIOSourceRel.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.rel;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp;
+import scala.Tuple2;
+
+public class GearIOSourceRel extends TableScan implements GearRelNode {
+
+ public GearIOSourceRel(RelOptCluster cluster, RelTraitSet traitSet, RelOptTable table) {
+ super(cluster, traitSet, table);
+ }
+
+ @Override
+ public JavaStream<Tuple2<String, Integer>> buildGearPipeline(JavaStreamApp app, JavaStream<Tuple2<String, Integer>> javaStream) throws Exception {
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearIntersectRel.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearIntersectRel.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearIntersectRel.java
new file mode 100644
index 0000000..6888a26
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearIntersectRel.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.rel;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Intersect;
+import org.apache.calcite.rel.core.SetOp;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp;
+import scala.Tuple2;
+
+import java.util.List;
+
+public class GearIntersectRel extends Intersect implements GearRelNode {
+ private GearSetOperatorRelBase delegate;
+
+ public GearIntersectRel(
+ RelOptCluster cluster,
+ RelTraitSet traits,
+ List<RelNode> inputs,
+ boolean all) {
+ super(cluster, traits, inputs, all);
+ delegate = new GearSetOperatorRelBase(this,
+ GearSetOperatorRelBase.OpType.INTERSECT, inputs, all);
+ }
+
+ @Override
+ public SetOp copy(RelTraitSet traitSet, List<RelNode> inputs, boolean all) {
+ return new GearIntersectRel(getCluster(), traitSet, inputs, all);
+ }
+
+ @Override
+ public JavaStream<Tuple2<String, Integer>> buildGearPipeline(JavaStreamApp app, JavaStream<Tuple2<String, Integer>> javaStream) throws Exception {
+ return null;
+ }
+}
[2/3] incubator-gearpump git commit: [GEARPUMP-217] Add Gearpump rel,
rule and examples.
Posted by ma...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearJoinRel.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearJoinRel.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearJoinRel.java
new file mode 100644
index 0000000..1b168fb
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearJoinRel.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.rel;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.CorrelationId;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.util.Pair;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp;
+import scala.Tuple2;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+public class GearJoinRel extends Join implements GearRelNode {
+ public GearJoinRel(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right,
+ RexNode condition, Set<CorrelationId> variablesSet, JoinRelType joinType) {
+ super(cluster, traits, left, right, condition, variablesSet, joinType);
+ }
+
+ @Override
+ public Join copy(RelTraitSet traitSet, RexNode conditionExpr, RelNode left,
+ RelNode right, JoinRelType joinType, boolean semiJoinDone) {
+ return new GearJoinRel(getCluster(), traitSet, left, right, conditionExpr, variablesSet,
+ joinType);
+ }
+
+ private List<Pair<Integer, Integer>> extractJoinColumns(int leftRowColumnCount) {
+ // it's a CROSS JOIN because: condition == true
+ if (condition instanceof RexLiteral && (Boolean) ((RexLiteral) condition).getValue()) {
+ throw new UnsupportedOperationException("CROSS JOIN is not supported!");
+ }
+
+ RexCall call = (RexCall) condition;
+ List<Pair<Integer, Integer>> pairs = new ArrayList<>();
+ if ("AND".equals(call.getOperator().getName())) {
+ List<RexNode> operands = call.getOperands();
+ for (RexNode rexNode : operands) {
+ Pair<Integer, Integer> pair = extractOneJoinColumn((RexCall) rexNode, leftRowColumnCount);
+ pairs.add(pair);
+ }
+ } else if ("=".equals(call.getOperator().getName())) {
+ pairs.add(extractOneJoinColumn(call, leftRowColumnCount));
+ } else {
+ throw new UnsupportedOperationException(
+ "Operator " + call.getOperator().getName() + " is not supported in join condition");
+ }
+
+ return pairs;
+ }
+
+ private Pair<Integer, Integer> extractOneJoinColumn(RexCall oneCondition,
+ int leftRowColumnCount) {
+ List<RexNode> operands = oneCondition.getOperands();
+ final int leftIndex = Math.min(((RexInputRef) operands.get(0)).getIndex(),
+ ((RexInputRef) operands.get(1)).getIndex());
+
+ final int rightIndex1 = Math.max(((RexInputRef) operands.get(0)).getIndex(),
+ ((RexInputRef) operands.get(1)).getIndex());
+ final int rightIndex = rightIndex1 - leftRowColumnCount;
+
+ return new Pair<>(leftIndex, rightIndex);
+ }
+
+ @Override
+ public JavaStream<Tuple2<String, Integer>> buildGearPipeline(JavaStreamApp app, JavaStream<Tuple2<String, Integer>> javaStream) throws Exception {
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearLogicalConvention.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearLogicalConvention.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearLogicalConvention.java
new file mode 100644
index 0000000..ced38c3
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearLogicalConvention.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.rel;
+
+import org.apache.calcite.plan.*;
+
+public enum GearLogicalConvention implements Convention {
+ INSTANCE;
+
+ @Override
+ public Class getInterface() {
+ return GearRelNode.class;
+ }
+
+ @Override
+ public String getName() {
+ return "GEAR_LOGICAL";
+ }
+
+ @Override
+ public RelTraitDef getTraitDef() {
+ return ConventionTraitDef.INSTANCE;
+ }
+
+ @Override
+ public boolean satisfies(RelTrait trait) {
+ return this == trait;
+ }
+
+ @Override
+ public void register(RelOptPlanner planner) {
+ }
+
+ @Override
+ public String toString() {
+ return getName();
+ }
+
+ @Override
+ public boolean canConvertConvention(Convention toConvention) {
+ return false;
+ }
+
+ @Override
+ public boolean useAbstractConvertersForConversion(RelTraitSet fromTraits, RelTraitSet toTraits) {
+ return false;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearMinusRel.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearMinusRel.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearMinusRel.java
new file mode 100644
index 0000000..1ca972a
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearMinusRel.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.rel;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Minus;
+import org.apache.calcite.rel.core.SetOp;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp;
+import scala.Tuple2;
+
+import java.util.List;
+
+public class GearMinusRel extends Minus implements GearRelNode {
+
+ private GearSetOperatorRelBase delegate;
+
+ public GearMinusRel(RelOptCluster cluster, RelTraitSet traits, List<RelNode> inputs,
+ boolean all) {
+ super(cluster, traits, inputs, all);
+ delegate = new GearSetOperatorRelBase(this, GearSetOperatorRelBase.OpType.MINUS, inputs, all);
+ }
+
+ @Override
+ public SetOp copy(RelTraitSet traitSet, List<RelNode> inputs, boolean all) {
+ return new GearMinusRel(getCluster(), traitSet, inputs, all);
+ }
+
+ @Override
+ public JavaStream<Tuple2<String, Integer>> buildGearPipeline(JavaStreamApp app, JavaStream<Tuple2<String, Integer>> javaStream) throws Exception {
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearProjectRel.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearProjectRel.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearProjectRel.java
new file mode 100644
index 0000000..f09dc8c
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearProjectRel.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.rel;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexNode;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp;
+import scala.Tuple2;
+
+import java.util.List;
+
+public class GearProjectRel extends Project implements GearRelNode {
+
+ public GearProjectRel(RelOptCluster cluster, RelTraitSet traits, RelNode input,
+ List<? extends RexNode> projects, RelDataType rowType) {
+ super(cluster, traits, input, projects, rowType);
+ }
+
+ @Override
+ public Project copy(RelTraitSet traitSet, RelNode input, List<RexNode> projects,
+ RelDataType rowType) {
+ return new GearProjectRel(getCluster(), traitSet, input, projects, rowType);
+ }
+
+ @Override
+ public JavaStream<Tuple2<String, Integer>> buildGearPipeline(JavaStreamApp app, JavaStream<Tuple2<String, Integer>> javaStream) throws Exception {
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearRelNode.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearRelNode.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearRelNode.java
new file mode 100644
index 0000000..042c767
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearRelNode.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.rel;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp;
+import scala.Tuple2;
+
+public interface GearRelNode extends RelNode {
+
+ JavaStream<Tuple2<String, Integer>> buildGearPipeline(JavaStreamApp app, JavaStream<Tuple2<String, Integer>> javaStream) throws Exception;
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearSetOperatorRelBase.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearSetOperatorRelBase.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearSetOperatorRelBase.java
new file mode 100644
index 0000000..ee59753
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearSetOperatorRelBase.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.rel;
+
+import org.apache.calcite.rel.RelNode;
+
+import java.io.Serializable;
+import java.util.List;
+
+public class GearSetOperatorRelBase {
+
+ public enum OpType implements Serializable {
+ UNION,
+ INTERSECT,
+ MINUS
+ }
+
+ private GearRelNode gearRelNode;
+ private List<RelNode> inputs;
+ private boolean all;
+ private OpType opType;
+
+ public GearSetOperatorRelBase(GearRelNode gearRelNode, OpType opType,
+ List<RelNode> inputs, boolean all) {
+ this.gearRelNode = gearRelNode;
+ this.opType = opType;
+ this.inputs = inputs;
+ this.all = all;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearSortRel.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearSortRel.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearSortRel.java
new file mode 100644
index 0000000..f70481b
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearSortRel.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.rel;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelCollationImpl;
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Sort;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp;
+import scala.Tuple2;
+
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.List;
+
+public class GearSortRel extends Sort implements GearRelNode {
+
+ private List<Integer> fieldIndices = new ArrayList<>();
+ private List<Boolean> orientation = new ArrayList<>();
+ private List<Boolean> nullsFirst = new ArrayList<>();
+
+ private int startIndex = 0;
+ private int count;
+
+ public GearSortRel(RelOptCluster cluster, RelTraitSet traits, RelNode child, RelCollation collation,
+ RexNode offset, RexNode fetch) {
+ super(cluster, traits, child, collation, offset, fetch);
+
+ List<RexNode> fieldExps = getChildExps();
+ RelCollationImpl collationImpl = (RelCollationImpl) collation;
+ List<RelFieldCollation> collations = collationImpl.getFieldCollations();
+ for (int i = 0; i < fieldExps.size(); i++) {
+ RexNode fieldExp = fieldExps.get(i);
+ RexInputRef inputRef = (RexInputRef) fieldExp;
+ fieldIndices.add(inputRef.getIndex());
+ orientation.add(collations.get(i).getDirection() == RelFieldCollation.Direction.ASCENDING);
+
+ RelFieldCollation.NullDirection rawNullDirection = collations.get(i).nullDirection;
+ if (rawNullDirection == RelFieldCollation.NullDirection.UNSPECIFIED) {
+ rawNullDirection = collations.get(i).getDirection().defaultNullDirection();
+ }
+ nullsFirst.add(rawNullDirection == RelFieldCollation.NullDirection.FIRST);
+ }
+
+ if (fetch == null) {
+ throw new UnsupportedOperationException("ORDER BY without a LIMIT is not supported!");
+ }
+
+ RexLiteral fetchLiteral = (RexLiteral) fetch;
+ count = ((BigDecimal) fetchLiteral.getValue()).intValue();
+
+ if (offset != null) {
+ RexLiteral offsetLiteral = (RexLiteral) offset;
+ startIndex = ((BigDecimal) offsetLiteral.getValue()).intValue();
+ }
+ }
+
+ @Override
+ public Sort copy(RelTraitSet traitSet, RelNode newInput, RelCollation newCollation,
+ RexNode offset, RexNode fetch) {
+ return new GearSortRel(getCluster(), traitSet, newInput, newCollation, offset, fetch);
+ }
+
+ public static <T extends Number & Comparable> int numberCompare(T a, T b) {
+ return a.compareTo(b);
+ }
+
+ @Override
+ public JavaStream<Tuple2<String, Integer>> buildGearPipeline(JavaStreamApp app, JavaStream<Tuple2<String, Integer>> javaStream) throws Exception {
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearSqlRelUtils.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearSqlRelUtils.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearSqlRelUtils.java
new file mode 100644
index 0000000..54a6bbb
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearSqlRelUtils.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.rel;
+
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.plan.volcano.RelSubset;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.sql.SqlExplainLevel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+class GearSqlRelUtils {
+ private static final Logger LOG = LoggerFactory.getLogger(GearSqlRelUtils.class);
+
+ private static final AtomicInteger sequence = new AtomicInteger(0);
+ private static final AtomicInteger classSequence = new AtomicInteger(0);
+
+ public static String getStageName(GearRelNode relNode) {
+ return relNode.getClass().getSimpleName().toUpperCase() + "_" + relNode.getId() + "_"
+ + sequence.getAndIncrement();
+ }
+
+ public static String getClassName(GearRelNode relNode) {
+ return "Generated_" + relNode.getClass().getSimpleName().toUpperCase() + "_" + relNode.getId()
+ + "_" + classSequence.getAndIncrement();
+ }
+
+ public static GearRelNode getGearRelInput(RelNode input) {
+ if (input instanceof RelSubset) {
+ // go with known best input
+ input = ((RelSubset) input).getBest();
+ }
+ return (GearRelNode) input;
+ }
+
+ public static String explain(final RelNode rel) {
+ return explain(rel, SqlExplainLevel.EXPPLAN_ATTRIBUTES);
+ }
+
+ public static String explain(final RelNode rel, SqlExplainLevel detailLevel) {
+ String explain = "";
+ try {
+ explain = RelOptUtil.toString(rel);
+ } catch (StackOverflowError e) {
+ LOG.error("StackOverflowError occurred while extracting plan. "
+ + "Please report it to the dev@ mailing list.");
+ LOG.error("RelNode " + rel + " ExplainLevel " + detailLevel, e);
+ LOG.error("Forcing plan to empty string and continue... "
+ + "SQL Runner may not working properly after.");
+ }
+ return explain;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearUnionRel.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearUnionRel.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearUnionRel.java
new file mode 100644
index 0000000..431368d
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearUnionRel.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.rel;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelInput;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.SetOp;
+import org.apache.calcite.rel.core.Union;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp;
+import scala.Tuple2;
+
+import java.util.List;
+
+public class GearUnionRel extends Union implements GearRelNode {
+
+ private GearSetOperatorRelBase delegate;
+
+ public GearUnionRel(RelOptCluster cluster, RelTraitSet traits, List<RelNode> inputs, boolean all) {
+ super(cluster, traits, inputs, all);
+ this.delegate = new GearSetOperatorRelBase(this, GearSetOperatorRelBase.OpType.UNION, inputs, all);
+ }
+
+ public GearUnionRel(RelInput input) {
+ super(input);
+ }
+
+ @Override
+ public SetOp copy(RelTraitSet traitSet, List<RelNode> inputs, boolean all) {
+ return new GearUnionRel(getCluster(), traitSet, inputs, all);
+ }
+
+ @Override
+ public JavaStream<Tuple2<String, Integer>> buildGearPipeline(JavaStreamApp app, JavaStream<Tuple2<String, Integer>> javaStream) throws Exception {
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearValuesRel.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearValuesRel.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearValuesRel.java
new file mode 100644
index 0000000..6bd9403
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rel/GearValuesRel.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.rel;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.core.Values;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp;
+import scala.Tuple2;
+
+public class GearValuesRel extends Values implements GearRelNode {
+
+ public GearValuesRel(RelOptCluster cluster, RelDataType rowType, ImmutableList<ImmutableList<RexLiteral>> tuples,
+ RelTraitSet traits) {
+ super(cluster, rowType, tuples, traits);
+ }
+
+ @Override
+ public JavaStream<Tuple2<String, Integer>> buildGearPipeline(JavaStreamApp app, JavaStream<Tuple2<String, Integer>> javaStream) throws Exception {
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearAggregationRule.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearAggregationRule.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearAggregationRule.java
new file mode 100644
index 0000000..c1b1602
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearAggregationRule.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.rule;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.plan.*;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.core.RelFactories;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.tools.RelBuilderFactory;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.gearpump.sql.rel.GearAggregationRel;
+import org.apache.gearpump.sql.rel.GearLogicalConvention;
+import org.apache.gearpump.sql.utils.GearConfiguration;
+import org.apache.gearpump.streaming.dsl.window.api.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.GregorianCalendar;
+import java.util.List;
+
+public class GearAggregationRule extends RelOptRule {
+
+ private static final Logger LOG = LoggerFactory.getLogger(GearAggregationRule.class);
+ public static final GearAggregationRule INSTANCE =
+ new GearAggregationRule(Aggregate.class, Project.class, RelFactories.LOGICAL_BUILDER);
+
+ public GearAggregationRule(Class<? extends Aggregate> aggregateClass,
+ Class<? extends Project> projectClass,
+ RelBuilderFactory relBuilderFactory) {
+ super(operand(aggregateClass, operand(projectClass, any())), relBuilderFactory, null);
+ }
+
+ public GearAggregationRule(RelOptRuleOperand operand, String description) {
+ super(operand, description);
+ }
+
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ final Aggregate aggregate = call.rel(0);
+ final Project project = call.rel(1);
+ updateWindowTrigger(call, aggregate, project);
+ }
+
+ private void updateWindowTrigger(RelOptRuleCall call, Aggregate aggregate, Project project) {
+ ImmutableBitSet groupByFields = aggregate.getGroupSet();
+ List<RexNode> projectMapping = project.getProjects();
+
+ WindowFunction windowFn = new GlobalWindowFunction();
+ Trigger triggerFn;
+ int windowFieldIdx = -1;
+ Duration allowedLatence = Duration.ZERO;
+
+ for (int groupField : groupByFields.asList()) {
+ RexNode projNode = projectMapping.get(groupField);
+ if (projNode instanceof RexCall) {
+ SqlOperator op = ((RexCall) projNode).op;
+ ImmutableList<RexNode> parameters = ((RexCall) projNode).operands;
+ String functionName = op.getName();
+ switch (functionName) {
+ case "TUMBLE":
+ windowFieldIdx = groupField;
+ windowFn = (WindowFunction) FixedWindows.apply(Duration.ofMillis(getWindowParameterAsMillis(parameters.get(1))));
+ if (parameters.size() == 3) {
+ GregorianCalendar delayTime = (GregorianCalendar) ((RexLiteral) parameters.get(2))
+ .getValue();
+ triggerFn = createTriggerWithDelay(delayTime);
+ allowedLatence = (Duration.ofMillis(delayTime.getTimeInMillis()));
+ }
+ break;
+ case "HOP":
+ windowFieldIdx = groupField;
+ windowFn = (WindowFunction) SlidingWindows.apply(Duration.ofMillis(getWindowParameterAsMillis(parameters.get(1))), Duration.ofMillis(getWindowParameterAsMillis(parameters.get(2))));
+
+ if (parameters.size() == 4) {
+ GregorianCalendar delayTime = (GregorianCalendar) ((RexLiteral) parameters.get(3))
+ .getValue();
+ triggerFn = createTriggerWithDelay(delayTime);
+ allowedLatence = (Duration.ofMillis(delayTime.getTimeInMillis()));
+ }
+ break;
+ case "SESSION":
+ windowFieldIdx = groupField;
+ windowFn = (WindowFunction) SessionWindows.apply(Duration.ofMillis(getWindowParameterAsMillis(parameters.get(1))));
+ if (parameters.size() == 3) {
+ GregorianCalendar delayTime = (GregorianCalendar) ((RexLiteral) parameters.get(2))
+ .getValue();
+ triggerFn = createTriggerWithDelay(delayTime);
+ allowedLatence = (Duration.ofMillis(delayTime.getTimeInMillis()));
+ }
+ break;
+ default:
+ break;
+ }
+ }
+ }
+
+ try {
+ GearAggregationRel gearRel = new GearAggregationRel(aggregate.getCluster(),
+ aggregate.getTraitSet().replace(GearLogicalConvention.INSTANCE),
+ convert(aggregate.getInput(),
+ aggregate.getInput().getTraitSet().replace(GearLogicalConvention.INSTANCE)),
+ aggregate.indicator,
+ aggregate.getGroupSet(),
+ aggregate.getGroupSets(),
+ aggregate.getAggCallList());
+ gearRel.buildGearPipeline(GearConfiguration.app, null);
+ GearConfiguration.app.submit().waitUntilFinish();
+ } catch (Exception e) {
+ LOG.error(e.getMessage());
+ }
+
+ }
+
+ private Trigger createTriggerWithDelay(GregorianCalendar delayTime) {
+ return null;
+ }
+
+ private long getWindowParameterAsMillis(RexNode parameterNode) {
+ if (parameterNode instanceof RexLiteral) {
+ return RexLiteral.intValue(parameterNode);
+ } else {
+ throw new IllegalArgumentException(String.format("[%s] is not valid.", parameterNode));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearFilterRule.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearFilterRule.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearFilterRule.java
new file mode 100644
index 0000000..4817ee8
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearFilterRule.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.rule;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.logical.LogicalFilter;
+import org.apache.gearpump.sql.rel.GearFilterRel;
+import org.apache.gearpump.sql.rel.GearLogicalConvention;
+
+public class GearFilterRule extends ConverterRule {
+
+ public static final GearFilterRule INSTANCE = new GearFilterRule();
+
+ private GearFilterRule() {
+ super(LogicalFilter.class, Convention.NONE, GearLogicalConvention.INSTANCE, "GearFilterRule");
+ }
+
+ @Override
+ public RelNode convert(RelNode rel) {
+ final Filter filter = (Filter) rel;
+ final RelNode input = filter.getInput();
+
+ GearFilterRel gearRel = new GearFilterRel(filter.getCluster(),
+ filter.getTraitSet().replace(GearLogicalConvention.INSTANCE),
+ convert(input, input.getTraitSet().replace(GearLogicalConvention.INSTANCE)),
+ filter.getCondition());
+ return gearRel;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearFlatMapRule.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearFlatMapRule.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearFlatMapRule.java
new file mode 100644
index 0000000..e81f948
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearFlatMapRule.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.rule;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelTrait;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.gearpump.sql.rel.GearLogicalConvention;
+import org.apache.gearpump.sql.rel.GearFlatMapRel;
+import org.apache.gearpump.sql.utils.GearConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class GearFlatMapRule extends ConverterRule {
+
+ private static final Logger LOG = LoggerFactory.getLogger(GearFlatMapRule.class);
+ public static final GearFlatMapRule INSTANCE = new GearFlatMapRule(Aggregate.class, Convention.NONE);
+
+ public GearFlatMapRule(Class<? extends Aggregate> aggregateClass, RelTrait projectIn) {
+ super(aggregateClass, projectIn, GearLogicalConvention.INSTANCE, "GearFlatMapRule");
+ }
+
+ @Override
+ public RelNode convert(RelNode rel) {
+ try {
+ GearFlatMapRel flatRel = new GearFlatMapRel();
+ flatRel.buildGearPipeline(GearConfiguration.app, null);
+ } catch (Exception e) {
+ LOG.error(e.getMessage());
+ }
+ return null;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearIOSinkRule.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearIOSinkRule.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearIOSinkRule.java
new file mode 100644
index 0000000..ca525d6
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearIOSinkRule.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.rule;
+
+import java.util.List;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.prepare.Prepare;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.TableModify;
+import org.apache.calcite.rel.logical.LogicalTableModify;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.schema.Table;
+import org.apache.gearpump.sql.rel.GearIOSinkRel;
+import org.apache.gearpump.sql.rel.GearLogicalConvention;
+
+public class GearIOSinkRule extends ConverterRule {
+
+ public static final GearIOSinkRule INSTANCE = new GearIOSinkRule();
+
+ private GearIOSinkRule() {
+ super(LogicalTableModify.class, Convention.NONE, GearLogicalConvention.INSTANCE,
+ "GearIOSinkRule");
+ }
+
+ @Override
+ public RelNode convert(RelNode rel) {
+ final TableModify tableModify = (TableModify) rel;
+ final RelNode input = tableModify.getInput();
+
+ final RelOptCluster cluster = tableModify.getCluster();
+ final RelTraitSet traitSet = tableModify.getTraitSet().replace(GearLogicalConvention.INSTANCE);
+ final RelOptTable relOptTable = tableModify.getTable();
+ final Prepare.CatalogReader catalogReader = tableModify.getCatalogReader();
+ final RelNode convertedInput = convert(input,
+ input.getTraitSet().replace(GearLogicalConvention.INSTANCE));
+ final TableModify.Operation operation = tableModify.getOperation();
+ final List<String> updateColumnList = tableModify.getUpdateColumnList();
+ final List<RexNode> sourceExpressionList = tableModify.getSourceExpressionList();
+ final boolean flattened = tableModify.isFlattened();
+
+ final Table table = tableModify.getTable().unwrap(Table.class);
+
+ switch (table.getJdbcTableType()) {
+ case TABLE:
+ case STREAM:
+ if (operation != TableModify.Operation.INSERT) {
+ throw new UnsupportedOperationException(
+ String.format("Streams doesn't support %s modify operation", operation));
+ }
+ return new GearIOSinkRel(cluster, traitSet,
+ relOptTable, catalogReader, convertedInput, operation, updateColumnList,
+ sourceExpressionList, flattened);
+ default:
+ throw new IllegalArgumentException(
+ String.format("Unsupported table type: %s", table.getJdbcTableType()));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearIOSourceRule.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearIOSourceRule.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearIOSourceRule.java
new file mode 100644
index 0000000..a725b1a
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearIOSourceRule.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.rule;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.logical.LogicalTableScan;
+import org.apache.gearpump.sql.rel.GearIOSourceRel;
+import org.apache.gearpump.sql.rel.GearLogicalConvention;
+
+public class GearIOSourceRule extends ConverterRule {
+
+ public static final GearIOSourceRule INSTANCE = new GearIOSourceRule();
+
+ private GearIOSourceRule() {
+ super(LogicalTableScan.class, Convention.NONE, GearLogicalConvention.INSTANCE,
+ "GearIOSourceRule");
+ }
+
+ @Override
+ public RelNode convert(RelNode rel) {
+ final TableScan scan = (TableScan) rel;
+
+ return new GearIOSourceRel(scan.getCluster(),
+ scan.getTraitSet().replace(GearLogicalConvention.INSTANCE), scan.getTable());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearIntersectRule.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearIntersectRule.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearIntersectRule.java
new file mode 100644
index 0000000..eb149f3
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearIntersectRule.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.rule;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.Intersect;
+import org.apache.calcite.rel.logical.LogicalIntersect;
+import org.apache.gearpump.sql.rel.GearIntersectRel;
+import org.apache.gearpump.sql.rel.GearLogicalConvention;
+
+import java.util.List;
+
+public class GearIntersectRule extends ConverterRule {
+
+ public static final GearIntersectRule INSTANCE = new GearIntersectRule();
+
+ private GearIntersectRule() {
+ super(LogicalIntersect.class, Convention.NONE,
+ GearLogicalConvention.INSTANCE, "GearIntersectRule");
+ }
+
+ @Override
+ public RelNode convert(RelNode rel) {
+ Intersect intersect = (Intersect) rel;
+ final List<RelNode> inputs = intersect.getInputs();
+ return new GearIntersectRel(
+ intersect.getCluster(),
+ intersect.getTraitSet().replace(GearLogicalConvention.INSTANCE),
+ convertList(inputs, GearLogicalConvention.INSTANCE),
+ intersect.all
+ );
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearJoinRule.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearJoinRule.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearJoinRule.java
new file mode 100644
index 0000000..e86db06
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearJoinRule.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.rule;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.logical.LogicalJoin;
+import org.apache.gearpump.sql.rel.GearJoinRel;
+import org.apache.gearpump.sql.rel.GearLogicalConvention;
+
+public class GearJoinRule extends ConverterRule {
+
+ public static final GearJoinRule INSTANCE = new GearJoinRule();
+
+ private GearJoinRule() {
+ super(LogicalJoin.class, Convention.NONE,
+ GearLogicalConvention.INSTANCE, "GearJoinRule");
+ }
+
+ @Override
+ public RelNode convert(RelNode rel) {
+ Join join = (Join) rel;
+ return new GearJoinRel(
+ join.getCluster(),
+ join.getTraitSet().replace(GearLogicalConvention.INSTANCE),
+ convert(join.getLeft(),
+ join.getLeft().getTraitSet().replace(GearLogicalConvention.INSTANCE)),
+ convert(join.getRight(),
+ join.getRight().getTraitSet().replace(GearLogicalConvention.INSTANCE)),
+ join.getCondition(),
+ join.getVariablesSet(),
+ join.getJoinType()
+ );
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearMinusRule.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearMinusRule.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearMinusRule.java
new file mode 100644
index 0000000..103a29d
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearMinusRule.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.rule;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.Minus;
+import org.apache.calcite.rel.logical.LogicalMinus;
+import org.apache.gearpump.sql.rel.GearLogicalConvention;
+import org.apache.gearpump.sql.rel.GearMinusRel;
+
+import java.util.List;
+
+public class GearMinusRule extends ConverterRule {
+
+ public static final GearMinusRule INSTANCE = new GearMinusRule();
+
+ private GearMinusRule() {
+ super(LogicalMinus.class, Convention.NONE,
+ GearLogicalConvention.INSTANCE, "GearMinusRule");
+ }
+
+ @Override
+ public RelNode convert(RelNode rel) {
+ Minus minus = (Minus) rel;
+ final List<RelNode> inputs = minus.getInputs();
+ return new GearMinusRel(
+ minus.getCluster(),
+ minus.getTraitSet().replace(GearLogicalConvention.INSTANCE),
+ convertList(inputs, GearLogicalConvention.INSTANCE),
+ minus.all
+ );
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearProjectRule.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearProjectRule.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearProjectRule.java
new file mode 100644
index 0000000..6b09550
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearProjectRule.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.rule;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.gearpump.sql.rel.GearLogicalConvention;
+import org.apache.gearpump.sql.rel.GearProjectRel;
+
+public class GearProjectRule extends ConverterRule {
+
+ public static final GearProjectRule INSTANCE = new GearProjectRule();
+
+ private GearProjectRule() {
+ super(LogicalProject.class, Convention.NONE, GearLogicalConvention.INSTANCE,
+ "GearProjectRule");
+ }
+
+ @Override
+ public RelNode convert(RelNode rel) {
+ final Project project = (Project) rel;
+ final RelNode input = project.getInput();
+
+ return new GearProjectRel(project.getCluster(),
+ project.getTraitSet().replace(GearLogicalConvention.INSTANCE),
+ convert(input, input.getTraitSet().replace(GearLogicalConvention.INSTANCE)),
+ project.getProjects(), project.getRowType());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearSortRule.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearSortRule.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearSortRule.java
new file mode 100644
index 0000000..0a0d9e4
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearSortRule.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.rule;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.Sort;
+import org.apache.calcite.rel.logical.LogicalSort;
+import org.apache.gearpump.sql.rel.GearLogicalConvention;
+import org.apache.gearpump.sql.rel.GearSortRel;
+
+public class GearSortRule extends ConverterRule {
+
+ public static final GearSortRule INSTANCE = new GearSortRule();
+
+ private GearSortRule() {
+ super(LogicalSort.class, Convention.NONE,
+ GearLogicalConvention.INSTANCE, "GearSortRule");
+ }
+
+ @Override
+ public RelNode convert(RelNode rel) {
+ Sort sort = (Sort) rel;
+ final RelNode input = sort.getInput();
+ return new GearSortRel(
+ sort.getCluster(),
+ sort.getTraitSet().replace(GearLogicalConvention.INSTANCE),
+ convert(input, input.getTraitSet().replace(GearLogicalConvention.INSTANCE)),
+ sort.getCollation(),
+ sort.offset,
+ sort.fetch
+ );
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearUnionRule.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearUnionRule.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearUnionRule.java
new file mode 100644
index 0000000..7a17a46
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearUnionRule.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.rule;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.Union;
+import org.apache.calcite.rel.logical.LogicalUnion;
+import org.apache.gearpump.sql.rel.GearLogicalConvention;
+import org.apache.gearpump.sql.rel.GearUnionRel;
+
+public class GearUnionRule extends ConverterRule {
+
+ public static final GearUnionRule INSTANCE = new GearUnionRule();
+
+ private GearUnionRule() {
+ super(LogicalUnion.class, Convention.NONE, GearLogicalConvention.INSTANCE,
+ "GearUnionRule");
+ }
+
+ @Override
+ public RelNode convert(RelNode rel) {
+ Union union = (Union) rel;
+
+ return new GearUnionRel(
+ union.getCluster(),
+ union.getTraitSet().replace(GearLogicalConvention.INSTANCE),
+ convertList(union.getInputs(), GearLogicalConvention.INSTANCE),
+ union.all
+ );
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearValuesRule.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearValuesRule.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearValuesRule.java
new file mode 100644
index 0000000..b04eec2
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/rule/GearValuesRule.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.rule;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.Values;
+import org.apache.calcite.rel.logical.LogicalValues;
+import org.apache.gearpump.sql.rel.GearLogicalConvention;
+import org.apache.gearpump.sql.rel.GearValuesRel;
+
+public class GearValuesRule extends ConverterRule {
+
+ public static final GearValuesRule INSTANCE = new GearValuesRule();
+
+ private GearValuesRule() {
+ super(LogicalValues.class, Convention.NONE,
+ GearLogicalConvention.INSTANCE, "GearValuesRule");
+ }
+
+ @Override
+ public RelNode convert(RelNode rel) {
+ Values values = (Values) rel;
+ return new GearValuesRel(
+ values.getCluster(),
+ values.getRowType(),
+ values.getTuples(),
+ values.getTraitSet().replace(GearLogicalConvention.INSTANCE)
+ );
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/table/SampleString.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/table/SampleString.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/table/SampleString.java
new file mode 100644
index 0000000..7ecba21
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/table/SampleString.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.table;
+
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
+
+public class SampleString {
+
+ public static JavaStream<String> WORDS;
+
+ public static class Stream {
+ public static final Message[] KV = {new Message("001", "This is a good start, bingo!! bingo!!")};
+
+ public static String getKV() {
+ return KV[0].WORD;
+ }
+ }
+
+ public static class Message {
+ public final String ID;
+ public final String WORD;
+
+ public Message(String ID, String WORD) {
+ this.ID = ID;
+ this.WORD = WORD;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/table/SampleTransactions.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/table/SampleTransactions.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/table/SampleTransactions.java
new file mode 100644
index 0000000..4aa20e0
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/table/SampleTransactions.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.table;
+
+public class SampleTransactions {
+
+ public static class Transactions {
+
+ public final Order[] ORDERS = {
+ new Order("001", 3),
+ new Order("002", 5),
+ new Order("003", 8),
+ new Order("004", 15),
+ };
+
+ public final Product[] PRODUCTS = {
+ new Product("001", "Book"),
+ new Product("002", "Pen"),
+ new Product("003", "Pencil"),
+ new Product("004", "Ruler"),
+ };
+ }
+
+ public static class Order {
+ public final String ID;
+ public final int QUANTITY;
+
+ public Order(String ID, int QUANTITY) {
+ this.ID = ID;
+ this.QUANTITY = QUANTITY;
+ }
+ }
+
+ public static class Product {
+ public final String ID;
+ public final String NAME;
+
+ public Product(String ID, String NAME) {
+ this.ID = ID;
+ this.NAME = NAME;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/table/TransactionsTableFactory.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/table/TransactionsTableFactory.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/table/TransactionsTableFactory.java
new file mode 100644
index 0000000..4ff9efd
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/table/TransactionsTableFactory.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.table;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.DataContext;
+import org.apache.calcite.linq4j.Enumerable;
+import org.apache.calcite.linq4j.Linq4j;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelProtoDataType;
+import org.apache.calcite.schema.*;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+import java.util.Map;
+
+public class TransactionsTableFactory implements TableFactory<Table> {
+
+ @Override
+ public Table create(SchemaPlus schema, String name, Map<String, Object> operand, RelDataType rowType) {
+ final Object[][] rows = {
+ {100, "I001", "item1", 3},
+ {101, "I002", "item2", 5},
+ {102, "I003", "item3", 8},
+ {103, "I004", "item4", 33},
+ {104, "I005", "item5", 23}
+ };
+
+ return new TransactionsTable(ImmutableList.copyOf(rows));
+ }
+
+ public static class TransactionsTable implements ScannableTable {
+
+ protected final RelProtoDataType protoRowType = new RelProtoDataType() {
+ public RelDataType apply(RelDataTypeFactory a0) {
+ return a0.builder()
+ .add("timeStamp", SqlTypeName.TIMESTAMP)
+ .add("id", SqlTypeName.VARCHAR, 10)
+ .add("item", SqlTypeName.VARCHAR, 50)
+ .add("quantity", SqlTypeName.INTEGER)
+ .build();
+ }
+ };
+
+ private final ImmutableList<Object[]> rows;
+
+ public TransactionsTable(ImmutableList<Object[]> rows) {
+ this.rows = rows;
+ }
+
+ public Enumerable<Object[]> scan(DataContext root) {
+ return Linq4j.asEnumerable(rows);
+ }
+
+ @Override
+ public RelDataType getRowType(RelDataTypeFactory typeFactory) {
+ return protoRowType.apply(typeFactory);
+ }
+
+ @Override
+ public Statistic getStatistic() {
+ return Statistics.UNKNOWN;
+ }
+
+ @Override
+ public Schema.TableType getJdbcTableType() {
+ return Schema.TableType.TABLE;
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/utils/CalciteFrameworkConfiguration.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/utils/CalciteFrameworkConfiguration.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/utils/CalciteFrameworkConfiguration.java
new file mode 100644
index 0000000..a63036d
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/utils/CalciteFrameworkConfiguration.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.utils;
+
+import org.apache.calcite.config.Lex;
+import org.apache.calcite.plan.Contexts;
+import org.apache.calcite.plan.ConventionTraitDef;
+import org.apache.calcite.plan.RelTraitDef;
+import org.apache.calcite.rel.RelCollationTraitDef;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.sql.parser.SqlParser;
+import org.apache.calcite.tools.FrameworkConfig;
+import org.apache.calcite.tools.Frameworks;
+import org.apache.calcite.tools.RuleSets;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class CalciteFrameworkConfiguration {
+
+ public static FrameworkConfig getDefaultconfig(SchemaPlus schema) {
+ final List<RelTraitDef> traitDefs = new ArrayList<RelTraitDef>();
+
+ traitDefs.add(ConventionTraitDef.INSTANCE);
+ traitDefs.add(RelCollationTraitDef.INSTANCE);
+
+ FrameworkConfig frameworkConfiguration = Frameworks.newConfigBuilder()
+ .parserConfig(SqlParser.configBuilder()
+ .setLex(Lex.JAVA)
+ .build())
+ .defaultSchema(schema)
+ .traitDefs(traitDefs)
+ .context(Contexts.EMPTY_CONTEXT)
+ .ruleSets(RuleSets.ofList())
+ .costFactory(null)
+ .typeSystem(RelDataTypeSystem.DEFAULT)
+ .build();
+
+ return frameworkConfiguration;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/utils/GearConfiguration.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/utils/GearConfiguration.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/utils/GearConfiguration.java
new file mode 100644
index 0000000..03b2a47
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/utils/GearConfiguration.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.utils;
+
+import com.typesafe.config.Config;
+import org.apache.gearpump.cluster.ClusterConfig;
+import org.apache.gearpump.cluster.UserConfig;
+import org.apache.gearpump.cluster.client.ClientContext;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp;
+
+public class GearConfiguration {
+
+ private Config akkaConf;
+ private ClientContext context;
+ public static JavaStreamApp app;
+
+ public void defaultConfiguration() {
+ setAkkaConf(ClusterConfig.defaultConfig());
+ setContext(ClientContext.apply(akkaConf));
+ }
+
+ public void ConfigJavaStreamApp() {
+ app = new JavaStreamApp("JavaDSL", context, UserConfig.empty());
+ }
+
+ public void setAkkaConf(Config akkaConf) {
+ this.akkaConf = akkaConf;
+ }
+
+ public void setContext(ClientContext context) {
+ this.context = context;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/java/org/apache/gearpump/sql/validator/CalciteSqlValidator.java
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/java/org/apache/gearpump/sql/validator/CalciteSqlValidator.java b/experiments/sql/src/main/java/org/apache/gearpump/sql/validator/CalciteSqlValidator.java
new file mode 100644
index 0000000..d3d723f
--- /dev/null
+++ b/experiments/sql/src/main/java/org/apache/gearpump/sql/validator/CalciteSqlValidator.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.sql.validator;
+
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.prepare.CalciteCatalogReader;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.sql.SqlInsert;
+import org.apache.calcite.sql.SqlOperatorTable;
+import org.apache.calcite.sql.validate.SqlConformance;
+import org.apache.calcite.sql.validate.SqlValidatorImpl;
+
+public class CalciteSqlValidator extends SqlValidatorImpl {
+ public CalciteSqlValidator(SqlOperatorTable opTab,
+ CalciteCatalogReader catalogReader, JavaTypeFactory typeFactory,
+ SqlConformance conformance) {
+ super(opTab, catalogReader, typeFactory, conformance);
+ }
+
+ @Override
+ protected RelDataType getLogicalSourceRowType(
+ RelDataType sourceRowType, SqlInsert insert) {
+ final RelDataType superType =
+ super.getLogicalSourceRowType(sourceRowType, insert);
+ return ((JavaTypeFactory) typeFactory).toSql(superType);
+ }
+
+ @Override
+ protected RelDataType getLogicalTargetRowType(
+ RelDataType targetRowType, SqlInsert insert) {
+ final RelDataType superType =
+ super.getLogicalTargetRowType(targetRowType, insert);
+ return ((JavaTypeFactory) typeFactory).toSql(superType);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/54686e0e/experiments/sql/src/main/scala/org/apache/gearpump/experiments/sql/Connection.scala
----------------------------------------------------------------------
diff --git a/experiments/sql/src/main/scala/org/apache/gearpump/experiments/sql/Connection.scala b/experiments/sql/src/main/scala/org/apache/gearpump/experiments/sql/Connection.scala
index 8fae091..ee55aa8 100644
--- a/experiments/sql/src/main/scala/org/apache/gearpump/experiments/sql/Connection.scala
+++ b/experiments/sql/src/main/scala/org/apache/gearpump/experiments/sql/Connection.scala
@@ -13,9 +13,6 @@ import org.apache.calcite.linq4j.tree.Expression
import org.apache.calcite.linq4j.{Enumerator, Queryable}
import org.apache.log4j.Logger
-/**
- * Created by Buddhi on 6/8/2017.
- */
class Connection extends CalciteConnection {
import org.apache.calcite.schema.SchemaPlus