You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ta...@apache.org on 2017/08/15 18:41:59 UTC
[4/5] beam git commit: [BEAM-2740] Hide BeamSqlEnv.
[BEAM-2740] Hide BeamSqlEnv.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/49aad927
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/49aad927
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/49aad927
Branch: refs/heads/DSL_SQL
Commit: 49aad927d4d9cf58c30c04641c766a62d44f44b7
Parents: 9eec6a0
Author: James Xu <xu...@gmail.com>
Authored: Wed Aug 9 18:54:54 2017 +0800
Committer: Tyler Akidau <ta...@apache.org>
Committed: Tue Aug 15 11:40:39 2017 -0700
----------------------------------------------------------------------
.../sdk/extensions/sql/BeamRecordSqlType.java | 185 ++++++++
.../apache/beam/sdk/extensions/sql/BeamSql.java | 113 ++---
.../beam/sdk/extensions/sql/BeamSqlCli.java | 65 ---
.../beam/sdk/extensions/sql/BeamSqlEnv.java | 127 ------
.../sdk/extensions/sql/BeamSqlRecordHelper.java | 217 +++++++++
.../beam/sdk/extensions/sql/BeamSqlUdf.java | 41 ++
.../extensions/sql/example/BeamSqlExample.java | 2 +-
.../sdk/extensions/sql/impl/BeamSqlCli.java | 65 +++
.../sdk/extensions/sql/impl/BeamSqlEnv.java | 135 ++++++
.../sdk/extensions/sql/impl/package-info.java | 22 +
.../sql/impl/planner/BeamQueryPlanner.java | 9 +-
.../sql/impl/rel/BeamAggregationRel.java | 4 +-
.../extensions/sql/impl/rel/BeamFilterRel.java | 2 +-
.../extensions/sql/impl/rel/BeamIOSinkRel.java | 6 +-
.../sql/impl/rel/BeamIOSourceRel.java | 6 +-
.../sql/impl/rel/BeamIntersectRel.java | 2 +-
.../extensions/sql/impl/rel/BeamJoinRel.java | 4 +-
.../extensions/sql/impl/rel/BeamMinusRel.java | 2 +-
.../extensions/sql/impl/rel/BeamProjectRel.java | 2 +-
.../extensions/sql/impl/rel/BeamRelNode.java | 5 +-
.../sql/impl/rel/BeamSetOperatorRelBase.java | 2 +-
.../extensions/sql/impl/rel/BeamSortRel.java | 4 +-
.../extensions/sql/impl/rel/BeamUnionRel.java | 2 +-
.../extensions/sql/impl/rel/BeamValuesRel.java | 6 +-
.../sql/impl/schema/BaseBeamTable.java | 35 ++
.../extensions/sql/impl/schema/BeamIOType.java | 28 ++
.../sql/impl/schema/BeamPCollectionTable.java | 63 +++
.../sql/impl/schema/BeamSqlTable.java | 54 +++
.../sql/impl/schema/BeamTableUtils.java | 118 +++++
.../impl/schema/kafka/BeamKafkaCSVTable.java | 109 +++++
.../sql/impl/schema/kafka/BeamKafkaTable.java | 109 +++++
.../sql/impl/schema/kafka/package-info.java | 22 +
.../sql/impl/schema/package-info.java | 22 +
.../sql/impl/schema/text/BeamTextCSVTable.java | 70 +++
.../schema/text/BeamTextCSVTableIOReader.java | 58 +++
.../schema/text/BeamTextCSVTableIOWriter.java | 58 +++
.../sql/impl/schema/text/BeamTextTable.java | 41 ++
.../sql/impl/schema/text/package-info.java | 22 +
.../transform/BeamAggregationTransforms.java | 4 +-
.../sql/impl/transform/BeamJoinTransforms.java | 4 +-
.../sql/impl/transform/BeamSqlProjectFn.java | 4 +-
.../extensions/sql/impl/utils/CalciteUtils.java | 2 +-
.../extensions/sql/schema/BaseBeamTable.java | 34 --
.../sdk/extensions/sql/schema/BeamIOType.java | 28 --
.../sql/schema/BeamPCollectionTable.java | 62 ---
.../sql/schema/BeamRecordSqlType.java | 185 --------
.../sql/schema/BeamSqlRecordHelper.java | 217 ---------
.../sdk/extensions/sql/schema/BeamSqlTable.java | 53 ---
.../sdk/extensions/sql/schema/BeamSqlUdf.java | 41 --
.../extensions/sql/schema/BeamTableUtils.java | 117 -----
.../sql/schema/kafka/BeamKafkaCSVTable.java | 109 -----
.../sql/schema/kafka/BeamKafkaTable.java | 109 -----
.../sql/schema/kafka/package-info.java | 22 -
.../sdk/extensions/sql/schema/package-info.java | 22 -
.../sql/schema/text/BeamTextCSVTable.java | 70 ---
.../schema/text/BeamTextCSVTableIOReader.java | 58 ---
.../schema/text/BeamTextCSVTableIOWriter.java | 58 ---
.../sql/schema/text/BeamTextTable.java | 41 --
.../sql/schema/text/package-info.java | 22 -
.../extensions/sql/BeamSqlApiSurfaceTest.java | 12 +-
.../sql/BeamSqlDslAggregationTest.java | 1 -
.../beam/sdk/extensions/sql/BeamSqlDslBase.java | 1 -
.../sdk/extensions/sql/BeamSqlDslJoinTest.java | 1 -
.../extensions/sql/BeamSqlDslProjectTest.java | 1 -
.../extensions/sql/BeamSqlDslUdfUdafTest.java | 2 -
.../beam/sdk/extensions/sql/TestUtils.java | 1 -
.../interpreter/BeamSqlFnExecutorTestBase.java | 2 +-
.../extensions/sql/impl/rel/BaseRelTest.java | 34 ++
.../sql/impl/rel/BeamIntersectRelTest.java | 9 +-
.../rel/BeamJoinRelBoundedVsBoundedTest.java | 23 +-
.../rel/BeamJoinRelUnboundedVsBoundedTest.java | 25 +-
.../BeamJoinRelUnboundedVsUnboundedTest.java | 19 +-
.../sql/impl/rel/BeamMinusRelTest.java | 9 +-
.../impl/rel/BeamSetOperatorRelBaseTest.java | 9 +-
.../sql/impl/rel/BeamSortRelTest.java | 17 +-
.../sql/impl/rel/BeamUnionRelTest.java | 9 +-
.../sql/impl/rel/BeamValuesRelTest.java | 11 +-
.../sql/impl/schema/BeamSqlRowCoderTest.java | 77 ++++
.../schema/kafka/BeamKafkaCSVTableTest.java | 107 +++++
.../impl/schema/text/BeamTextCSVTableTest.java | 176 +++++++
.../transform/BeamAggregationTransformTest.java | 453 +++++++++++++++++++
.../schema/transform/BeamTransformBaseTest.java | 97 ++++
...mSqlBuiltinFunctionsIntegrationTestBase.java | 2 +-
...amSqlComparisonOperatorsIntegrationTest.java | 2 +-
.../extensions/sql/mock/MockedBoundedTable.java | 4 +-
.../sdk/extensions/sql/mock/MockedTable.java | 4 +-
.../sql/mock/MockedUnboundedTable.java | 4 +-
.../sql/schema/BeamSqlRowCoderTest.java | 76 ----
.../sql/schema/kafka/BeamKafkaCSVTableTest.java | 107 -----
.../sql/schema/text/BeamTextCSVTableTest.java | 176 -------
.../transform/BeamAggregationTransformTest.java | 453 -------------------
.../schema/transform/BeamTransformBaseTest.java | 97 ----
92 files changed, 2575 insertions(+), 2545 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamRecordSqlType.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamRecordSqlType.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamRecordSqlType.java
new file mode 100644
index 0000000..5269867
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamRecordSqlType.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql;
+
+import java.math.BigDecimal;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.GregorianCalendar;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.coders.BigDecimalCoder;
+import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
+import org.apache.beam.sdk.coders.BigEndianLongCoder;
+import org.apache.beam.sdk.coders.ByteCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.extensions.sql.BeamSqlRecordHelper.BooleanCoder;
+import org.apache.beam.sdk.extensions.sql.BeamSqlRecordHelper.DateCoder;
+import org.apache.beam.sdk.extensions.sql.BeamSqlRecordHelper.DoubleCoder;
+import org.apache.beam.sdk.extensions.sql.BeamSqlRecordHelper.FloatCoder;
+import org.apache.beam.sdk.extensions.sql.BeamSqlRecordHelper.ShortCoder;
+import org.apache.beam.sdk.extensions.sql.BeamSqlRecordHelper.TimeCoder;
+import org.apache.beam.sdk.values.BeamRecord;
+import org.apache.beam.sdk.values.BeamRecordType;
+
+/**
+ * Type provider for {@link BeamRecord} with SQL types, identified by {@link Types} codes.
+ *
+ * <p>Only a limited set of SQL types is supported for now; see
+ * <a href="https://beam.apache.org/blog/2017/07/21/sql-dsl.html#data-type">data types</a>
+ * for more details. Use {@link #create(List, List)} to build an instance; it derives one
+ * {@link Coder} per field from that field's SQL type.
+ */
+public class BeamRecordSqlType extends BeamRecordType {
+  private static final Map<Integer, Class<?>> SQL_TYPE_TO_JAVA_CLASS = new HashMap<>();
+  static {
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.TINYINT, Byte.class);
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.SMALLINT, Short.class);
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.INTEGER, Integer.class);
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.BIGINT, Long.class);
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.FLOAT, Float.class);
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.DOUBLE, Double.class);
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.DECIMAL, BigDecimal.class);
+
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.BOOLEAN, Boolean.class);
+
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.CHAR, String.class);
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.VARCHAR, String.class);
+
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.TIME, GregorianCalendar.class);
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.DATE, Date.class);
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.TIMESTAMP, Date.class);
+  }
+
+  // One java.sql.Types code per field; left null by the protected constructor.
+  public List<Integer> fieldTypes;
+
+  protected BeamRecordSqlType(List<String> fieldsName, List<Coder> fieldsCoder) {
+    super(fieldsName, fieldsCoder);
+  }
+
+  private BeamRecordSqlType(List<String> fieldsName, List<Integer> fieldTypes,
+      List<Coder> fieldsCoder) {
+    super(fieldsName, fieldsCoder);
+    this.fieldTypes = fieldTypes;
+  }
+
+  /** Creates a type from parallel lists of field names and {@link Types} codes. */
+  public static BeamRecordSqlType create(List<String> fieldNames,
+      List<Integer> fieldTypes) {
+    if (fieldNames.size() != fieldTypes.size()) {
+      throw new IllegalStateException("the sizes of 'fieldNames' and 'fieldTypes' must match.");
+    }
+    List<Coder> fieldCoders = new ArrayList<>(fieldTypes.size());
+    for (Integer fieldType : fieldTypes) {
+      switch (fieldType) {
+        case Types.INTEGER:
+          fieldCoders.add(BigEndianIntegerCoder.of());
+          break;
+        case Types.SMALLINT:
+          fieldCoders.add(ShortCoder.of());
+          break;
+        case Types.TINYINT:
+          fieldCoders.add(ByteCoder.of());
+          break;
+        case Types.DOUBLE:
+          fieldCoders.add(DoubleCoder.of());
+          break;
+        case Types.FLOAT:
+          fieldCoders.add(FloatCoder.of());
+          break;
+        case Types.DECIMAL:
+          fieldCoders.add(BigDecimalCoder.of());
+          break;
+        case Types.BIGINT:
+          fieldCoders.add(BigEndianLongCoder.of());
+          break;
+        case Types.VARCHAR:
+        case Types.CHAR:
+          fieldCoders.add(StringUtf8Coder.of());
+          break;
+        case Types.TIME:
+          fieldCoders.add(TimeCoder.of());
+          break;
+        case Types.DATE:
+        case Types.TIMESTAMP:
+          fieldCoders.add(DateCoder.of());
+          break;
+        case Types.BOOLEAN:
+          fieldCoders.add(BooleanCoder.of());
+          break;
+        default:
+          throw new UnsupportedOperationException(
+              "Data type: " + fieldType + " not supported yet!");
+      }
+    }
+    return new BeamRecordSqlType(fieldNames, fieldTypes, fieldCoders);
+  }
+
+  @Override
+  public void validateValueType(int index, Object fieldValue) throws IllegalArgumentException {
+    if (fieldValue == null) { // no need to do type check for NULL value
+      return;
+    }
+
+    int fieldType = fieldTypes.get(index);
+    Class<?> javaClazz = SQL_TYPE_TO_JAVA_CLASS.get(fieldType);
+    if (javaClazz == null) {
+      throw new IllegalArgumentException("Data type: " + fieldType + " not supported yet!");
+    }
+
+    if (!fieldValue.getClass().equals(javaClazz)) {
+      throw new IllegalArgumentException(
+          String.format("[%s](%s) doesn't match type [%s]",
+              fieldValue, fieldValue.getClass(), fieldType)
+      );
+    }
+  }
+
+  public List<Integer> getFieldTypes() {
+    return fieldTypes;
+  }
+
+  public Integer getFieldTypeByIndex(int index) {
+    return fieldTypes.get(index);
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (!(obj instanceof BeamRecordSqlType)) {
+      return false;
+    }
+    BeamRecordSqlType ins = (BeamRecordSqlType) obj;
+    return java.util.Objects.equals(fieldTypes, ins.getFieldTypes())
+        && getFieldNames().equals(ins.getFieldNames());
+  }
+
+  @Override
+  public int hashCode() {
+    return 31 * getFieldNames().hashCode() + java.util.Objects.hashCode(fieldTypes);
+  }
+
+  @Override
+  public String toString() {
+    return "BeamRecordSqlType [fieldNames=" + getFieldNames()
+        + ", fieldTypes=" + fieldTypes + "]";
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSql.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSql.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSql.java
index bf6a9c0..34355fb 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSql.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSql.java
@@ -17,13 +17,11 @@
*/
package org.apache.beam.sdk.extensions.sql;
-import com.google.auto.value.AutoValue;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.BeamRecordCoder;
+import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
-import org.apache.beam.sdk.extensions.sql.schema.BeamPCollectionTable;
-import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlUdf;
+import org.apache.beam.sdk.extensions.sql.impl.schema.BeamPCollectionTable;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SerializableFunction;
@@ -94,10 +92,7 @@ public class BeamSql {
* </ul>
*/
public static QueryTransform query(String sqlQuery) {
- return QueryTransform.builder()
- .setSqlEnv(new BeamSqlEnv())
- .setSqlQuery(sqlQuery)
- .build();
+ return new QueryTransform(sqlQuery);
}
/**
@@ -109,10 +104,7 @@ public class BeamSql {
* <p>Make sure to query it from a static table name <em>PCOLLECTION</em>.
*/
public static SimpleQueryTransform simpleQuery(String sqlQuery) {
- return SimpleQueryTransform.builder()
- .setSqlEnv(new BeamSqlEnv())
- .setSqlQuery(sqlQuery)
- .build();
+ return new SimpleQueryTransform(sqlQuery);
}
/**
@@ -121,28 +113,22 @@ public class BeamSql {
* <p>The table names in the input {@code PCollectionTuple} are only valid during the current
* query.
*/
- @AutoValue
- public abstract static class QueryTransform extends
+ public static class QueryTransform extends
PTransform<PCollectionTuple, PCollection<BeamRecord>> {
- abstract BeamSqlEnv getSqlEnv();
- abstract String getSqlQuery();
+ private BeamSqlEnv beamSqlEnv = new BeamSqlEnv();
+ private String sqlQuery;
- static Builder builder() {
- return new AutoValue_BeamSql_QueryTransform.Builder();
- }
-
- @AutoValue.Builder
- abstract static class Builder {
- abstract Builder setSqlQuery(String sqlQuery);
- abstract Builder setSqlEnv(BeamSqlEnv sqlEnv);
- abstract QueryTransform build();
+ public QueryTransform(String sqlQuery) {
+ this.sqlQuery = sqlQuery;
}
/**
* register a UDF function used in this query.
+ *
+ * <p>Refer to {@link BeamSqlUdf} for more about how to implement a UDF in BeamSql.
*/
public QueryTransform withUdf(String functionName, Class<? extends BeamSqlUdf> clazz){
- getSqlEnv().registerUdf(functionName, clazz);
+ beamSqlEnv.registerUdf(functionName, clazz);
return this;
}
/**
@@ -150,7 +136,7 @@ public class BeamSql {
* Note, {@link SerializableFunction} must have a constructor without arguments.
*/
public QueryTransform withUdf(String functionName, SerializableFunction sfn){
- getSqlEnv().registerUdf(functionName, sfn);
+ beamSqlEnv.registerUdf(functionName, sfn);
return this;
}
@@ -158,7 +144,7 @@ public class BeamSql {
* register a {@link CombineFn} as UDAF function used in this query.
*/
public QueryTransform withUdaf(String functionName, CombineFn combineFn){
- getSqlEnv().registerUdaf(functionName, combineFn);
+ beamSqlEnv.registerUdaf(functionName, combineFn);
return this;
}
@@ -168,13 +154,13 @@ public class BeamSql {
BeamRelNode beamRelNode = null;
try {
- beamRelNode = getSqlEnv().planner.convertToBeamRel(getSqlQuery());
+ beamRelNode = beamSqlEnv.getPlanner().convertToBeamRel(sqlQuery);
} catch (ValidationException | RelConversionException | SqlParseException e) {
throw new IllegalStateException(e);
}
try {
- return beamRelNode.buildBeamPipeline(input, getSqlEnv());
+ return beamRelNode.buildBeamPipeline(input, beamSqlEnv);
} catch (Exception e) {
throw new IllegalStateException(e);
}
@@ -186,7 +172,7 @@ public class BeamSql {
PCollection<BeamRecord> sourceStream = (PCollection<BeamRecord>) input.get(sourceTag);
BeamRecordCoder sourceCoder = (BeamRecordCoder) sourceStream.getCoder();
- getSqlEnv().registerTable(sourceTag.getId(),
+ beamSqlEnv.registerTable(sourceTag.getId(),
new BeamPCollectionTable(sourceStream,
(BeamRecordSqlType) sourceCoder.getRecordType()));
}
@@ -197,53 +183,47 @@ public class BeamSql {
* A {@link PTransform} representing an execution plan for a SQL query referencing
* a single table.
*/
- @AutoValue
- public abstract static class SimpleQueryTransform
+ public static class SimpleQueryTransform
extends PTransform<PCollection<BeamRecord>, PCollection<BeamRecord>> {
private static final String PCOLLECTION_TABLE_NAME = "PCOLLECTION";
- abstract BeamSqlEnv getSqlEnv();
- abstract String getSqlQuery();
+ private QueryTransform delegate;
- static Builder builder() {
- return new AutoValue_BeamSql_SimpleQueryTransform.Builder();
+ public SimpleQueryTransform(String sqlQuery) {
+ this.delegate = new QueryTransform(sqlQuery);
}
- @AutoValue.Builder
- abstract static class Builder {
- abstract Builder setSqlQuery(String sqlQuery);
- abstract Builder setSqlEnv(BeamSqlEnv sqlEnv);
- abstract SimpleQueryTransform build();
+ /**
+ * Registers {@code clazz} as a UDF named {@code functionName} for use in this query.
+ *
+ * <p>Refer to {@link BeamSqlUdf} for more about how to implement a UDF in BeamSql.
+ */
+ public SimpleQueryTransform withUdf(String functionName, Class<? extends BeamSqlUdf> clazz){
+ delegate.withUdf(functionName, clazz);
+ return this;
 }
/**
- * register a UDF function used in this query.
+ * register {@link SerializableFunction} as a UDF function used in this query.
+ * Note, {@link SerializableFunction} must have a constructor without arguments.
*/
- public SimpleQueryTransform withUdf(String functionName, Class<? extends BeamSqlUdf> clazz){
- getSqlEnv().registerUdf(functionName, clazz);
- return this;
- }
- /**
- * register {@link SerializableFunction} as a UDF function used in this query.
- * Note, {@link SerializableFunction} must have a constructor without arguments.
- */
- public SimpleQueryTransform withUdf(String functionName, SerializableFunction sfn){
- getSqlEnv().registerUdf(functionName, sfn);
- return this;
- }
+ public SimpleQueryTransform withUdf(String functionName, SerializableFunction sfn){
+ delegate.withUdf(functionName, sfn);
+ return this;
+ }
- /**
- * register a {@link CombineFn} as UDAF function used in this query.
- */
- public SimpleQueryTransform withUdaf(String functionName, CombineFn combineFn){
- getSqlEnv().registerUdaf(functionName, combineFn);
- return this;
- }
+ /**
+ * register a {@link CombineFn} as UDAF function used in this query.
+ */
+ public SimpleQueryTransform withUdaf(String functionName, CombineFn combineFn){
+ delegate.withUdaf(functionName, combineFn);
+ return this;
+ }
private void validateQuery() {
SqlNode sqlNode;
try {
- sqlNode = getSqlEnv().planner.parseQuery(getSqlQuery());
- getSqlEnv().planner.getPlanner().close();
+ sqlNode = delegate.beamSqlEnv.getPlanner().parseQuery(delegate.sqlQuery);
+ delegate.beamSqlEnv.getPlanner().getPlanner().close();
} catch (SqlParseException e) {
throw new IllegalStateException(e);
}
@@ -264,10 +244,7 @@ public class BeamSql {
public PCollection<BeamRecord> expand(PCollection<BeamRecord> input) {
validateQuery();
return PCollectionTuple.of(new TupleTag<BeamRecord>(PCOLLECTION_TABLE_NAME), input)
- .apply(QueryTransform.builder()
- .setSqlEnv(getSqlEnv())
- .setSqlQuery(getSqlQuery())
- .build());
+ .apply(delegate);
}
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java
deleted file mode 100644
index a43808e..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.sql;
-
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.values.BeamRecord;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.calcite.plan.RelOptUtil;
-
-/**
- * {@link BeamSqlCli} provides methods to execute Beam SQL with an interactive client.
- */
-@Experimental
-public class BeamSqlCli {
- /**
- * Returns a human readable representation of the query execution plan.
- */
- public static String explainQuery(String sqlString, BeamSqlEnv sqlEnv) throws Exception {
- BeamRelNode exeTree = sqlEnv.planner.convertToBeamRel(sqlString);
- String beamPlan = RelOptUtil.toString(exeTree);
- return beamPlan;
- }
-
- /**
- * compile SQL, and return a {@link Pipeline}.
- */
- public static PCollection<BeamRecord> compilePipeline(String sqlStatement, BeamSqlEnv sqlEnv)
- throws Exception{
- PipelineOptions options = PipelineOptionsFactory.fromArgs(new String[] {}).withValidation()
- .as(PipelineOptions.class); // FlinkPipelineOptions.class
- options.setJobName("BeamPlanCreator");
- Pipeline pipeline = Pipeline.create(options);
-
- return compilePipeline(sqlStatement, pipeline, sqlEnv);
- }
-
- /**
- * compile SQL, and return a {@link Pipeline}.
- */
- public static PCollection<BeamRecord> compilePipeline(String sqlStatement, Pipeline basePipeline,
- BeamSqlEnv sqlEnv) throws Exception{
- PCollection<BeamRecord> resultStream =
- sqlEnv.planner.compileBeamPipeline(sqlStatement, basePipeline, sqlEnv);
- return resultStream;
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlEnv.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlEnv.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlEnv.java
deleted file mode 100644
index 79f2b32..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlEnv.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.sql;
-
-import java.io.Serializable;
-import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.UdafImpl;
-import org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner;
-import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
-import org.apache.beam.sdk.extensions.sql.schema.BaseBeamTable;
-import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlUdf;
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.calcite.DataContext;
-import org.apache.calcite.linq4j.Enumerable;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeFactory;
-import org.apache.calcite.schema.ScannableTable;
-import org.apache.calcite.schema.Schema;
-import org.apache.calcite.schema.SchemaPlus;
-import org.apache.calcite.schema.Statistic;
-import org.apache.calcite.schema.Statistics;
-import org.apache.calcite.schema.impl.ScalarFunctionImpl;
-import org.apache.calcite.tools.Frameworks;
-
-/**
- * {@link BeamSqlEnv} prepares the execution context for {@link BeamSql} and {@link BeamSqlCli}.
- *
- * <p>It contains a {@link SchemaPlus} which holds the metadata of tables/UDF functions, and
- * a {@link BeamQueryPlanner} which parse/validate/optimize/translate input SQL queries.
- */
-public class BeamSqlEnv implements Serializable{
- transient SchemaPlus schema;
- transient BeamQueryPlanner planner;
-
- public BeamSqlEnv() {
- schema = Frameworks.createRootSchema(true);
- planner = new BeamQueryPlanner(schema);
- }
-
- /**
- * Register a UDF function which can be used in SQL expression.
- */
- public void registerUdf(String functionName, Class<? extends BeamSqlUdf> clazz) {
- schema.add(functionName, ScalarFunctionImpl.create(clazz, BeamSqlUdf.UDF_METHOD));
- }
-
- /**
- * register {@link SerializableFunction} as a UDF function which can be used in SQL expression.
- * Note, {@link SerializableFunction} must have a constructor without arguments.
- */
- public void registerUdf(String functionName, SerializableFunction sfn) {
- schema.add(functionName, ScalarFunctionImpl.create(sfn.getClass(), "apply"));
- }
-
- /**
- * Register a {@link CombineFn} as UDAF function which can be used in GROUP-BY expression.
- */
- public void registerUdaf(String functionName, CombineFn combineFn) {
- schema.add(functionName, new UdafImpl(combineFn));
- }
-
- /**
- * Registers a {@link BaseBeamTable} which can be used for all subsequent queries.
- *
- */
- public void registerTable(String tableName, BaseBeamTable table) {
- schema.add(tableName, new BeamCalciteTable(table.getRowType()));
- planner.getSourceTables().put(tableName, table);
- }
-
- /**
- * Find {@link BaseBeamTable} by table name.
- */
- public BaseBeamTable findTable(String tableName){
- return planner.getSourceTables().get(tableName);
- }
-
- private static class BeamCalciteTable implements ScannableTable, Serializable {
- private BeamRecordSqlType beamSqlRowType;
- public BeamCalciteTable(BeamRecordSqlType beamSqlRowType) {
- this.beamSqlRowType = beamSqlRowType;
- }
- @Override
- public RelDataType getRowType(RelDataTypeFactory typeFactory) {
- return CalciteUtils.toCalciteRowType(this.beamSqlRowType)
- .apply(BeamQueryPlanner.TYPE_FACTORY);
- }
-
- @Override
- public Enumerable<Object[]> scan(DataContext root) {
- // not used as Beam SQL uses its own execution engine
- return null;
- }
-
- /**
- * Not used {@link Statistic} to optimize the plan.
- */
- @Override
- public Statistic getStatistic() {
- return Statistics.UNKNOWN;
- }
-
- /**
- * all sources are treated as TABLE in Beam SQL.
- */
- @Override
- public Schema.TableType getJdbcTableType() {
- return Schema.TableType.TABLE;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlRecordHelper.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlRecordHelper.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlRecordHelper.java
new file mode 100644
index 0000000..870165d
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlRecordHelper.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.math.BigDecimal;
+import java.util.Date;
+import java.util.GregorianCalendar;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.BigDecimalCoder;
+import org.apache.beam.sdk.coders.BigEndianLongCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.values.BeamRecord;
+
+/**
+ * Helpers for {@link BeamRecord}s in Beam SQL: access to the SQL record type, plus
+ * {@link Coder}s for the Java types used as SQL field values.
+ *
+ * <p>None of the coders accept {@code null}; they throw a {@link CoderException} instead, so
+ * null fields must be handled before delegating to them.
+ */
+@Experimental
+public class BeamSqlRecordHelper {
+
+  /**
+   * Returns the {@link BeamRecordSqlType} describing {@code record}.
+   *
+   * @throws ClassCastException if the record's data type is not a {@link BeamRecordSqlType}
+   */
+  public static BeamRecordSqlType getSqlRecordType(BeamRecord record) {
+    return (BeamRecordSqlType) record.getDataType();
+  }
+
+  /**
+   * {@link Coder} for Java type {@link Short}, written as two big-endian bytes.
+   */
+  public static class ShortCoder extends CustomCoder<Short> {
+    private static final ShortCoder INSTANCE = new ShortCoder();
+
+    /** Returns the singleton {@link ShortCoder}. */
+    public static ShortCoder of() {
+      return INSTANCE;
+    }
+
+    private ShortCoder() {
+    }
+
+    @Override
+    public void encode(Short value, OutputStream outStream) throws CoderException, IOException {
+      if (value == null) {
+        throw new CoderException("cannot encode a null Short");
+      }
+      new DataOutputStream(outStream).writeShort(value);
+    }
+
+    @Override
+    public Short decode(InputStream inStream) throws CoderException, IOException {
+      return new DataInputStream(inStream).readShort();
+    }
+
+    @Override
+    public void verifyDeterministic() throws NonDeterministicException {
+      // Fixed-width encoding: equal values always produce equal bytes.
+    }
+  }
+
+  /**
+   * {@link Coder} for Java type {@link Float}, stored as the exact {@link BigDecimal} value of
+   * the float via {@link BigDecimalCoder}.
+   *
+   * <p>NaN and infinities have no {@link BigDecimal} representation and are rejected with a
+   * {@link CoderException}. Negative zero decodes as positive zero.
+   */
+  public static class FloatCoder extends CustomCoder<Float> {
+    private static final FloatCoder INSTANCE = new FloatCoder();
+    private static final BigDecimalCoder CODER = BigDecimalCoder.of();
+
+    /** Returns the singleton {@link FloatCoder}. */
+    public static FloatCoder of() {
+      return INSTANCE;
+    }
+
+    private FloatCoder() {
+    }
+
+    @Override
+    public void encode(Float value, OutputStream outStream) throws CoderException, IOException {
+      if (value == null) {
+        throw new CoderException("cannot encode a null Float");
+      }
+      if (value.isNaN() || value.isInfinite()) {
+        // new BigDecimal(...) would throw NumberFormatException for these values.
+        throw new CoderException("cannot encode a non-finite Float: " + value);
+      }
+      CODER.encode(new BigDecimal(value), outStream);
+    }
+
+    @Override
+    public Float decode(InputStream inStream) throws CoderException, IOException {
+      return CODER.decode(inStream).floatValue();
+    }
+
+    @Override
+    public void verifyDeterministic() throws NonDeterministicException {
+      // The encoding depends only on the value, so it is deterministic.
+    }
+  }
+
+  /**
+   * {@link Coder} for Java type {@link Double}, stored as the exact {@link BigDecimal} value of
+   * the double via {@link BigDecimalCoder}.
+   *
+   * <p>NaN and infinities have no {@link BigDecimal} representation and are rejected with a
+   * {@link CoderException}. Negative zero decodes as positive zero.
+   */
+  public static class DoubleCoder extends CustomCoder<Double> {
+    private static final DoubleCoder INSTANCE = new DoubleCoder();
+    private static final BigDecimalCoder CODER = BigDecimalCoder.of();
+
+    /** Returns the singleton {@link DoubleCoder}. */
+    public static DoubleCoder of() {
+      return INSTANCE;
+    }
+
+    private DoubleCoder() {
+    }
+
+    @Override
+    public void encode(Double value, OutputStream outStream) throws CoderException, IOException {
+      if (value == null) {
+        throw new CoderException("cannot encode a null Double");
+      }
+      if (value.isNaN() || value.isInfinite()) {
+        // new BigDecimal(double) would throw NumberFormatException for these values.
+        throw new CoderException("cannot encode a non-finite Double: " + value);
+      }
+      CODER.encode(new BigDecimal(value), outStream);
+    }
+
+    @Override
+    public Double decode(InputStream inStream) throws CoderException, IOException {
+      return CODER.decode(inStream).doubleValue();
+    }
+
+    @Override
+    public void verifyDeterministic() throws NonDeterministicException {
+      // The encoding depends only on the value, so it is deterministic.
+    }
+  }
+
+  /**
+   * {@link Coder} for Java type {@link GregorianCalendar}, stored as its epoch milliseconds via
+   * {@link BigEndianLongCoder}.
+   *
+   * <p>Only the instant is encoded: a decoded calendar uses the JVM's default time zone and
+   * locale rather than those of the encoded value.
+   */
+  public static class TimeCoder extends CustomCoder<GregorianCalendar> {
+    private static final BigEndianLongCoder LONG_CODER = BigEndianLongCoder.of();
+    private static final TimeCoder INSTANCE = new TimeCoder();
+
+    /** Returns the singleton {@link TimeCoder}. */
+    public static TimeCoder of() {
+      return INSTANCE;
+    }
+
+    private TimeCoder() {
+    }
+
+    @Override
+    public void encode(GregorianCalendar value, OutputStream outStream)
+        throws CoderException, IOException {
+      if (value == null) {
+        throw new CoderException("cannot encode a null GregorianCalendar");
+      }
+      LONG_CODER.encode(value.getTimeInMillis(), outStream);
+    }
+
+    @Override
+    public GregorianCalendar decode(InputStream inStream) throws CoderException, IOException {
+      GregorianCalendar calendar = new GregorianCalendar();
+      calendar.setTimeInMillis(LONG_CODER.decode(inStream));
+      return calendar;
+    }
+
+    @Override
+    public void verifyDeterministic() throws NonDeterministicException {
+      // Only the epoch millis are written, using a fixed-width encoding.
+    }
+  }
+
+  /**
+   * {@link Coder} for Java type {@link Date}, stored as its epoch milliseconds via
+   * {@link BigEndianLongCoder}.
+   */
+  public static class DateCoder extends CustomCoder<Date> {
+    private static final BigEndianLongCoder LONG_CODER = BigEndianLongCoder.of();
+    private static final DateCoder INSTANCE = new DateCoder();
+
+    /** Returns the singleton {@link DateCoder}. */
+    public static DateCoder of() {
+      return INSTANCE;
+    }
+
+    private DateCoder() {
+    }
+
+    @Override
+    public void encode(Date value, OutputStream outStream) throws CoderException, IOException {
+      if (value == null) {
+        throw new CoderException("cannot encode a null Date");
+      }
+      LONG_CODER.encode(value.getTime(), outStream);
+    }
+
+    @Override
+    public Date decode(InputStream inStream) throws CoderException, IOException {
+      return new Date(LONG_CODER.decode(inStream));
+    }
+
+    @Override
+    public void verifyDeterministic() throws NonDeterministicException {
+      // Only the epoch millis are written, using a fixed-width encoding.
+    }
+  }
+
+  /**
+   * {@link Coder} for Java type {@link Boolean}, written as a single byte.
+   */
+  public static class BooleanCoder extends CustomCoder<Boolean> {
+    private static final BooleanCoder INSTANCE = new BooleanCoder();
+
+    /** Returns the singleton {@link BooleanCoder}. */
+    public static BooleanCoder of() {
+      return INSTANCE;
+    }
+
+    private BooleanCoder() {
+    }
+
+    @Override
+    public void encode(Boolean value, OutputStream outStream) throws CoderException, IOException {
+      if (value == null) {
+        throw new CoderException("cannot encode a null Boolean");
+      }
+      new DataOutputStream(outStream).writeBoolean(value);
+    }
+
+    @Override
+    public Boolean decode(InputStream inStream) throws CoderException, IOException {
+      return new DataInputStream(inStream).readBoolean();
+    }
+
+    @Override
+    public void verifyDeterministic() throws NonDeterministicException {
+      // Fixed-width encoding: equal values always produce equal bytes.
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlUdf.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlUdf.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlUdf.java
new file mode 100644
index 0000000..d4828e7
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlUdf.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql;
+
+import java.io.Serializable;
+
+/**
+ * Interface to create a UDF in Beam SQL.
+ *
+ * <p>A public static method {@code eval} is required. Here is an example:
+ *
+ * <blockquote><pre>
+ * public static class MyLeftFunction implements BeamSqlUdf {
+ *   public static String eval(
+ *       &#64;Parameter(name = "s") String s,
+ *       &#64;Parameter(name = "n", optional = true) Integer n) {
+ *     return s.substring(0, n == null ? 1 : n);
+ *   }
+ * }</pre></blockquote>
+ *
+ * <p>The first parameter is named "s" and is mandatory,
+ * and the second parameter is named "n" and is optional.
+ */
+public interface BeamSqlUdf extends Serializable {
+  String UDF_METHOD = "eval";
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java
index 91251cf..0c5dae1 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java
@@ -21,8 +21,8 @@ import java.sql.Types;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType;
import org.apache.beam.sdk.extensions.sql.BeamSql;
-import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlCli.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlCli.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlCli.java
new file mode 100644
index 0000000..5c7d920
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlCli.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.values.BeamRecord;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.calcite.plan.RelOptUtil;
+
+/**
+ * {@link BeamSqlCli} provides methods to execute Beam SQL with an interactive client.
+ */
+@Experimental
+public class BeamSqlCli {
+  /**
+   * Returns a human readable representation of the query execution plan.
+   *
+   * @param sqlString the SQL query to plan
+   * @param sqlEnv the environment holding the registered tables and functions
+   * @return the plan of Beam relational nodes, as rendered by {@code RelOptUtil.toString}
+   * @throws Exception propagated from planning the query
+   */
+  public static String explainQuery(String sqlString, BeamSqlEnv sqlEnv) throws Exception {
+    BeamRelNode exeTree = sqlEnv.getPlanner().convertToBeamRel(sqlString);
+    return RelOptUtil.toString(exeTree);
+  }
+
+  /**
+   * Compiles {@code sqlStatement} into a new {@link Pipeline} and returns the query result.
+   *
+   * <p>The pipeline is created from empty, validated {@link PipelineOptions} with the job name
+   * {@code "BeamPlanCreator"}; it is reachable via {@link PCollection#getPipeline()}.
+   *
+   * @param sqlStatement the SQL statement to compile
+   * @param sqlEnv the environment holding the registered tables and functions
+   * @return the {@link PCollection} holding the result of the statement
+   * @throws Exception propagated from compiling the statement
+   */
+  public static PCollection<BeamRecord> compilePipeline(String sqlStatement, BeamSqlEnv sqlEnv)
+      throws Exception {
+    PipelineOptions options = PipelineOptionsFactory.fromArgs(new String[] {}).withValidation()
+        .as(PipelineOptions.class);
+    options.setJobName("BeamPlanCreator");
+    Pipeline pipeline = Pipeline.create(options);
+
+    return compilePipeline(sqlStatement, pipeline, sqlEnv);
+  }
+
+  /**
+   * Compiles {@code sqlStatement} into the given {@code basePipeline}.
+   *
+   * @param sqlStatement the SQL statement to compile
+   * @param basePipeline the pipeline to compile the statement into
+   * @param sqlEnv the environment holding the registered tables and functions
+   * @return the {@link PCollection} holding the result of the statement
+   * @throws Exception propagated from compiling the statement
+   */
+  public static PCollection<BeamRecord> compilePipeline(String sqlStatement, Pipeline basePipeline,
+      BeamSqlEnv sqlEnv) throws Exception {
+    return sqlEnv.getPlanner().compileBeamPipeline(sqlStatement, basePipeline, sqlEnv);
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java
new file mode 100644
index 0000000..fcc9079
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl;
+
+import java.io.Serializable;
+import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType;
+import org.apache.beam.sdk.extensions.sql.BeamSql;
+import org.apache.beam.sdk.extensions.sql.BeamSqlUdf;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.UdafImpl;
+import org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner;
+import org.apache.beam.sdk.extensions.sql.impl.schema.BaseBeamTable;
+import org.apache.beam.sdk.extensions.sql.impl.schema.BeamSqlTable;
+import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.calcite.DataContext;
+import org.apache.calcite.linq4j.Enumerable;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.schema.ScannableTable;
+import org.apache.calcite.schema.Schema;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.Statistic;
+import org.apache.calcite.schema.Statistics;
+import org.apache.calcite.schema.impl.ScalarFunctionImpl;
+import org.apache.calcite.tools.Frameworks;
+
+/**
+ * {@link BeamSqlEnv} prepares the execution context for {@link BeamSql} and
+ * {@link BeamSqlCli}.
+ *
+ * <p>It contains a {@link SchemaPlus} which holds the metadata of tables/UDF functions,
+ * and a {@link BeamQueryPlanner} which parse/validate/optimize/translate input SQL queries.
+ *
+ * <p>Note: both fields are {@code transient}, so a deserialized instance has neither a schema
+ * nor a planner.
+ */
+public class BeamSqlEnv implements Serializable {
+  transient SchemaPlus schema;
+  transient BeamQueryPlanner planner;
+
+  /** Creates an environment with a new root schema and a planner over it. */
+  public BeamSqlEnv() {
+    schema = Frameworks.createRootSchema(true);
+    planner = new BeamQueryPlanner(schema);
+  }
+
+  /**
+   * Registers a UDF which can be used in SQL expressions.
+   *
+   * @param functionName the name the function is called by in SQL
+   * @param clazz a class declaring a public method named {@link BeamSqlUdf#UDF_METHOD}
+   */
+  public void registerUdf(String functionName, Class<? extends BeamSqlUdf> clazz) {
+    schema.add(functionName, ScalarFunctionImpl.create(clazz, BeamSqlUdf.UDF_METHOD));
+  }
+
+  /**
+   * Registers a {@link SerializableFunction} as a UDF which can be used in SQL expressions; its
+   * {@code apply} method is invoked.
+   *
+   * <p>Note: only the class of {@code sfn} is registered, not the instance itself, so the
+   * {@link SerializableFunction} must have a constructor without arguments.
+   */
+  public void registerUdf(String functionName, SerializableFunction<?, ?> sfn) {
+    schema.add(functionName, ScalarFunctionImpl.create(sfn.getClass(), "apply"));
+  }
+
+  /**
+   * Registers a UDAF which can be used in GROUP-BY expressions.
+   * See {@link org.apache.beam.sdk.transforms.Combine.CombineFn} on how to implement a UDAF.
+   */
+  public void registerUdaf(String functionName, Combine.CombineFn<?, ?, ?> combineFn) {
+    schema.add(functionName, new UdafImpl(combineFn));
+  }
+
+  /**
+   * Registers a {@link BeamSqlTable} (e.g. a {@link BaseBeamTable}) which can be used for all
+   * subsequent queries.
+   */
+  public void registerTable(String tableName, BeamSqlTable table) {
+    schema.add(tableName, new BeamCalciteTable(table.getRowType()));
+    planner.getSourceTables().put(tableName, table);
+  }
+
+  /**
+   * Finds a registered {@link BeamSqlTable} by table name.
+   *
+   * @return the table, or {@code null} if no table is registered under {@code tableName}
+   */
+  public BeamSqlTable findTable(String tableName) {
+    return planner.getSourceTables().get(tableName);
+  }
+
+  /**
+   * Exposes a registered table's row type to Calcite. Rows are never read through Calcite,
+   * since Beam SQL executes queries with its own engine.
+   */
+  private static class BeamCalciteTable implements ScannableTable, Serializable {
+    private final BeamRecordSqlType beamSqlRowType;
+
+    public BeamCalciteTable(BeamRecordSqlType beamSqlRowType) {
+      this.beamSqlRowType = beamSqlRowType;
+    }
+
+    /**
+     * Converts the Beam row type to a Calcite row type. The supplied {@code typeFactory} is
+     * ignored in favor of {@link BeamQueryPlanner#TYPE_FACTORY}.
+     */
+    @Override
+    public RelDataType getRowType(RelDataTypeFactory typeFactory) {
+      return CalciteUtils.toCalciteRowType(this.beamSqlRowType)
+          .apply(BeamQueryPlanner.TYPE_FACTORY);
+    }
+
+    @Override
+    public Enumerable<Object[]> scan(DataContext root) {
+      // not used as Beam SQL uses its own execution engine
+      return null;
+    }
+
+    /**
+     * No {@link Statistic} is provided to optimize the plan.
+     */
+    @Override
+    public Statistic getStatistic() {
+      return Statistics.UNKNOWN;
+    }
+
+    /**
+     * All sources are treated as TABLE in Beam SQL.
+     */
+    @Override
+    public Schema.TableType getJdbcTableType() {
+      return Schema.TableType.TABLE;
+    }
+  }
+
+  /** Returns the planner used to parse, validate, optimize and translate queries. */
+  public BeamQueryPlanner getPlanner() {
+    return planner;
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/package-info.java
new file mode 100644
index 0000000..de237d6
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Implementation classes of BeamSql.
+ */
+package org.apache.beam.sdk.extensions.sql.impl;
http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamQueryPlanner.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamQueryPlanner.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamQueryPlanner.java
index b421bc3..410c783 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamQueryPlanner.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamQueryPlanner.java
@@ -23,10 +23,11 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
+import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamLogicalConvention;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
-import org.apache.beam.sdk.extensions.sql.schema.BaseBeamTable;
+import org.apache.beam.sdk.extensions.sql.impl.schema.BaseBeamTable;
+import org.apache.beam.sdk.extensions.sql.impl.schema.BeamSqlTable;
import org.apache.beam.sdk.values.BeamRecord;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
@@ -67,7 +68,7 @@ public class BeamQueryPlanner {
private static final Logger LOG = LoggerFactory.getLogger(BeamQueryPlanner.class);
protected final Planner planner;
- private Map<String, BaseBeamTable> sourceTables = new HashMap<>();
+ private Map<String, BeamSqlTable> sourceTables = new HashMap<>();
public static final JavaTypeFactory TYPE_FACTORY = new JavaTypeFactoryImpl(
RelDataTypeSystem.DEFAULT);
@@ -156,7 +157,7 @@ public class BeamQueryPlanner {
return planner.validate(sqlNode);
}
- public Map<String, BaseBeamTable> getSourceTables() {
+ public Map<String, BeamSqlTable> getSourceTables() {
return sourceTables;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java
index 4b557f9..e49e79c 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java
@@ -21,10 +21,10 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.coders.BeamRecordCoder;
import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
+import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType;
+import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
import org.apache.beam.sdk.extensions.sql.impl.transform.BeamAggregationTransforms;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
-import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.WithKeys;
http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamFilterRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamFilterRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamFilterRel.java
index 8fe5be4..9d36a47 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamFilterRel.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamFilterRel.java
@@ -17,7 +17,7 @@
*/
package org.apache.beam.sdk.extensions.sql.impl.rel;
-import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
+import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlExpressionExecutor;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlFnExecutor;
import org.apache.beam.sdk.extensions.sql.impl.transform.BeamSqlFilterFn;
http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSinkRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSinkRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSinkRel.java
index 1e3eb4c..7bb08c2 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSinkRel.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSinkRel.java
@@ -19,8 +19,8 @@ package org.apache.beam.sdk.extensions.sql.impl.rel;
import com.google.common.base.Joiner;
import java.util.List;
-import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
-import org.apache.beam.sdk.extensions.sql.schema.BaseBeamTable;
+import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
+import org.apache.beam.sdk.extensions.sql.impl.schema.BeamSqlTable;
import org.apache.beam.sdk.values.BeamRecord;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
@@ -65,7 +65,7 @@ public class BeamIOSinkRel extends TableModify implements BeamRelNode {
String sourceName = Joiner.on('.').join(getTable().getQualifiedName());
- BaseBeamTable targetTable = sqlEnv.findTable(sourceName);
+ BeamSqlTable targetTable = sqlEnv.findTable(sourceName);
upstream.apply(stageName, targetTable.buildIOWriter());
http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java
index 254f990..1e4f506 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java
@@ -18,9 +18,9 @@
package org.apache.beam.sdk.extensions.sql.impl.rel;
import com.google.common.base.Joiner;
-import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
+import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
+import org.apache.beam.sdk.extensions.sql.impl.schema.BeamSqlTable;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
-import org.apache.beam.sdk.extensions.sql.schema.BaseBeamTable;
import org.apache.beam.sdk.values.BeamRecord;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
@@ -53,7 +53,7 @@ public class BeamIOSourceRel extends TableScan implements BeamRelNode {
return sourceStream;
} else {
//If not, the source PColection is provided with BaseBeamTable.buildIOReader().
- BaseBeamTable sourceTable = sqlEnv.findTable(sourceName);
+ BeamSqlTable sourceTable = sqlEnv.findTable(sourceName);
return sourceTable.buildIOReader(inputPCollections.getPipeline())
.setCoder(CalciteUtils.toBeamRowType(getRowType()).getRecordCoder());
}
http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRel.java
index 5919329..1ffb636 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRel.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRel.java
@@ -19,7 +19,7 @@
package org.apache.beam.sdk.extensions.sql.impl.rel;
import java.util.List;
-import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
+import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
import org.apache.beam.sdk.values.BeamRecord;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java
index 5ac9575..cc26aa6 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java
@@ -25,10 +25,10 @@ import java.util.Map;
import java.util.Set;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
+import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType;
+import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
import org.apache.beam.sdk.extensions.sql.impl.transform.BeamJoinTransforms;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
-import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRel.java
index b55252a..6f5dff2 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRel.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRel.java
@@ -19,7 +19,7 @@
package org.apache.beam.sdk.extensions.sql.impl.rel;
import java.util.List;
-import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
+import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
import org.apache.beam.sdk.values.BeamRecord;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamProjectRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamProjectRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamProjectRel.java
index b1ff629..501feb3 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamProjectRel.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamProjectRel.java
@@ -18,7 +18,7 @@
package org.apache.beam.sdk.extensions.sql.impl.rel;
import java.util.List;
-import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
+import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlExpressionExecutor;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlFnExecutor;
import org.apache.beam.sdk.extensions.sql.impl.transform.BeamSqlProjectFn;
http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamRelNode.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamRelNode.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamRelNode.java
index b8b4293..9e8d46d 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamRelNode.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamRelNode.java
@@ -17,7 +17,7 @@
*/
package org.apache.beam.sdk.extensions.sql.impl.rel;
-import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
+import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
import org.apache.beam.sdk.values.BeamRecord;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
@@ -33,6 +33,7 @@ public interface BeamRelNode extends RelNode {
* {@code BeamQueryPlanner} visits it with a DFS(Depth-First-Search)
* algorithm.
*/
- PCollection<BeamRecord> buildBeamPipeline(PCollectionTuple inputPCollections, BeamSqlEnv sqlEnv)
+ PCollection<BeamRecord> buildBeamPipeline(
+ PCollectionTuple inputPCollections, BeamSqlEnv sqlEnv)
throws Exception;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBase.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBase.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBase.java
index f9cbf4f..a1f3e2b 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBase.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBase.java
@@ -20,7 +20,7 @@ package org.apache.beam.sdk.extensions.sql.impl.rel;
import java.io.Serializable;
import java.util.List;
-import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
+import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
import org.apache.beam.sdk.extensions.sql.impl.transform.BeamSetOperatorsTransforms;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java
index 80f3c97..d658638 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java
@@ -25,9 +25,9 @@ import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import org.apache.beam.sdk.coders.ListCoder;
-import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
+import org.apache.beam.sdk.extensions.sql.BeamSqlRecordHelper;
+import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordHelper;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.ParDo;
http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRel.java
index 63ebdf3..85d676e 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRel.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRel.java
@@ -19,7 +19,7 @@
package org.apache.beam.sdk.extensions.sql.impl.rel;
import java.util.List;
-import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
+import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.values.BeamRecord;
import org.apache.beam.sdk.values.PCollection;
http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java
index c4caff3..d684294 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java
@@ -21,10 +21,10 @@ package org.apache.beam.sdk.extensions.sql.impl.rel;
import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
import java.util.List;
-import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
+import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType;
+import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
+import org.apache.beam.sdk.extensions.sql.impl.schema.BeamTableUtils;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
-import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
-import org.apache.beam.sdk.extensions.sql.schema.BeamTableUtils;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.BeamRecord;
import org.apache.beam.sdk.values.PCollection;
http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BaseBeamTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BaseBeamTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BaseBeamTable.java
new file mode 100644
index 0000000..73e0863
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BaseBeamTable.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.schema;
+
+import java.io.Serializable;
+import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType;
+
+/**
+ * Each IO in Beam has one table schema, by extending {@link BaseBeamTable}.
+ */
+public abstract class BaseBeamTable implements BeamSqlTable, Serializable {
+ protected BeamRecordSqlType beamSqlRowType;
+ public BaseBeamTable(BeamRecordSqlType beamSqlRowType) {
+ this.beamSqlRowType = beamSqlRowType;
+ }
+
+ @Override public BeamRecordSqlType getRowType() {
+ return beamSqlRowType;
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamIOType.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamIOType.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamIOType.java
new file mode 100644
index 0000000..5ced467
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamIOType.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.schema;
+
+import java.io.Serializable;
+
/**
 * Boundedness of a source IO, which decides whether it is processed as a batch
 * ({@code BOUNDED}) or as a stream ({@code UNBOUNDED}).
 */
public enum BeamIOType implements Serializable {
  /** A finite source, processed in batch. */
  BOUNDED,
  /** An infinite source, processed as a stream. */
  UNBOUNDED
}
http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamPCollectionTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamPCollectionTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamPCollectionTable.java
new file mode 100644
index 0000000..31e60e0
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamPCollectionTable.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.schema;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.BeamRecord;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollection.IsBounded;
+import org.apache.beam.sdk.values.PDone;
+
+/**
+ * {@code BeamPCollectionTable} converts a {@code PCollection<BeamSqlRow>} as a virtual table,
+ * then a downstream query can query directly.
+ */
+public class BeamPCollectionTable extends BaseBeamTable {
+ private BeamIOType ioType;
+ private transient PCollection<BeamRecord> upstream;
+
+ protected BeamPCollectionTable(BeamRecordSqlType beamSqlRowType) {
+ super(beamSqlRowType);
+ }
+
+ public BeamPCollectionTable(PCollection<BeamRecord> upstream,
+ BeamRecordSqlType beamSqlRowType){
+ this(beamSqlRowType);
+ ioType = upstream.isBounded().equals(IsBounded.BOUNDED)
+ ? BeamIOType.BOUNDED : BeamIOType.UNBOUNDED;
+ this.upstream = upstream;
+ }
+
+ @Override
+ public BeamIOType getSourceType() {
+ return ioType;
+ }
+
+ @Override
+ public PCollection<BeamRecord> buildIOReader(Pipeline pipeline) {
+ return upstream;
+ }
+
+ @Override
+ public PTransform<? super PCollection<BeamRecord>, PDone> buildIOWriter() {
+ throw new IllegalArgumentException("cannot use [BeamPCollectionTable] as target");
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamSqlTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamSqlTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamSqlTable.java
new file mode 100644
index 0000000..46fba59
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamSqlTable.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.schema;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.BeamRecord;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+
+/**
+ * This interface defines a Beam Sql Table.
+ */
+public interface BeamSqlTable {
+ /**
+ * In Beam SQL, there's no difference between a batch query and a streaming
+ * query. {@link BeamIOType} is used to validate the sources.
+ */
+ BeamIOType getSourceType();
+
+ /**
+ * create a {@code PCollection<BeamSqlRow>} from source.
+ *
+ */
+ PCollection<BeamRecord> buildIOReader(Pipeline pipeline);
+
+ /**
+ * create a {@code IO.write()} instance to write to target.
+ *
+ */
+ PTransform<? super PCollection<BeamRecord>, PDone> buildIOWriter();
+
+ /**
+ * Get the schema info of the table.
+ */
+ BeamRecordSqlType getRowType();
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamTableUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamTableUtils.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamTableUtils.java
new file mode 100644
index 0000000..6f7f09b
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamTableUtils.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.schema;
+
+import java.io.IOException;
+import java.io.StringReader;
+import java.io.StringWriter;
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType;
+import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
+import org.apache.beam.sdk.values.BeamRecord;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.util.NlsString;
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.CSVParser;
+import org.apache.commons.csv.CSVPrinter;
+import org.apache.commons.csv.CSVRecord;
+
+/**
+ * Utility methods for working with {@code BeamTable}.
+ */
+public final class BeamTableUtils {
+ public static BeamRecord csvLine2BeamSqlRow(
+ CSVFormat csvFormat,
+ String line,
+ BeamRecordSqlType beamRecordSqlType) {
+ List<Object> fieldsValue = new ArrayList<>(beamRecordSqlType.getFieldCount());
+ try (StringReader reader = new StringReader(line)) {
+ CSVParser parser = csvFormat.parse(reader);
+ CSVRecord rawRecord = parser.getRecords().get(0);
+
+ if (rawRecord.size() != beamRecordSqlType.getFieldCount()) {
+ throw new IllegalArgumentException(String.format(
+ "Expect %d fields, but actually %d",
+ beamRecordSqlType.getFieldCount(), rawRecord.size()
+ ));
+ } else {
+ for (int idx = 0; idx < beamRecordSqlType.getFieldCount(); idx++) {
+ String raw = rawRecord.get(idx);
+ fieldsValue.add(autoCastField(beamRecordSqlType.getFieldTypeByIndex(idx), raw));
+ }
+ }
+ } catch (IOException e) {
+ throw new IllegalArgumentException("decodeRecord failed!", e);
+ }
+ return new BeamRecord(beamRecordSqlType, fieldsValue);
+ }
+
+ public static String beamSqlRow2CsvLine(BeamRecord row, CSVFormat csvFormat) {
+ StringWriter writer = new StringWriter();
+ try (CSVPrinter printer = csvFormat.print(writer)) {
+ for (int i = 0; i < row.getFieldCount(); i++) {
+ printer.print(row.getFieldValue(i).toString());
+ }
+ printer.println();
+ } catch (IOException e) {
+ throw new IllegalArgumentException("encodeRecord failed!", e);
+ }
+ return writer.toString();
+ }
+
+ public static Object autoCastField(int fieldType, Object rawObj) {
+ if (rawObj == null) {
+ return null;
+ }
+
+ SqlTypeName columnType = CalciteUtils.toCalciteType(fieldType);
+ // auto-casting for numberics
+ if ((rawObj instanceof String && SqlTypeName.NUMERIC_TYPES.contains(columnType))
+ || (rawObj instanceof BigDecimal && columnType != SqlTypeName.DECIMAL)) {
+ String raw = rawObj.toString();
+ switch (columnType) {
+ case TINYINT:
+ return Byte.valueOf(raw);
+ case SMALLINT:
+ return Short.valueOf(raw);
+ case INTEGER:
+ return Integer.valueOf(raw);
+ case BIGINT:
+ return Long.valueOf(raw);
+ case FLOAT:
+ return Float.valueOf(raw);
+ case DOUBLE:
+ return Double.valueOf(raw);
+ default:
+ throw new UnsupportedOperationException(
+ String.format("Column type %s is not supported yet!", columnType));
+ }
+ } else if (SqlTypeName.CHAR_TYPES.contains(columnType)) {
+ // convert NlsString to String
+ if (rawObj instanceof NlsString) {
+ return ((NlsString) rawObj).getValue();
+ } else {
+ return rawObj;
+ }
+ } else {
+ return rawObj;
+ }
+ }
+}