You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2015/04/15 22:57:02 UTC
[2/2] samza git commit: SAMZA-649 Move SQL front end files depending
on Calcite to samza-sql-calcite
SAMZA-649 Move SQL front end files depending on Calcite to samza-sql-calcite
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/46a6de43
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/46a6de43
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/46a6de43
Branch: refs/heads/samza-sql
Commit: 46a6de437202b199273c0602ac15db515a6040b9
Parents: bc5833b
Author: Milinda L. Pathirage <mi...@gmail.com>
Authored: Wed Apr 15 13:36:50 2015 -0700
Committer: Yi Pan (Data Infrastructure) <yi...@yipan-ld1.linkedin.biz>
Committed: Wed Apr 15 13:36:50 2015 -0700
----------------------------------------------------------------------
build.gradle | 11 +
.../samza/sql/calcite/planner/QueryPlanner.java | 212 +++++++++++
.../calcite/planner/SamzaCalciteConnection.java | 373 +++++++++++++++++++
.../planner/SamzaQueryPreparingStatement.java | 111 ++++++
.../sql/calcite/planner/SamzaSqlValidator.java | 57 +++
.../sql/calcite/schema/AvroSchemaConverter.java | 132 +++++++
.../planner/SamzaStreamTableFactory.java | 94 +++++
.../sql/calcite/planner/TestQueryPlanner.java | 94 +++++
.../calcite/schema/TestAvroSchemaConverter.java | 55 +++
.../samza/sql/metadata/AvroSchemaConverter.java | 132 -------
.../apache/samza/sql/planner/QueryPlanner.java | 214 -----------
.../sql/planner/SamzaCalciteConnection.java | 373 -------------------
.../planner/SamzaQueryPreparingStatement.java | 111 ------
.../samza/sql/planner/SamzaSqlValidator.java | 54 ---
.../samza/sql/planner/QueryPlannerTest.java | 94 -----
.../sql/planner/SamzaStreamTableFactory.java | 94 -----
.../test/metadata/TestAvroSchemaConverter.java | 57 ---
settings.gradle | 3 +-
18 files changed, 1141 insertions(+), 1130 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/46a6de43/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index a1c7133..a042567 100644
--- a/build.gradle
+++ b/build.gradle
@@ -279,6 +279,17 @@ project(":samza-sql-core_$scalaVersion") {
}
}
+project(":samza-sql-calcite_$scalaVersion") {
+ apply plugin: 'java'
+
+ dependencies {
+ compile project(":samza-sql-core_$scalaVersion")
+ compile "org.apache.calcite:calcite-core:$calciteVersion"
+ testCompile "junit:junit:$junitVersion"
+ testCompile "org.mockito:mockito-all:$mockitoVersion"
+ }
+}
+
project(":samza-shell") {
apply plugin: 'java'
http://git-wip-us.apache.org/repos/asf/samza/blob/46a6de43/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/planner/QueryPlanner.java
----------------------------------------------------------------------
diff --git a/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/planner/QueryPlanner.java b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/planner/QueryPlanner.java
new file mode 100644
index 0000000..e1c22e9
--- /dev/null
+++ b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/planner/QueryPlanner.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.sql.calcite.planner;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.adapter.enumerable.EnumerableConvention;
+import org.apache.calcite.adapter.enumerable.EnumerableRel;
+import org.apache.calcite.adapter.enumerable.EnumerableRules;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.config.CalciteConnectionConfig;
+import org.apache.calcite.jdbc.CalcitePrepare;
+import org.apache.calcite.plan.*;
+import org.apache.calcite.plan.volcano.VolcanoPlanner;
+import org.apache.calcite.prepare.CalciteCatalogReader;
+import org.apache.calcite.rel.RelCollationTraitDef;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.rules.*;
+import org.apache.calcite.rel.stream.StreamRules;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.parser.SqlParseException;
+import org.apache.calcite.sql.parser.SqlParser;
+import org.apache.calcite.sql.util.ChainedSqlOperatorTable;
+import org.apache.calcite.sql.validate.SqlValidator;
+
+import java.util.List;
+
+/**
+ * Streaming query planner implementation based on Calcite.
+ */
+public class QueryPlanner {
+ public static final boolean COMMUTE =
+ "true".equals(
+ System.getProperties().getProperty("calcite.enable.join.commute"));
+
+ /**
+ * Whether to enable the collation trait. Some extra optimizations are
+ * possible if enabled, but queries should work either way. At some point
+ * this will become a preference, or we will run multiple phases: first
+ * disabled, then enabled.
+ */
+ private static final boolean ENABLE_COLLATION_TRAIT = true;
+
+ private static final List<RelOptRule> DEFAULT_RULES =
+ ImmutableList.of(
+ AggregateStarTableRule.INSTANCE,
+ AggregateStarTableRule.INSTANCE2,
+ TableScanRule.INSTANCE,
+ COMMUTE
+ ? JoinAssociateRule.INSTANCE
+ : ProjectMergeRule.INSTANCE,
+ FilterTableScanRule.INSTANCE,
+ ProjectFilterTransposeRule.INSTANCE,
+ FilterProjectTransposeRule.INSTANCE,
+ FilterJoinRule.FILTER_ON_JOIN,
+ AggregateExpandDistinctAggregatesRule.INSTANCE,
+ AggregateReduceFunctionsRule.INSTANCE,
+ FilterAggregateTransposeRule.INSTANCE,
+ JoinCommuteRule.INSTANCE,
+ JoinPushThroughJoinRule.RIGHT,
+ JoinPushThroughJoinRule.LEFT,
+ SortProjectTransposeRule.INSTANCE);
+
+ private static final List<RelOptRule> ENUMERABLE_RULES =
+ ImmutableList.of(
+ EnumerableRules.ENUMERABLE_JOIN_RULE,
+ EnumerableRules.ENUMERABLE_SEMI_JOIN_RULE,
+ EnumerableRules.ENUMERABLE_CORRELATE_RULE,
+ EnumerableRules.ENUMERABLE_PROJECT_RULE,
+ EnumerableRules.ENUMERABLE_FILTER_RULE,
+ EnumerableRules.ENUMERABLE_AGGREGATE_RULE,
+ EnumerableRules.ENUMERABLE_SORT_RULE,
+ EnumerableRules.ENUMERABLE_LIMIT_RULE,
+ EnumerableRules.ENUMERABLE_COLLECT_RULE,
+ EnumerableRules.ENUMERABLE_UNCOLLECT_RULE,
+ EnumerableRules.ENUMERABLE_UNION_RULE,
+ EnumerableRules.ENUMERABLE_INTERSECT_RULE,
+ EnumerableRules.ENUMERABLE_MINUS_RULE,
+ EnumerableRules.ENUMERABLE_TABLE_MODIFICATION_RULE,
+ EnumerableRules.ENUMERABLE_VALUES_RULE,
+ EnumerableRules.ENUMERABLE_WINDOW_RULE,
+ EnumerableRules.ENUMERABLE_TABLE_FUNCTION_SCAN_RULE);
+
+ private static final List<RelOptRule> CONSTANT_REDUCTION_RULES =
+ ImmutableList.of(
+ ReduceExpressionsRule.PROJECT_INSTANCE,
+ ReduceExpressionsRule.FILTER_INSTANCE,
+ ReduceExpressionsRule.CALC_INSTANCE,
+ ReduceExpressionsRule.JOIN_INSTANCE,
+ ValuesReduceRule.FILTER_INSTANCE,
+ ValuesReduceRule.PROJECT_FILTER_INSTANCE,
+ ValuesReduceRule.PROJECT_INSTANCE);
+
+ /**
+ * Transform streaming query to a query plan.
+ * @param query streaming query in SQL with streaming extensions
+ * @param context query prepare context
+ * @return query plan
+ */
+ public RelNode getPlan(String query, CalcitePrepare.Context context) {
+ final JavaTypeFactory typeFactory = context.getTypeFactory();
+ final CalciteConnectionConfig config = context.config();
+
+ CalciteCatalogReader catalogReader = new CalciteCatalogReader(context.getRootSchema(),
+ false,
+ context.getDefaultSchemaPath(),
+ typeFactory);
+
+ SqlParser sqlParser = SqlParser.create(query,
+ SqlParser.configBuilder()
+ .setQuotedCasing(config.quotedCasing())
+ .setUnquotedCasing(config.unquotedCasing())
+ .setQuoting(config.quoting())
+ .build());
+
+ SqlNode sqlNode;
+
+ try {
+ sqlNode = sqlParser.parseStmt();
+ } catch (SqlParseException e) {
+ throw new RuntimeException("parse failed: " + e.getMessage(), e);
+ }
+
+ final ChainedSqlOperatorTable operatorTable =
+ new ChainedSqlOperatorTable(
+ ImmutableList.of(SqlStdOperatorTable.instance(), catalogReader));
+
+ final SqlValidator validator =
+ new SamzaSqlValidator(operatorTable, catalogReader, typeFactory);
+ validator.setIdentifierExpansion(true);
+
+ SqlNode validatedSqlNode = validator.validate(sqlNode);
+
+ final RelOptPlanner planner = createStreamingRelOptPlanner(context, null, null);
+
+ final SamzaQueryPreparingStatement preparingStmt =
+ new SamzaQueryPreparingStatement(
+ context,
+ catalogReader,
+ typeFactory,
+ context.getRootSchema(),
+ EnumerableRel.Prefer.ARRAY,
+ planner,
+ EnumerableConvention.INSTANCE);
+
+ /* TODO: Add query optimization. */
+
+ return preparingStmt.getSqlToRelConverter(validator, catalogReader).convertQuery(validatedSqlNode, false, true);
+ }
+
+ /**
+ * Creates a query planner and initializes it with a default set of
+ * rules.
+ *
+ * @param prepareContext context for preparing a statement
+ * @param externalContext external query planning context
+ * @param costFactory cost factory for cost based query planning
+ * @return relation query planner instance
+ */
+ protected RelOptPlanner createStreamingRelOptPlanner(final CalcitePrepare.Context prepareContext,
+ org.apache.calcite.plan.Context externalContext,
+ RelOptCostFactory costFactory) {
+ if (externalContext == null) {
+ externalContext = Contexts.withConfig(prepareContext.config());
+ }
+
+ final VolcanoPlanner planner =
+ new VolcanoPlanner(costFactory, externalContext);
+
+ planner.addRelTraitDef(ConventionTraitDef.INSTANCE);
+
+ if (ENABLE_COLLATION_TRAIT) {
+ planner.addRelTraitDef(RelCollationTraitDef.INSTANCE);
+ planner.registerAbstractRelationalRules();
+ }
+ RelOptUtil.registerAbstractRels(planner);
+ for (RelOptRule rule : DEFAULT_RULES) {
+ planner.addRule(rule);
+ }
+
+ /* Note: Bindable rules were removed until Calcite switches the convention of the root node to bindable. */
+
+ for (RelOptRule rule : ENUMERABLE_RULES) {
+ planner.addRule(rule);
+ }
+
+ for (RelOptRule rule : StreamRules.RULES) {
+ planner.addRule(rule);
+ }
+
+ /* Note: Constant reduction rules were removed because current Calcite implementation doesn't use them. */
+
+ return planner;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/46a6de43/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/planner/SamzaCalciteConnection.java
----------------------------------------------------------------------
diff --git a/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/planner/SamzaCalciteConnection.java b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/planner/SamzaCalciteConnection.java
new file mode 100644
index 0000000..64c8f83
--- /dev/null
+++ b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/planner/SamzaCalciteConnection.java
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.sql.calcite.planner;
+
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.config.CalciteConnectionConfig;
+import org.apache.calcite.config.CalciteConnectionConfigImpl;
+import org.apache.calcite.jdbc.CalciteConnection;
+import org.apache.calcite.jdbc.CalciteRootSchema;
+import org.apache.calcite.jdbc.CalciteSchema;
+import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
+import org.apache.calcite.linq4j.Enumerator;
+import org.apache.calcite.linq4j.Queryable;
+import org.apache.calcite.linq4j.tree.Expression;
+import org.apache.calcite.model.ModelHandler;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.calcite.schema.SchemaPlus;
+
+import java.io.IOException;
+import java.lang.reflect.Type;
+import java.sql.*;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Executor;
+
+/**
+ * Minimal <code>org.apache.calcite.jdbc.CalciteConnection</code> implementation which enables
+ * re-use of Calcite code.
+ */
+public class SamzaCalciteConnection implements CalciteConnection {
+ private static final String INLINE = "inline:";
+ private final JavaTypeFactory typeFactory;
+ private final CalciteRootSchema rootSchema;
+ private String schema;
+
+ public SamzaCalciteConnection(String model) throws IOException {
+ typeFactory = new JavaTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
+ rootSchema = CalciteSchema.createRootSchema(true);
+ new ModelHandler(this, INLINE + model);
+ }
+
+ public CalciteRootSchema getCalciteRootSchema(){
+ return rootSchema;
+ }
+
+ @Override
+ public SchemaPlus getRootSchema() {
+ return rootSchema.plus();
+ }
+
+ @Override
+ public JavaTypeFactory getTypeFactory() {
+ return typeFactory;
+ }
+
+ @Override
+ public Properties getProperties() {
+ return null;
+ }
+
+ @Override
+ public Statement createStatement() throws SQLException {
+ return null;
+ }
+
+ @Override
+ public PreparedStatement prepareStatement(String sql) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public CallableStatement prepareCall(String sql) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public String nativeSQL(String sql) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public void setAutoCommit(boolean autoCommit) throws SQLException {
+
+ }
+
+ @Override
+ public boolean getAutoCommit() throws SQLException {
+ return false;
+ }
+
+ @Override
+ public void commit() throws SQLException {
+
+ }
+
+ @Override
+ public void rollback() throws SQLException {
+
+ }
+
+ @Override
+ public void close() throws SQLException {
+
+ }
+
+ @Override
+ public boolean isClosed() throws SQLException {
+ return false;
+ }
+
+ @Override
+ public DatabaseMetaData getMetaData() throws SQLException {
+ return null;
+ }
+
+ @Override
+ public void setReadOnly(boolean readOnly) throws SQLException {
+
+ }
+
+ @Override
+ public boolean isReadOnly() throws SQLException {
+ return false;
+ }
+
+ @Override
+ public void setCatalog(String catalog) throws SQLException {
+
+ }
+
+ @Override
+ public String getCatalog() throws SQLException {
+ return null;
+ }
+
+ @Override
+ public void setTransactionIsolation(int level) throws SQLException {
+
+ }
+
+ @Override
+ public int getTransactionIsolation() throws SQLException {
+ return 0;
+ }
+
+ @Override
+ public SQLWarning getWarnings() throws SQLException {
+ return null;
+ }
+
+ @Override
+ public void clearWarnings() throws SQLException {
+
+ }
+
+ @Override
+ public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public Map<String, Class<?>> getTypeMap() throws SQLException {
+ return null;
+ }
+
+ @Override
+ public void setTypeMap(Map<String, Class<?>> map) throws SQLException {
+
+ }
+
+ @Override
+ public void setHoldability(int holdability) throws SQLException {
+
+ }
+
+ @Override
+ public int getHoldability() throws SQLException {
+ return 0;
+ }
+
+ @Override
+ public Savepoint setSavepoint() throws SQLException {
+ return null;
+ }
+
+ @Override
+ public Savepoint setSavepoint(String name) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public void rollback(Savepoint savepoint) throws SQLException {
+
+ }
+
+ @Override
+ public void releaseSavepoint(Savepoint savepoint) throws SQLException {
+
+ }
+
+ @Override
+ public Statement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public Clob createClob() throws SQLException {
+ return null;
+ }
+
+ @Override
+ public Blob createBlob() throws SQLException {
+ return null;
+ }
+
+ @Override
+ public NClob createNClob() throws SQLException {
+ return null;
+ }
+
+ @Override
+ public SQLXML createSQLXML() throws SQLException {
+ return null;
+ }
+
+ @Override
+ public boolean isValid(int timeout) throws SQLException {
+ return false;
+ }
+
+ @Override
+ public void setClientInfo(String name, String value) throws SQLClientInfoException {
+
+ }
+
+ @Override
+ public void setClientInfo(Properties properties) throws SQLClientInfoException {
+
+ }
+
+ @Override
+ public String getClientInfo(String name) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public Properties getClientInfo() throws SQLException {
+ return null;
+ }
+
+ @Override
+ public Array createArrayOf(String typeName, Object[] elements) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public Struct createStruct(String typeName, Object[] attributes) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public void setSchema(String schema) throws SQLException {
+ this.schema = schema;
+ }
+
+ @Override
+ public String getSchema() throws SQLException {
+ return schema;
+ }
+
+ public void abort(Executor executor) throws SQLException {
+
+ }
+
+ public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException {
+
+ }
+
+ public int getNetworkTimeout() throws SQLException {
+ return 0;
+ }
+
+ @Override
+ public CalciteConnectionConfig config() {
+ return new CalciteConnectionConfigImpl(new Properties());
+ }
+
+ @Override
+ public <T> Queryable<T> createQuery(Expression expression, Class<T> rowType) {
+ return null;
+ }
+
+ @Override
+ public <T> Queryable<T> createQuery(Expression expression, Type rowType) {
+ return null;
+ }
+
+ @Override
+ public <T> T execute(Expression expression, Class<T> type) {
+ return null;
+ }
+
+ @Override
+ public <T> T execute(Expression expression, Type type) {
+ return null;
+ }
+
+ @Override
+ public <T> Enumerator<T> executeQuery(Queryable<T> queryable) {
+ return null;
+ }
+
+ @Override
+ public <T> T unwrap(Class<T> iface) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public boolean isWrapperFor(Class<?> iface) throws SQLException {
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/46a6de43/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/planner/SamzaQueryPreparingStatement.java
----------------------------------------------------------------------
diff --git a/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/planner/SamzaQueryPreparingStatement.java b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/planner/SamzaQueryPreparingStatement.java
new file mode 100644
index 0000000..89c4836
--- /dev/null
+++ b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/planner/SamzaQueryPreparingStatement.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.sql.calcite.planner;
+
+import com.google.common.collect.Maps;
+import org.apache.calcite.adapter.enumerable.EnumerableRel;
+import org.apache.calcite.jdbc.CalcitePrepare;
+import org.apache.calcite.jdbc.CalciteSchema;
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.prepare.Prepare;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.sql.SqlExplainLevel;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.validate.SqlValidator;
+import org.apache.calcite.sql2rel.SqlToRelConverter;
+import org.apache.calcite.sql2rel.StandardConvertletTable;
+
+import java.util.List;
+import java.util.Map;
+
+public class SamzaQueryPreparingStatement extends Prepare implements RelOptTable.ViewExpander {
+ private final RelOptPlanner planner;
+ private final RexBuilder rexBuilder;
+ protected final CalciteSchema schema;
+ protected final RelDataTypeFactory typeFactory;
+ private final EnumerableRel.Prefer prefer;
+ private final Map<String, Object> internalParameters =
+ Maps.newLinkedHashMap();
+ private int expansionDepth;
+ private SqlValidator sqlValidator;
+
+ public SamzaQueryPreparingStatement(CalcitePrepare.Context context, CatalogReader catalogReader,
+ RelDataTypeFactory typeFactory,
+ CalciteSchema schema,
+ EnumerableRel.Prefer prefer,
+ RelOptPlanner planner,
+ Convention resultConvention) {
+ super(context, catalogReader, resultConvention);
+ this.schema = schema;
+ this.prefer = prefer;
+ this.planner = planner;
+ this.typeFactory = typeFactory;
+ this.rexBuilder = new RexBuilder(typeFactory);
+ }
+
+ @Override
+ protected PreparedResult createPreparedExplanation(RelDataType resultType, RelDataType parameterRowType, RelNode rootRel, boolean explainAsXml, SqlExplainLevel detailLevel) {
+ return null;
+ }
+
+ @Override
+ protected PreparedResult implement(RelDataType rowType, RelNode rootRel, SqlKind sqlKind) {
+ return null;
+ }
+
+ @Override
+ protected SqlToRelConverter getSqlToRelConverter(SqlValidator validator, CatalogReader catalogReader) {
+ SqlToRelConverter sqlToRelConverter =
+ new SqlToRelConverter(
+ this, validator, catalogReader, planner, rexBuilder,
+ StandardConvertletTable.INSTANCE);
+ sqlToRelConverter.setTrimUnusedFields(true);
+ return sqlToRelConverter;
+ }
+
+ @Override
+ public RelNode flattenTypes(RelNode rootRel, boolean restructure) {
+ return null;
+ }
+
+ @Override
+ protected RelNode decorrelate(SqlToRelConverter sqlToRelConverter, SqlNode query, RelNode rootRel) {
+ return null;
+ }
+
+ @Override
+ protected void init(Class runtimeContextClass) {}
+
+ @Override
+ protected SqlValidator getSqlValidator() {
+ return null;
+ }
+
+ @Override
+ public RelNode expandView(RelDataType rowType, String queryString, List<String> schemaPath) {
+ // TODO: Implement custom view expansions
+ return super.expandView(rowType, queryString, schemaPath);
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/46a6de43/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/planner/SamzaSqlValidator.java
----------------------------------------------------------------------
diff --git a/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/planner/SamzaSqlValidator.java b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/planner/SamzaSqlValidator.java
new file mode 100644
index 0000000..e0f9cd8
--- /dev/null
+++ b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/planner/SamzaSqlValidator.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.sql.calcite.planner;
+
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.sql.SqlInsert;
+import org.apache.calcite.sql.SqlOperatorTable;
+import org.apache.calcite.sql.validate.SqlConformance;
+import org.apache.calcite.sql.validate.SqlValidatorCatalogReader;
+import org.apache.calcite.sql.validate.SqlValidatorImpl;
+
+/**
+ * Defines a Samza specific SQL validator based on Calcite's SQL validator implementation.
+ */
+public class SamzaSqlValidator extends SqlValidatorImpl{
+ /**
+ * Creates a validator.
+ *
+ * @param opTab Operator table
+ * @param catalogReader Catalog reader
+ * @param typeFactory Type factory
+ */
+ protected SamzaSqlValidator(SqlOperatorTable opTab, SqlValidatorCatalogReader catalogReader, RelDataTypeFactory typeFactory) {
+ /* Note: We may need to define Samza specific SqlConformance instance in future. */
+ super(opTab, catalogReader, typeFactory, SqlConformance.DEFAULT);
+ }
+
+ @Override
+ protected RelDataType getLogicalSourceRowType(
+ RelDataType sourceRowType, SqlInsert insert) {
+ return ((JavaTypeFactory) typeFactory).toSql(sourceRowType);
+ }
+
+ @Override
+ protected RelDataType getLogicalTargetRowType(
+ RelDataType targetRowType, SqlInsert insert) {
+ return ((JavaTypeFactory) typeFactory).toSql(targetRowType);
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/46a6de43/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/schema/AvroSchemaConverter.java
----------------------------------------------------------------------
diff --git a/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/schema/AvroSchemaConverter.java b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/schema/AvroSchemaConverter.java
new file mode 100644
index 0000000..705c0ff
--- /dev/null
+++ b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/schema/AvroSchemaConverter.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.sql.calcite.schema;
+
+import org.apache.avro.Schema;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Converts an Avro schema to Calcite RelDataType.
+ * <p></p>
+ * <p>Inspired by parquet-mr.</p>
+ */
+public class AvroSchemaConverter {
+
+ private final RelDataTypeFactory relDataTypeFactory;
+ private final Schema rootSchema;
+
+ public AvroSchemaConverter(RelDataTypeFactory relDataTypeFactory, Schema schema) {
+ this.relDataTypeFactory = relDataTypeFactory;
+ this.rootSchema = schema;
+ }
+
+ public RelDataType convert() {
+ // At the top level only records are supported
+ if (rootSchema.getType() != Schema.Type.RECORD) {
+ throw new RuntimeException(
+ String.format("Type: %s is unsupported at this level; Only Record type of supported at top level!",
+ rootSchema.getType()));
+ }
+
+ return convertRecord(rootSchema, true);
+ }
+
+ private RelDataType convertRecord(Schema recordSchema, boolean isRoot) {
+ RelDataTypeFactory.FieldInfoBuilder builder = relDataTypeFactory.builder();
+
+ for (Schema.Field field : recordSchema.getFields()) {
+ Schema fieldSchema = field.schema();
+ if (fieldSchema.getType() == Schema.Type.NULL) {
+ continue;
+ }
+
+ convertField(builder, field.name(), fieldSchema);
+ }
+
+ RelDataType record = builder.build();
+ if(isRoot) {
+ // Record at root level is treated differently.
+ return record;
+ }
+
+ return relDataTypeFactory.createStructType(record.getFieldList());
+ }
+
+ private void convertField(RelDataTypeFactory.FieldInfoBuilder builder,
+ String fieldName,
+ Schema fieldSchema) {
+ builder.add(fieldName, convertFieldType(fieldSchema));
+ }
+
+ private RelDataType convertFieldType(Schema elementType) {
+ Schema.Type type = elementType.getType();
+ if (type == Schema.Type.STRING) {
+ return relDataTypeFactory.createSqlType(SqlTypeName.VARCHAR);
+ } else if (type == Schema.Type.INT) {
+ return relDataTypeFactory.createSqlType(SqlTypeName.INTEGER);
+ } else if (type == Schema.Type.BOOLEAN) {
+ return relDataTypeFactory.createSqlType(SqlTypeName.BOOLEAN);
+ } else if (type == Schema.Type.BYTES) {
+ return relDataTypeFactory.createSqlType(SqlTypeName.BINARY);
+ } else if (type == Schema.Type.LONG) {
+ return relDataTypeFactory.createSqlType(SqlTypeName.BIGINT);
+ } else if (type == Schema.Type.DOUBLE) {
+ return relDataTypeFactory.createSqlType(SqlTypeName.DOUBLE);
+ } else if (type == Schema.Type.FLOAT) {
+ return relDataTypeFactory.createSqlType(SqlTypeName.FLOAT);
+ } else if (type == Schema.Type.ARRAY) {
+ return relDataTypeFactory.createArrayType(convertFieldType(elementType), -1);
+ } else if (type == Schema.Type.RECORD) {
+ return convertRecord(elementType, false);
+ } else if (type == Schema.Type.MAP) {
+ return relDataTypeFactory.createMapType(relDataTypeFactory.createSqlType(SqlTypeName.VARCHAR),
+ convertFieldType(elementType.getValueType()));
+ } else if (type == Schema.Type.FIXED) {
+ return relDataTypeFactory.createSqlType(SqlTypeName.VARBINARY, elementType.getFixedSize());
+ } else if (type == Schema.Type.UNION) {
+ List<Schema> types = elementType.getTypes();
+ List<Schema> nonNullTypes = new ArrayList<Schema>();
+ boolean foundNull = false;
+
+ for(Schema s : types) {
+ if(s.getType() == Schema.Type.NULL){
+ foundNull = true;
+ } else {
+ nonNullTypes.add(s);
+ }
+ }
+
+ if(nonNullTypes.size() > 1){
+ throw new RuntimeException("Multiple non null types in a union is not supported.");
+ } else {
+ return relDataTypeFactory.createTypeWithNullability(convertFieldType(nonNullTypes.get(0)), foundNull);
+ }
+ } else if(type == Schema.Type.ENUM) {
+ // TODO: May be there is a better way to handle enums
+ relDataTypeFactory.createSqlType(SqlTypeName.VARCHAR);
+ }
+
+ return relDataTypeFactory.createSqlType(SqlTypeName.ANY);
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/46a6de43/samza-sql-calcite/src/test/java/org/apache/samza/sql/calcite/planner/SamzaStreamTableFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql-calcite/src/test/java/org/apache/samza/sql/calcite/planner/SamzaStreamTableFactory.java b/samza-sql-calcite/src/test/java/org/apache/samza/sql/calcite/planner/SamzaStreamTableFactory.java
new file mode 100644
index 0000000..fd87aa5
--- /dev/null
+++ b/samza-sql-calcite/src/test/java/org/apache/samza/sql/calcite/planner/SamzaStreamTableFactory.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.sql.calcite.planner;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.DataContext;
+import org.apache.calcite.linq4j.Enumerable;
+import org.apache.calcite.linq4j.Linq4j;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelProtoDataType;
+import org.apache.calcite.schema.*;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+import java.util.Map;
+
+public class SamzaStreamTableFactory implements TableFactory<Table> {
+ public Table create(SchemaPlus schema, String name,
+ Map<String, Object> operand, RelDataType rowType) {
+ final RelProtoDataType protoRowType = new RelProtoDataType() {
+ public RelDataType apply(RelDataTypeFactory a0) {
+ return a0.builder()
+ .add("id", SqlTypeName.INTEGER)
+ .add("product", SqlTypeName.VARCHAR, 10)
+ .add("quantity", SqlTypeName.INTEGER)
+ .build();
+ }
+ };
+ final ImmutableList<Object[]> rows = ImmutableList.of(
+ new Object[]{1, "paint", 10},
+ new Object[]{2, "paper", 5});
+
+ return new StreamableTable() {
+ public Table stream() {
+ return new OrdersTable(protoRowType, rows);
+ }
+
+ public RelDataType getRowType(RelDataTypeFactory typeFactory) {
+ return protoRowType.apply(typeFactory);
+ }
+
+ public Statistic getStatistic() {
+ return Statistics.UNKNOWN;
+ }
+
+ public Schema.TableType getJdbcTableType() {
+ return Schema.TableType.TABLE;
+ }
+ };
+ }
+
+ public static class OrdersTable implements ScannableTable {
+ private final RelProtoDataType protoRowType;
+ private final ImmutableList<Object[]> rows;
+
+ public OrdersTable(RelProtoDataType protoRowType,
+ ImmutableList<Object[]> rows) {
+ this.protoRowType = protoRowType;
+ this.rows = rows;
+ }
+
+ public Enumerable<Object[]> scan(DataContext root) {
+ return Linq4j.asEnumerable(rows);
+ }
+
+ public RelDataType getRowType(RelDataTypeFactory typeFactory) {
+ return protoRowType.apply(typeFactory);
+ }
+
+ public Statistic getStatistic() {
+ return Statistics.UNKNOWN;
+ }
+
+ public Schema.TableType getJdbcTableType() {
+ return Schema.TableType.STREAM;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/46a6de43/samza-sql-calcite/src/test/java/org/apache/samza/sql/calcite/planner/TestQueryPlanner.java
----------------------------------------------------------------------
diff --git a/samza-sql-calcite/src/test/java/org/apache/samza/sql/calcite/planner/TestQueryPlanner.java b/samza-sql-calcite/src/test/java/org/apache/samza/sql/calcite/planner/TestQueryPlanner.java
new file mode 100644
index 0000000..0bb15b2
--- /dev/null
+++ b/samza-sql-calcite/src/test/java/org/apache/samza/sql/calcite/planner/TestQueryPlanner.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.sql.calcite.planner;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.calcite.avatica.util.Casing;
+import org.apache.calcite.avatica.util.Quoting;
+import org.apache.calcite.config.CalciteConnectionProperty;
+import org.apache.calcite.jdbc.CalcitePrepare;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.schema.Schemas;
+import org.apache.calcite.util.Util;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
+
+public class TestQueryPlanner {
+ public static final String STREAM_SCHEMA = " {\n"
+ + " name: 'STREAMS',\n"
+ + " tables: [ {\n"
+ + " type: 'custom',\n"
+ + " name: 'ORDERS',\n"
+ + " stream: {\n"
+ + " stream: true\n"
+ + " },\n"
+ + " factory: '" + SamzaStreamTableFactory.class.getName() + "'\n"
+ + " } ]\n"
+ + " }\n";
+
+ public static final String STREAM_MODEL = "{\n"
+ + " version: '1.0',\n"
+ + " defaultSchema: 'STREAMS',\n"
+ + " schemas: [\n"
+ + STREAM_SCHEMA
+ + " ]\n"
+ + "}";
+
+ public static final String SELECT_ALL_FROM_ORDERS_WHERE_QUANTITY_GREATER_THAN_FIVE_PLAN_EXPECTED =
+ "LogicalDelta\n" +
+ " LogicalProject(id=[$0], product=[$1], quantity=[$2])\n" +
+ " LogicalFilter(condition=[>($2, 5)])\n" +
+ " EnumerableTableScan(table=[[STREAMS, ORDERS]])";
+ public static final String SELECT_ALL_FROM_ORDERS_WHERE_QUANTITY_GREATER_THAN_FIVE =
+ "select stream * from orders where quantity > 5";
+
+ @Test
+ public void testQueryPlanner() throws IOException, SQLException {
+
+ SamzaCalciteConnection connection = new SamzaCalciteConnection(STREAM_MODEL);
+ CalcitePrepare.Context context = Schemas.makeContext(connection,
+ connection.getCalciteRootSchema(),
+ ImmutableList.of(connection.getSchema()),
+ ImmutableMap.copyOf(defaultConfiguration()));
+
+ QueryPlanner planner = new QueryPlanner();
+ RelNode relNode = planner.getPlan(SELECT_ALL_FROM_ORDERS_WHERE_QUANTITY_GREATER_THAN_FIVE, context);
+ Assert.assertNotNull(relNode);
+ String s = Util.toLinux(RelOptUtil.toString(relNode));
+ Assert.assertTrue(s.contains(SELECT_ALL_FROM_ORDERS_WHERE_QUANTITY_GREATER_THAN_FIVE_PLAN_EXPECTED));
+ }
+
+ public static Map<CalciteConnectionProperty, String> defaultConfiguration(){
+ Map<CalciteConnectionProperty, String> map = new HashMap<CalciteConnectionProperty, String>();
+
+ map.put(CalciteConnectionProperty.CASE_SENSITIVE, "false");
+ map.put(CalciteConnectionProperty.QUOTED_CASING, Casing.UNCHANGED.name());
+ map.put(CalciteConnectionProperty.UNQUOTED_CASING, Casing.UNCHANGED.name());
+ map.put(CalciteConnectionProperty.QUOTING, Quoting.BACK_TICK.name());
+
+ return map;
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/46a6de43/samza-sql-calcite/src/test/java/org/apache/samza/sql/calcite/schema/TestAvroSchemaConverter.java
----------------------------------------------------------------------
diff --git a/samza-sql-calcite/src/test/java/org/apache/samza/sql/calcite/schema/TestAvroSchemaConverter.java b/samza-sql-calcite/src/test/java/org/apache/samza/sql/calcite/schema/TestAvroSchemaConverter.java
new file mode 100644
index 0000000..fbb5c59
--- /dev/null
+++ b/samza-sql-calcite/src/test/java/org/apache/samza/sql/calcite/schema/TestAvroSchemaConverter.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.sql.calcite.schema;
+
+import org.apache.avro.Schema;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestAvroSchemaConverter {
+ public static final String SIMPLE_RECORD_SCHEMA = "{\"namespace\": \"example.avro\",\n" +
+ " \"type\": \"record\",\n" +
+ " \"name\": \"User\",\n" +
+ " \"fields\": [\n" +
+ " {\"name\": \"name\", \"type\": \"string\"},\n" +
+ " {\"name\": \"favorite_number\", \"type\": [\"int\", \"null\"]},\n" +
+ " {\"name\": \"favorite_color\", \"type\": [\"string\", \"null\"]}\n" +
+ " ]\n" +
+ "}";
+
+ public static final Schema simpleRecord = new Schema.Parser().parse(SIMPLE_RECORD_SCHEMA);
+ @Test
+ public void testSimpleAvroRecord(){
+ RelDataTypeFactory relDataTypeFactory = new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
+ AvroSchemaConverter schemaConverter = new AvroSchemaConverter(relDataTypeFactory, simpleRecord);
+
+ RelDataType relDataType = schemaConverter.convert();
+
+ Assert.assertEquals(SqlTypeName.VARCHAR, relDataType.getField("name", false, false).getType().getSqlTypeName());
+ Assert.assertEquals(SqlTypeName.INTEGER, relDataType.getField("favorite_number", false, false).getType().getSqlTypeName());
+ Assert.assertTrue(relDataType.getField("favorite_number", false, false).getType().isNullable());
+ Assert.assertEquals(SqlTypeName.VARCHAR, relDataType.getField("favorite_color", false, false).getType().getSqlTypeName());
+ Assert.assertTrue(relDataType.getField("favorite_color", false, false).getType().isNullable());
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/46a6de43/samza-sql-core/src/main/java/org/apache/samza/sql/metadata/AvroSchemaConverter.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/metadata/AvroSchemaConverter.java b/samza-sql-core/src/main/java/org/apache/samza/sql/metadata/AvroSchemaConverter.java
deleted file mode 100644
index 3dad046..0000000
--- a/samza-sql-core/src/main/java/org/apache/samza/sql/metadata/AvroSchemaConverter.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.sql.metadata;
-
-import org.apache.avro.Schema;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeFactory;
-import org.apache.calcite.sql.type.SqlTypeName;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Converts an Avro schema to Calcite RelDataType.
- * <p></p>
- * <p>Inspired by parquet-mr.</p>
- */
-public class AvroSchemaConverter {
-
- private final RelDataTypeFactory relDataTypeFactory;
- private final Schema rootSchema;
-
- public AvroSchemaConverter(RelDataTypeFactory relDataTypeFactory, Schema schema) {
- this.relDataTypeFactory = relDataTypeFactory;
- this.rootSchema = schema;
- }
-
- public RelDataType convert() {
- // At the top level only records are supported
- if (rootSchema.getType() != Schema.Type.RECORD) {
- throw new RuntimeException(
- String.format("Type: %s is unsupported at this level; Only Record type of supported at top level!",
- rootSchema.getType()));
- }
-
- return convertRecord(rootSchema, true);
- }
-
- private RelDataType convertRecord(Schema recordSchema, boolean isRoot) {
- RelDataTypeFactory.FieldInfoBuilder builder = relDataTypeFactory.builder();
-
- for (Schema.Field field : recordSchema.getFields()) {
- Schema fieldSchema = field.schema();
- if (fieldSchema.getType() == Schema.Type.NULL) {
- continue;
- }
-
- convertField(builder, field.name(), fieldSchema);
- }
-
- RelDataType record = builder.build();
- if(isRoot) {
- // Record at root level is treated differently.
- return record;
- }
-
- return relDataTypeFactory.createStructType(record.getFieldList());
- }
-
- private void convertField(RelDataTypeFactory.FieldInfoBuilder builder,
- String fieldName,
- Schema fieldSchema) {
- builder.add(fieldName, convertFieldType(fieldSchema));
- }
-
- private RelDataType convertFieldType(Schema elementType) {
- Schema.Type type = elementType.getType();
- if (type == Schema.Type.STRING) {
- return relDataTypeFactory.createSqlType(SqlTypeName.VARCHAR);
- } else if (type == Schema.Type.INT) {
- return relDataTypeFactory.createSqlType(SqlTypeName.INTEGER);
- } else if (type == Schema.Type.BOOLEAN) {
- return relDataTypeFactory.createSqlType(SqlTypeName.BOOLEAN);
- } else if (type == Schema.Type.BYTES) {
- return relDataTypeFactory.createSqlType(SqlTypeName.BINARY);
- } else if (type == Schema.Type.LONG) {
- return relDataTypeFactory.createSqlType(SqlTypeName.BIGINT);
- } else if (type == Schema.Type.DOUBLE) {
- return relDataTypeFactory.createSqlType(SqlTypeName.DOUBLE);
- } else if (type == Schema.Type.FLOAT) {
- return relDataTypeFactory.createSqlType(SqlTypeName.FLOAT);
- } else if (type == Schema.Type.ARRAY) {
- return relDataTypeFactory.createArrayType(convertFieldType(elementType), -1);
- } else if (type == Schema.Type.RECORD) {
- return convertRecord(elementType, false);
- } else if (type == Schema.Type.MAP) {
- return relDataTypeFactory.createMapType(relDataTypeFactory.createSqlType(SqlTypeName.VARCHAR),
- convertFieldType(elementType.getValueType()));
- } else if (type == Schema.Type.FIXED) {
- return relDataTypeFactory.createSqlType(SqlTypeName.VARBINARY, elementType.getFixedSize());
- } else if (type == Schema.Type.UNION) {
- List<Schema> types = elementType.getTypes();
- List<Schema> nonNullTypes = new ArrayList<Schema>();
- boolean foundNull = false;
-
- for(Schema s : types) {
- if(s.getType() == Schema.Type.NULL){
- foundNull = true;
- } else {
- nonNullTypes.add(s);
- }
- }
-
- if(nonNullTypes.size() > 1){
- throw new RuntimeException("Multiple non null types in a union is not supported.");
- } else {
- return relDataTypeFactory.createTypeWithNullability(convertFieldType(nonNullTypes.get(0)), foundNull);
- }
- } else if(type == Schema.Type.ENUM) {
- // TODO: May be there is a better way to handle enums
- relDataTypeFactory.createSqlType(SqlTypeName.VARCHAR);
- }
-
- return relDataTypeFactory.createSqlType(SqlTypeName.ANY);
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/46a6de43/samza-sql-core/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java b/samza-sql-core/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
deleted file mode 100644
index 1dfb262..0000000
--- a/samza-sql-core/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
+++ /dev/null
@@ -1,214 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.sql.planner;
-
-import com.google.common.collect.ImmutableList;
-import org.apache.calcite.adapter.enumerable.EnumerableConvention;
-import org.apache.calcite.adapter.enumerable.EnumerableRel;
-import org.apache.calcite.adapter.enumerable.EnumerableRules;
-import org.apache.calcite.adapter.java.JavaTypeFactory;
-import org.apache.calcite.avatica.util.Casing;
-import org.apache.calcite.avatica.util.Quoting;
-import org.apache.calcite.config.CalciteConnectionConfig;
-import org.apache.calcite.jdbc.CalcitePrepare;
-import org.apache.calcite.plan.*;
-import org.apache.calcite.plan.volcano.VolcanoPlanner;
-import org.apache.calcite.prepare.CalciteCatalogReader;
-import org.apache.calcite.rel.RelCollationTraitDef;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.rules.*;
-import org.apache.calcite.rel.stream.StreamRules;
-import org.apache.calcite.sql.SqlNode;
-import org.apache.calcite.sql.fun.SqlStdOperatorTable;
-import org.apache.calcite.sql.parser.SqlParseException;
-import org.apache.calcite.sql.parser.SqlParser;
-import org.apache.calcite.sql.util.ChainedSqlOperatorTable;
-import org.apache.calcite.sql.validate.SqlValidator;
-
-import java.util.List;
-
-/**
- * Streaming query planner implementation based on Calcite.
- */
-public class QueryPlanner {
- public static final boolean COMMUTE =
- "true".equals(
- System.getProperties().getProperty("calcite.enable.join.commute"));
-
- /**
- * Whether to enable the collation trait. Some extra optimizations are
- * possible if enabled, but queries should work either way. At some point
- * this will become a preference, or we will run multiple phases: first
- * disabled, then enabled.
- */
- private static final boolean ENABLE_COLLATION_TRAIT = true;
-
- private static final List<RelOptRule> DEFAULT_RULES =
- ImmutableList.of(
- AggregateStarTableRule.INSTANCE,
- AggregateStarTableRule.INSTANCE2,
- TableScanRule.INSTANCE,
- COMMUTE
- ? JoinAssociateRule.INSTANCE
- : ProjectMergeRule.INSTANCE,
- FilterTableScanRule.INSTANCE,
- ProjectFilterTransposeRule.INSTANCE,
- FilterProjectTransposeRule.INSTANCE,
- FilterJoinRule.FILTER_ON_JOIN,
- AggregateExpandDistinctAggregatesRule.INSTANCE,
- AggregateReduceFunctionsRule.INSTANCE,
- FilterAggregateTransposeRule.INSTANCE,
- JoinCommuteRule.INSTANCE,
- JoinPushThroughJoinRule.RIGHT,
- JoinPushThroughJoinRule.LEFT,
- SortProjectTransposeRule.INSTANCE);
-
- private static final List<RelOptRule> ENUMERABLE_RULES =
- ImmutableList.of(
- EnumerableRules.ENUMERABLE_JOIN_RULE,
- EnumerableRules.ENUMERABLE_SEMI_JOIN_RULE,
- EnumerableRules.ENUMERABLE_CORRELATE_RULE,
- EnumerableRules.ENUMERABLE_PROJECT_RULE,
- EnumerableRules.ENUMERABLE_FILTER_RULE,
- EnumerableRules.ENUMERABLE_AGGREGATE_RULE,
- EnumerableRules.ENUMERABLE_SORT_RULE,
- EnumerableRules.ENUMERABLE_LIMIT_RULE,
- EnumerableRules.ENUMERABLE_COLLECT_RULE,
- EnumerableRules.ENUMERABLE_UNCOLLECT_RULE,
- EnumerableRules.ENUMERABLE_UNION_RULE,
- EnumerableRules.ENUMERABLE_INTERSECT_RULE,
- EnumerableRules.ENUMERABLE_MINUS_RULE,
- EnumerableRules.ENUMERABLE_TABLE_MODIFICATION_RULE,
- EnumerableRules.ENUMERABLE_VALUES_RULE,
- EnumerableRules.ENUMERABLE_WINDOW_RULE,
- EnumerableRules.ENUMERABLE_TABLE_FUNCTION_SCAN_RULE);
-
- private static final List<RelOptRule> CONSTANT_REDUCTION_RULES =
- ImmutableList.of(
- ReduceExpressionsRule.PROJECT_INSTANCE,
- ReduceExpressionsRule.FILTER_INSTANCE,
- ReduceExpressionsRule.CALC_INSTANCE,
- ReduceExpressionsRule.JOIN_INSTANCE,
- ValuesReduceRule.FILTER_INSTANCE,
- ValuesReduceRule.PROJECT_FILTER_INSTANCE,
- ValuesReduceRule.PROJECT_INSTANCE);
-
- /**
- * Transform streaming query to a query plan.
- * @param query streaming query in SQL with streaming extensions
- * @param context query prepare context
- * @return query plan
- */
- public RelNode getPlan(String query, CalcitePrepare.Context context) {
- final JavaTypeFactory typeFactory = context.getTypeFactory();
- final CalciteConnectionConfig config = context.config();
-
- CalciteCatalogReader catalogReader = new CalciteCatalogReader(context.getRootSchema(),
- false,
- context.getDefaultSchemaPath(),
- typeFactory);
-
- SqlParser sqlParser = SqlParser.create(query,
- SqlParser.configBuilder()
- .setQuotedCasing(config.quotedCasing())
- .setUnquotedCasing(config.unquotedCasing())
- .setQuoting(config.quoting())
- .build());
-
- SqlNode sqlNode;
-
- try {
- sqlNode = sqlParser.parseStmt();
- } catch (SqlParseException e) {
- throw new RuntimeException("parse failed: " + e.getMessage(), e);
- }
-
- final ChainedSqlOperatorTable operatorTable =
- new ChainedSqlOperatorTable(
- ImmutableList.of(SqlStdOperatorTable.instance(), catalogReader));
-
- final SqlValidator validator =
- new SamzaSqlValidator(operatorTable, catalogReader, typeFactory);
- validator.setIdentifierExpansion(true);
-
- SqlNode validatedSqlNode = validator.validate(sqlNode);
-
- final RelOptPlanner planner = createStreamingRelOptPlanner(context, null, null);
-
- final SamzaQueryPreparingStatement preparingStmt =
- new SamzaQueryPreparingStatement(
- context,
- catalogReader,
- typeFactory,
- context.getRootSchema(),
- EnumerableRel.Prefer.ARRAY,
- planner,
- EnumerableConvention.INSTANCE);
-
- /* TODO: Add query optimization. */
-
- return preparingStmt.getSqlToRelConverter(validator, catalogReader).convertQuery(validatedSqlNode, false, true);
- }
-
- /**
- * Creates a query planner and initializes it with a default set of
- * rules.
- *
- * @param prepareContext context for preparing a statement
- * @param externalContext external query planning context
- * @param costFactory cost factory for cost based query planning
- * @return relation query planner instance
- */
- protected RelOptPlanner createStreamingRelOptPlanner(final CalcitePrepare.Context prepareContext,
- org.apache.calcite.plan.Context externalContext,
- RelOptCostFactory costFactory) {
- if (externalContext == null) {
- externalContext = Contexts.withConfig(prepareContext.config());
- }
-
- final VolcanoPlanner planner =
- new VolcanoPlanner(costFactory, externalContext);
-
- planner.addRelTraitDef(ConventionTraitDef.INSTANCE);
-
- if (ENABLE_COLLATION_TRAIT) {
- planner.addRelTraitDef(RelCollationTraitDef.INSTANCE);
- planner.registerAbstractRelationalRules();
- }
- RelOptUtil.registerAbstractRels(planner);
- for (RelOptRule rule : DEFAULT_RULES) {
- planner.addRule(rule);
- }
-
- /* Note: Bindable rules were removed until Calcite switches the convention of the root node to bindable. */
-
- for (RelOptRule rule : ENUMERABLE_RULES) {
- planner.addRule(rule);
- }
-
- for (RelOptRule rule : StreamRules.RULES) {
- planner.addRule(rule);
- }
-
- /* Note: Constant reduction rules were removed because current Calcite implementation doesn't use them. */
-
- return planner;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/46a6de43/samza-sql-core/src/main/java/org/apache/samza/sql/planner/SamzaCalciteConnection.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/planner/SamzaCalciteConnection.java b/samza-sql-core/src/main/java/org/apache/samza/sql/planner/SamzaCalciteConnection.java
deleted file mode 100644
index 63b1da5..0000000
--- a/samza-sql-core/src/main/java/org/apache/samza/sql/planner/SamzaCalciteConnection.java
+++ /dev/null
@@ -1,373 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.sql.planner;
-
-import org.apache.calcite.adapter.java.JavaTypeFactory;
-import org.apache.calcite.config.CalciteConnectionConfig;
-import org.apache.calcite.config.CalciteConnectionConfigImpl;
-import org.apache.calcite.jdbc.CalciteConnection;
-import org.apache.calcite.jdbc.CalciteRootSchema;
-import org.apache.calcite.jdbc.CalciteSchema;
-import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
-import org.apache.calcite.linq4j.Enumerator;
-import org.apache.calcite.linq4j.Queryable;
-import org.apache.calcite.linq4j.tree.Expression;
-import org.apache.calcite.model.ModelHandler;
-import org.apache.calcite.rel.type.RelDataTypeSystem;
-import org.apache.calcite.schema.SchemaPlus;
-
-import java.io.IOException;
-import java.lang.reflect.Type;
-import java.sql.*;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.Executor;
-
-/**
- * Minimal <code>org.apache.calcite.jdbc.CalciteConnection</code> implementation which enables
- * re-use of Calcite code.
- */
-public class SamzaCalciteConnection implements CalciteConnection {
- private static final String INLINE = "inline:";
- private final JavaTypeFactory typeFactory;
- private final CalciteRootSchema rootSchema;
- private String schema;
-
- public SamzaCalciteConnection(String model) throws IOException {
- typeFactory = new JavaTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
- rootSchema = CalciteSchema.createRootSchema(true);
- new ModelHandler(this, INLINE + model);
- }
-
- public CalciteRootSchema getCalciteRootSchema(){
- return rootSchema;
- }
-
- @Override
- public SchemaPlus getRootSchema() {
- return rootSchema.plus();
- }
-
- @Override
- public JavaTypeFactory getTypeFactory() {
- return typeFactory;
- }
-
- @Override
- public Properties getProperties() {
- return null;
- }
-
- @Override
- public Statement createStatement() throws SQLException {
- return null;
- }
-
- @Override
- public PreparedStatement prepareStatement(String sql) throws SQLException {
- return null;
- }
-
- @Override
- public CallableStatement prepareCall(String sql) throws SQLException {
- return null;
- }
-
- @Override
- public String nativeSQL(String sql) throws SQLException {
- return null;
- }
-
- @Override
- public void setAutoCommit(boolean autoCommit) throws SQLException {
-
- }
-
- @Override
- public boolean getAutoCommit() throws SQLException {
- return false;
- }
-
- @Override
- public void commit() throws SQLException {
-
- }
-
- @Override
- public void rollback() throws SQLException {
-
- }
-
- @Override
- public void close() throws SQLException {
-
- }
-
- @Override
- public boolean isClosed() throws SQLException {
- return false;
- }
-
- @Override
- public DatabaseMetaData getMetaData() throws SQLException {
- return null;
- }
-
- @Override
- public void setReadOnly(boolean readOnly) throws SQLException {
-
- }
-
- @Override
- public boolean isReadOnly() throws SQLException {
- return false;
- }
-
- @Override
- public void setCatalog(String catalog) throws SQLException {
-
- }
-
- @Override
- public String getCatalog() throws SQLException {
- return null;
- }
-
- @Override
- public void setTransactionIsolation(int level) throws SQLException {
-
- }
-
- @Override
- public int getTransactionIsolation() throws SQLException {
- return 0;
- }
-
- @Override
- public SQLWarning getWarnings() throws SQLException {
- return null;
- }
-
- @Override
- public void clearWarnings() throws SQLException {
-
- }
-
- @Override
- public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException {
- return null;
- }
-
- @Override
- public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
- return null;
- }
-
- @Override
- public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
- return null;
- }
-
- @Override
- public Map<String, Class<?>> getTypeMap() throws SQLException {
- return null;
- }
-
- @Override
- public void setTypeMap(Map<String, Class<?>> map) throws SQLException {
-
- }
-
- @Override
- public void setHoldability(int holdability) throws SQLException {
-
- }
-
- @Override
- public int getHoldability() throws SQLException {
- return 0;
- }
-
- @Override
- public Savepoint setSavepoint() throws SQLException {
- return null;
- }
-
- @Override
- public Savepoint setSavepoint(String name) throws SQLException {
- return null;
- }
-
- @Override
- public void rollback(Savepoint savepoint) throws SQLException {
-
- }
-
- @Override
- public void releaseSavepoint(Savepoint savepoint) throws SQLException {
-
- }
-
- @Override
- public Statement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
- return null;
- }
-
- @Override
- public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
- return null;
- }
-
- @Override
- public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
- return null;
- }
-
- @Override
- public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException {
- return null;
- }
-
- @Override
- public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException {
- return null;
- }
-
- @Override
- public PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException {
- return null;
- }
-
- @Override
- public Clob createClob() throws SQLException {
- return null;
- }
-
- @Override
- public Blob createBlob() throws SQLException {
- return null;
- }
-
- @Override
- public NClob createNClob() throws SQLException {
- return null;
- }
-
- @Override
- public SQLXML createSQLXML() throws SQLException {
- return null;
- }
-
- @Override
- public boolean isValid(int timeout) throws SQLException {
- return false;
- }
-
- @Override
- public void setClientInfo(String name, String value) throws SQLClientInfoException {
-
- }
-
- @Override
- public void setClientInfo(Properties properties) throws SQLClientInfoException {
-
- }
-
- @Override
- public String getClientInfo(String name) throws SQLException {
- return null;
- }
-
- @Override
- public Properties getClientInfo() throws SQLException {
- return null;
- }
-
- @Override
- public Array createArrayOf(String typeName, Object[] elements) throws SQLException {
- return null;
- }
-
- @Override
- public Struct createStruct(String typeName, Object[] attributes) throws SQLException {
- return null;
- }
-
- @Override
- public void setSchema(String schema) throws SQLException {
- this.schema = schema;
- }
-
- @Override
- public String getSchema() throws SQLException {
- return schema;
- }
-
- public void abort(Executor executor) throws SQLException {
-
- }
-
- public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException {
-
- }
-
- public int getNetworkTimeout() throws SQLException {
- return 0;
- }
-
- @Override
- public CalciteConnectionConfig config() {
- return new CalciteConnectionConfigImpl(new Properties());
- }
-
- @Override
- public <T> Queryable<T> createQuery(Expression expression, Class<T> rowType) {
- return null;
- }
-
- @Override
- public <T> Queryable<T> createQuery(Expression expression, Type rowType) {
- return null;
- }
-
- @Override
- public <T> T execute(Expression expression, Class<T> type) {
- return null;
- }
-
- @Override
- public <T> T execute(Expression expression, Type type) {
- return null;
- }
-
- @Override
- public <T> Enumerator<T> executeQuery(Queryable<T> queryable) {
- return null;
- }
-
- @Override
- public <T> T unwrap(Class<T> iface) throws SQLException {
- return null;
- }
-
- @Override
- public boolean isWrapperFor(Class<?> iface) throws SQLException {
- return false;
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/46a6de43/samza-sql-core/src/main/java/org/apache/samza/sql/planner/SamzaQueryPreparingStatement.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/planner/SamzaQueryPreparingStatement.java b/samza-sql-core/src/main/java/org/apache/samza/sql/planner/SamzaQueryPreparingStatement.java
deleted file mode 100644
index 0721573..0000000
--- a/samza-sql-core/src/main/java/org/apache/samza/sql/planner/SamzaQueryPreparingStatement.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.sql.planner;
-
-import com.google.common.collect.Maps;
-import org.apache.calcite.adapter.enumerable.EnumerableRel;
-import org.apache.calcite.jdbc.CalcitePrepare;
-import org.apache.calcite.jdbc.CalciteSchema;
-import org.apache.calcite.plan.Convention;
-import org.apache.calcite.plan.RelOptPlanner;
-import org.apache.calcite.plan.RelOptTable;
-import org.apache.calcite.prepare.Prepare;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeFactory;
-import org.apache.calcite.rex.RexBuilder;
-import org.apache.calcite.sql.SqlExplainLevel;
-import org.apache.calcite.sql.SqlKind;
-import org.apache.calcite.sql.SqlNode;
-import org.apache.calcite.sql.validate.SqlValidator;
-import org.apache.calcite.sql2rel.SqlToRelConverter;
-import org.apache.calcite.sql2rel.StandardConvertletTable;
-
-import java.util.List;
-import java.util.Map;
-
-public class SamzaQueryPreparingStatement extends Prepare implements RelOptTable.ViewExpander {
- private final RelOptPlanner planner;
- private final RexBuilder rexBuilder;
- protected final CalciteSchema schema;
- protected final RelDataTypeFactory typeFactory;
- private final EnumerableRel.Prefer prefer;
- private final Map<String, Object> internalParameters =
- Maps.newLinkedHashMap();
- private int expansionDepth;
- private SqlValidator sqlValidator;
-
- public SamzaQueryPreparingStatement(CalcitePrepare.Context context, CatalogReader catalogReader,
- RelDataTypeFactory typeFactory,
- CalciteSchema schema,
- EnumerableRel.Prefer prefer,
- RelOptPlanner planner,
- Convention resultConvention) {
- super(context, catalogReader, resultConvention);
- this.schema = schema;
- this.prefer = prefer;
- this.planner = planner;
- this.typeFactory = typeFactory;
- this.rexBuilder = new RexBuilder(typeFactory);
- }
-
- @Override
- protected PreparedResult createPreparedExplanation(RelDataType resultType, RelDataType parameterRowType, RelNode rootRel, boolean explainAsXml, SqlExplainLevel detailLevel) {
- return null;
- }
-
- @Override
- protected PreparedResult implement(RelDataType rowType, RelNode rootRel, SqlKind sqlKind) {
- return null;
- }
-
- @Override
- protected SqlToRelConverter getSqlToRelConverter(SqlValidator validator, CatalogReader catalogReader) {
- SqlToRelConverter sqlToRelConverter =
- new SqlToRelConverter(
- this, validator, catalogReader, planner, rexBuilder,
- StandardConvertletTable.INSTANCE);
- sqlToRelConverter.setTrimUnusedFields(true);
- return sqlToRelConverter;
- }
-
- @Override
- public RelNode flattenTypes(RelNode rootRel, boolean restructure) {
- return null;
- }
-
- @Override
- protected RelNode decorrelate(SqlToRelConverter sqlToRelConverter, SqlNode query, RelNode rootRel) {
- return null;
- }
-
- @Override
- protected void init(Class runtimeContextClass) {}
-
- @Override
- protected SqlValidator getSqlValidator() {
- return null;
- }
-
- @Override
- public RelNode expandView(RelDataType rowType, String queryString, List<String> schemaPath) {
- // TODO: Implement custom view expansions
- return super.expandView(rowType, queryString, schemaPath);
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/46a6de43/samza-sql-core/src/main/java/org/apache/samza/sql/planner/SamzaSqlValidator.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/planner/SamzaSqlValidator.java b/samza-sql-core/src/main/java/org/apache/samza/sql/planner/SamzaSqlValidator.java
deleted file mode 100644
index f46c1f0..0000000
--- a/samza-sql-core/src/main/java/org/apache/samza/sql/planner/SamzaSqlValidator.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.sql.planner;
-
-import org.apache.calcite.adapter.java.JavaTypeFactory;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeFactory;
-import org.apache.calcite.sql.SqlInsert;
-import org.apache.calcite.sql.SqlOperatorTable;
-import org.apache.calcite.sql.validate.SqlConformance;
-import org.apache.calcite.sql.validate.SqlValidatorCatalogReader;
-import org.apache.calcite.sql.validate.SqlValidatorImpl;
-
-public class SamzaSqlValidator extends SqlValidatorImpl{
- /**
- * Creates a validator.
- *
- * @param opTab Operator table
- * @param catalogReader Catalog reader
- * @param typeFactory Type factory
- */
- protected SamzaSqlValidator(SqlOperatorTable opTab, SqlValidatorCatalogReader catalogReader, RelDataTypeFactory typeFactory) {
- /* Note: We may need to define Samza specific SqlConformance instance in future. */
- super(opTab, catalogReader, typeFactory, SqlConformance.DEFAULT);
- }
-
- @Override
- protected RelDataType getLogicalSourceRowType(
- RelDataType sourceRowType, SqlInsert insert) {
- return ((JavaTypeFactory) typeFactory).toSql(sourceRowType);
- }
-
- @Override
- protected RelDataType getLogicalTargetRowType(
- RelDataType targetRowType, SqlInsert insert) {
- return ((JavaTypeFactory) typeFactory).toSql(targetRowType);
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/46a6de43/samza-sql-core/src/test/java/org/apache/samza/sql/planner/QueryPlannerTest.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/test/java/org/apache/samza/sql/planner/QueryPlannerTest.java b/samza-sql-core/src/test/java/org/apache/samza/sql/planner/QueryPlannerTest.java
deleted file mode 100644
index 022116e..0000000
--- a/samza-sql-core/src/test/java/org/apache/samza/sql/planner/QueryPlannerTest.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.sql.planner;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import org.apache.calcite.avatica.util.Casing;
-import org.apache.calcite.avatica.util.Quoting;
-import org.apache.calcite.config.CalciteConnectionProperty;
-import org.apache.calcite.jdbc.CalcitePrepare;
-import org.apache.calcite.plan.RelOptUtil;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.schema.Schemas;
-import org.apache.calcite.util.Util;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.sql.SQLException;
-import java.util.HashMap;
-import java.util.Map;
-
-public class QueryPlannerTest {
- public static final String STREAM_SCHEMA = " {\n"
- + " name: 'STREAMS',\n"
- + " tables: [ {\n"
- + " type: 'custom',\n"
- + " name: 'ORDERS',\n"
- + " stream: {\n"
- + " stream: true\n"
- + " },\n"
- + " factory: '" + SamzaStreamTableFactory.class.getName() + "'\n"
- + " } ]\n"
- + " }\n";
-
- public static final String STREAM_MODEL = "{\n"
- + " version: '1.0',\n"
- + " defaultSchema: 'STREAMS',\n"
- + " schemas: [\n"
- + STREAM_SCHEMA
- + " ]\n"
- + "}";
-
- public static final String SELECT_ALL_FROM_ORDERS_WHERE_QUANTITY_GREATER_THAN_FIVE_PLAN_EXPECTED =
- "LogicalDelta\n" +
- " LogicalProject(id=[$0], product=[$1], quantity=[$2])\n" +
- " LogicalFilter(condition=[>($2, 5)])\n" +
- " EnumerableTableScan(table=[[STREAMS, ORDERS]])";
- public static final String SELECT_ALL_FROM_ORDERS_WHERE_QUANTITY_GREATER_THAN_FIVE =
- "select stream * from orders where quantity > 5";
-
- @Test
- public void testQueryPlanner() throws IOException, SQLException {
-
- SamzaCalciteConnection connection = new SamzaCalciteConnection(STREAM_MODEL);
- CalcitePrepare.Context context = Schemas.makeContext(connection,
- connection.getCalciteRootSchema(),
- ImmutableList.of(connection.getSchema()),
- ImmutableMap.copyOf(defaultConfiguration()));
-
- QueryPlanner planner = new QueryPlanner();
- RelNode relNode = planner.getPlan(SELECT_ALL_FROM_ORDERS_WHERE_QUANTITY_GREATER_THAN_FIVE, context);
- Assert.assertNotNull(relNode);
- String s = Util.toLinux(RelOptUtil.toString(relNode));
- Assert.assertTrue(s.contains(SELECT_ALL_FROM_ORDERS_WHERE_QUANTITY_GREATER_THAN_FIVE_PLAN_EXPECTED));
- }
-
- public static Map<CalciteConnectionProperty, String> defaultConfiguration(){
- Map<CalciteConnectionProperty, String> map = new HashMap<CalciteConnectionProperty, String>();
-
- map.put(CalciteConnectionProperty.CASE_SENSITIVE, "false");
- map.put(CalciteConnectionProperty.QUOTED_CASING, Casing.UNCHANGED.name());
- map.put(CalciteConnectionProperty.UNQUOTED_CASING, Casing.UNCHANGED.name());
- map.put(CalciteConnectionProperty.QUOTING, Quoting.BACK_TICK.name());
-
- return map;
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/46a6de43/samza-sql-core/src/test/java/org/apache/samza/sql/planner/SamzaStreamTableFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/test/java/org/apache/samza/sql/planner/SamzaStreamTableFactory.java b/samza-sql-core/src/test/java/org/apache/samza/sql/planner/SamzaStreamTableFactory.java
deleted file mode 100644
index f757d8f..0000000
--- a/samza-sql-core/src/test/java/org/apache/samza/sql/planner/SamzaStreamTableFactory.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.sql.planner;
-
-import com.google.common.collect.ImmutableList;
-import org.apache.calcite.DataContext;
-import org.apache.calcite.linq4j.Enumerable;
-import org.apache.calcite.linq4j.Linq4j;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeFactory;
-import org.apache.calcite.rel.type.RelProtoDataType;
-import org.apache.calcite.schema.*;
-import org.apache.calcite.sql.type.SqlTypeName;
-
-import java.util.Map;
-
-public class SamzaStreamTableFactory implements TableFactory<Table> {
- public Table create(SchemaPlus schema, String name,
- Map<String, Object> operand, RelDataType rowType) {
- final RelProtoDataType protoRowType = new RelProtoDataType() {
- public RelDataType apply(RelDataTypeFactory a0) {
- return a0.builder()
- .add("id", SqlTypeName.INTEGER)
- .add("product", SqlTypeName.VARCHAR, 10)
- .add("quantity", SqlTypeName.INTEGER)
- .build();
- }
- };
- final ImmutableList<Object[]> rows = ImmutableList.of(
- new Object[]{1, "paint", 10},
- new Object[]{2, "paper", 5});
-
- return new StreamableTable() {
- public Table stream() {
- return new OrdersTable(protoRowType, rows);
- }
-
- public RelDataType getRowType(RelDataTypeFactory typeFactory) {
- return protoRowType.apply(typeFactory);
- }
-
- public Statistic getStatistic() {
- return Statistics.UNKNOWN;
- }
-
- public Schema.TableType getJdbcTableType() {
- return Schema.TableType.TABLE;
- }
- };
- }
-
- public static class OrdersTable implements ScannableTable {
- private final RelProtoDataType protoRowType;
- private final ImmutableList<Object[]> rows;
-
- public OrdersTable(RelProtoDataType protoRowType,
- ImmutableList<Object[]> rows) {
- this.protoRowType = protoRowType;
- this.rows = rows;
- }
-
- public Enumerable<Object[]> scan(DataContext root) {
- return Linq4j.asEnumerable(rows);
- }
-
- public RelDataType getRowType(RelDataTypeFactory typeFactory) {
- return protoRowType.apply(typeFactory);
- }
-
- public Statistic getStatistic() {
- return Statistics.UNKNOWN;
- }
-
- public Schema.TableType getJdbcTableType() {
- return Schema.TableType.STREAM;
- }
- }
-}