You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ta...@apache.org on 2017/08/02 05:08:59 UTC
[36/59] beam git commit: rename package org.apache.beam.dsls.sql to
org.apache.beam.sdk.extensions.sql
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSql.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSql.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSql.java
new file mode 100644
index 0000000..d64ae41
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSql.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql;
+
+import com.google.auto.value.AutoValue;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.extensions.sql.rel.BeamRelNode;
+import org.apache.beam.sdk.extensions.sql.schema.BeamPCollectionTable;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowCoder;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlUdaf;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlUdf;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlSelect;
+import org.apache.calcite.sql.parser.SqlParseException;
+import org.apache.calcite.tools.RelConversionException;
+import org.apache.calcite.tools.ValidationException;
+
+/**
+ * {@code BeamSql} is the DSL interface of BeamSQL. It translates a SQL query as a
+ * {@link PTransform}, so developers can use standard SQL queries in a Beam pipeline.
+ *
+ * <h1>Beam SQL DSL usage:</h1>
+ * A typical pipeline with Beam SQL DSL is:
+ * <pre>
+ *{@code
+PipelineOptions options = PipelineOptionsFactory.create();
+Pipeline p = Pipeline.create(options);
+
+//create table from TextIO;
+PCollection<BeamSqlRow> inputTableA = p.apply(TextIO.read().from("/my/input/patha"))
+ .apply(...);
+PCollection<BeamSqlRow> inputTableB = p.apply(TextIO.read().from("/my/input/pathb"))
+ .apply(...);
+
+//run a simple query, and register the output as a table in BeamSql;
+String sql1 = "select MY_FUNC(c1), c2 from PCOLLECTION";
+PCollection<BeamSqlRow> outputTableA = inputTableA.apply(
+ BeamSql.simpleQuery(sql1)
+ .withUdf("MY_FUNC", MY_FUNC.class, "FUNC"));
+
+//run a JOIN with one table from TextIO, and one table from another query
+PCollection<BeamSqlRow> outputTableB = PCollectionTuple.of(
+ new TupleTag<BeamSqlRow>("TABLE_O_A"), outputTableA)
+ .and(new TupleTag<BeamSqlRow>("TABLE_B"), inputTableB)
+ .apply(BeamSql.query("select * from TABLE_O_A JOIN TABLE_B where ..."));
+
+//output the final result with TextIO
+outputTableB.apply(...).apply(TextIO.write().to("/my/output/path"));
+
+p.run().waitUntilFinish();
+ * }
+ * </pre>
+ */
+@Experimental
+public class BeamSql {
+ /**
+ * Transforms a SQL query into a {@link PTransform} representing an equivalent execution plan.
+ *
+ * <p>The returned {@link PTransform} can be applied to a {@link PCollectionTuple} representing
+ * all the input tables and results in a {@code PCollection<BeamSqlRow>} representing the output
+ * table. The {@link PCollectionTuple} contains the mapping from {@code table names} to
+ * {@code PCollection<BeamSqlRow>}, each representing an input table.
+ *
+ * <p>It is an error to apply a {@link PCollectionTuple} missing any {@code table names}
+ * referenced within the query.
+ */
+ public static QueryTransform query(String sqlQuery) {
+ return QueryTransform.builder()
+ .setSqlEnv(new BeamSqlEnv())
+ .setSqlQuery(sqlQuery)
+ .build();
+ }
+
+ /**
+ * Transforms a SQL query into a {@link PTransform} representing an equivalent execution plan.
+ *
+ * <p>This is a simplified form of {@link #query(String)} where the query must reference
+ * a single input table.
+ *
+ * <p>Make sure to query it from a static table name <em>PCOLLECTION</em>.
+ */
+ public static SimpleQueryTransform simpleQuery(String sqlQuery) throws Exception {
+ return SimpleQueryTransform.builder()
+ .setSqlEnv(new BeamSqlEnv())
+ .setSqlQuery(sqlQuery)
+ .build();
+ }
+
+ /**
+ * A {@link PTransform} representing an execution plan for a SQL query.
+ */
+ @AutoValue
+ public abstract static class QueryTransform extends
+ PTransform<PCollectionTuple, PCollection<BeamSqlRow>> {
+ abstract BeamSqlEnv getSqlEnv();
+ abstract String getSqlQuery();
+
+ static Builder builder() {
+ return new AutoValue_BeamSql_QueryTransform.Builder();
+ }
+
+ @AutoValue.Builder
+ abstract static class Builder {
+ abstract Builder setSqlQuery(String sqlQuery);
+ abstract Builder setSqlEnv(BeamSqlEnv sqlEnv);
+ abstract QueryTransform build();
+ }
+
+ /**
+ * register a UDF function used in this query.
+ */
+ public QueryTransform withUdf(String functionName, Class<? extends BeamSqlUdf> clazz){
+ getSqlEnv().registerUdf(functionName, clazz);
+ return this;
+ }
+
+ /**
+ * register a UDAF function used in this query.
+ */
+ public QueryTransform withUdaf(String functionName, Class<? extends BeamSqlUdaf> clazz){
+ getSqlEnv().registerUdaf(functionName, clazz);
+ return this;
+ }
+
+ @Override
+ public PCollection<BeamSqlRow> expand(PCollectionTuple input) {
+ registerTables(input);
+
+ BeamRelNode beamRelNode = null;
+ try {
+ beamRelNode = getSqlEnv().planner.convertToBeamRel(getSqlQuery());
+ } catch (ValidationException | RelConversionException | SqlParseException e) {
+ throw new IllegalStateException(e);
+ }
+
+ try {
+ return beamRelNode.buildBeamPipeline(input, getSqlEnv());
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ //register tables, related with input PCollections.
+ private void registerTables(PCollectionTuple input){
+ for (TupleTag<?> sourceTag : input.getAll().keySet()) {
+ PCollection<BeamSqlRow> sourceStream = (PCollection<BeamSqlRow>) input.get(sourceTag);
+ BeamSqlRowCoder sourceCoder = (BeamSqlRowCoder) sourceStream.getCoder();
+
+ getSqlEnv().registerTable(sourceTag.getId(),
+ new BeamPCollectionTable(sourceStream, sourceCoder.getTableSchema()));
+ }
+ }
+ }
+
+ /**
+ * A {@link PTransform} representing an execution plan for a SQL query referencing
+ * a single table.
+ */
+ @AutoValue
+ public abstract static class SimpleQueryTransform
+ extends PTransform<PCollection<BeamSqlRow>, PCollection<BeamSqlRow>> {
+ private static final String PCOLLECTION_TABLE_NAME = "PCOLLECTION";
+ abstract BeamSqlEnv getSqlEnv();
+ abstract String getSqlQuery();
+
+ static Builder builder() {
+ return new AutoValue_BeamSql_SimpleQueryTransform.Builder();
+ }
+
+ @AutoValue.Builder
+ abstract static class Builder {
+ abstract Builder setSqlQuery(String sqlQuery);
+ abstract Builder setSqlEnv(BeamSqlEnv sqlEnv);
+ abstract SimpleQueryTransform build();
+ }
+
+ /**
+ * register a UDF function used in this query.
+ */
+ public SimpleQueryTransform withUdf(String functionName, Class<? extends BeamSqlUdf> clazz){
+ getSqlEnv().registerUdf(functionName, clazz);
+ return this;
+ }
+
+ /**
+ * register a UDAF function used in this query.
+ */
+ public SimpleQueryTransform withUdaf(String functionName, Class<? extends BeamSqlUdaf> clazz){
+ getSqlEnv().registerUdaf(functionName, clazz);
+ return this;
+ }
+
+ private void validateQuery() {
+ SqlNode sqlNode;
+ try {
+ sqlNode = getSqlEnv().planner.parseQuery(getSqlQuery());
+ getSqlEnv().planner.getPlanner().close();
+ } catch (SqlParseException e) {
+ throw new IllegalStateException(e);
+ }
+
+ if (sqlNode instanceof SqlSelect) {
+ SqlSelect select = (SqlSelect) sqlNode;
+ String tableName = select.getFrom().toString();
+ if (!tableName.equalsIgnoreCase(PCOLLECTION_TABLE_NAME)) {
+ throw new IllegalStateException("Use fixed table name " + PCOLLECTION_TABLE_NAME);
+ }
+ } else {
+ throw new UnsupportedOperationException(
+ "Sql operation: " + sqlNode.toString() + " is not supported!");
+ }
+ }
+
+ @Override
+ public PCollection<BeamSqlRow> expand(PCollection<BeamSqlRow> input) {
+ validateQuery();
+ return PCollectionTuple.of(new TupleTag<BeamSqlRow>(PCOLLECTION_TABLE_NAME), input)
+ .apply(QueryTransform.builder()
+ .setSqlEnv(getSqlEnv())
+ .setSqlQuery(getSqlQuery())
+ .build());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java
new file mode 100644
index 0000000..714e102
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.extensions.sql.rel.BeamRelNode;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.calcite.plan.RelOptUtil;
+
+/**
+ * {@link BeamSqlCli} provides methods to execute Beam SQL with an interactive client.
+ */
+@Experimental
+public class BeamSqlCli {
+ /**
+ * Returns a human readable representation of the query execution plan.
+ */
+ public static String explainQuery(String sqlString, BeamSqlEnv sqlEnv) throws Exception {
+ BeamRelNode exeTree = sqlEnv.planner.convertToBeamRel(sqlString);
+ String beamPlan = RelOptUtil.toString(exeTree);
+ return beamPlan;
+ }
+
+ /**
+ * compile SQL, and return a {@link Pipeline}.
+ */
+ public static PCollection<BeamSqlRow> compilePipeline(String sqlStatement, BeamSqlEnv sqlEnv)
+ throws Exception{
+ PipelineOptions options = PipelineOptionsFactory.fromArgs(new String[] {}).withValidation()
+ .as(PipelineOptions.class); // FlinkPipelineOptions.class
+ options.setJobName("BeamPlanCreator");
+ Pipeline pipeline = Pipeline.create(options);
+
+ return compilePipeline(sqlStatement, pipeline, sqlEnv);
+ }
+
+ /**
+ * compile SQL, and return a {@link Pipeline}.
+ */
+ public static PCollection<BeamSqlRow> compilePipeline(String sqlStatement, Pipeline basePipeline
+ , BeamSqlEnv sqlEnv) throws Exception{
+ PCollection<BeamSqlRow> resultStream =
+ sqlEnv.planner.compileBeamPipeline(sqlStatement, basePipeline, sqlEnv);
+ return resultStream;
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlEnv.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlEnv.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlEnv.java
new file mode 100644
index 0000000..ca73b13
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlEnv.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql;
+
+import java.io.Serializable;
+import org.apache.beam.sdk.extensions.sql.planner.BeamQueryPlanner;
+import org.apache.beam.sdk.extensions.sql.schema.BaseBeamTable;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlUdaf;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlUdf;
+import org.apache.beam.sdk.extensions.sql.utils.CalciteUtils;
+import org.apache.calcite.DataContext;
+import org.apache.calcite.linq4j.Enumerable;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.schema.ScannableTable;
+import org.apache.calcite.schema.Schema;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.Statistic;
+import org.apache.calcite.schema.Statistics;
+import org.apache.calcite.schema.impl.AggregateFunctionImpl;
+import org.apache.calcite.schema.impl.ScalarFunctionImpl;
+import org.apache.calcite.tools.Frameworks;
+
+/**
+ * {@link BeamSqlEnv} prepares the execution context for {@link BeamSql} and {@link BeamSqlCli}.
+ *
+ * <p>It contains a {@link SchemaPlus} which holds the metadata of tables/UDF functions, and
+ * a {@link BeamQueryPlanner} which parse/validate/optimize/translate input SQL queries.
+ */
+public class BeamSqlEnv implements Serializable{
+ transient SchemaPlus schema;
+ transient BeamQueryPlanner planner;
+
+ public BeamSqlEnv() {
+ schema = Frameworks.createRootSchema(true);
+ planner = new BeamQueryPlanner(schema);
+ }
+
+ /**
+ * Register a UDF function which can be used in SQL expression.
+ */
+ public void registerUdf(String functionName, Class<? extends BeamSqlUdf> clazz) {
+ schema.add(functionName, ScalarFunctionImpl.create(clazz, BeamSqlUdf.UDF_METHOD));
+ }
+
+ /**
+ * Register a UDAF function which can be used in GROUP-BY expression.
+ * See {@link BeamSqlUdaf} on how to implement a UDAF.
+ */
+ public void registerUdaf(String functionName, Class<? extends BeamSqlUdaf> clazz) {
+ schema.add(functionName, AggregateFunctionImpl.create(clazz));
+ }
+
+ /**
+ * Registers a {@link BaseBeamTable} which can be used for all subsequent queries.
+ *
+ */
+ public void registerTable(String tableName, BaseBeamTable table) {
+ schema.add(tableName, new BeamCalciteTable(table.getRowType()));
+ planner.getSourceTables().put(tableName, table);
+ }
+
+ /**
+ * Find {@link BaseBeamTable} by table name.
+ */
+ public BaseBeamTable findTable(String tableName){
+ return planner.getSourceTables().get(tableName);
+ }
+
+ private static class BeamCalciteTable implements ScannableTable, Serializable {
+ private BeamSqlRowType beamSqlRowType;
+ public BeamCalciteTable(BeamSqlRowType beamSqlRowType) {
+ this.beamSqlRowType = beamSqlRowType;
+ }
+ @Override
+ public RelDataType getRowType(RelDataTypeFactory typeFactory) {
+ return CalciteUtils.toCalciteRowType(this.beamSqlRowType)
+ .apply(BeamQueryPlanner.TYPE_FACTORY);
+ }
+
+ @Override
+ public Enumerable<Object[]> scan(DataContext root) {
+ // not used as Beam SQL uses its own execution engine
+ return null;
+ }
+
+ /**
+ * Not used {@link Statistic} to optimize the plan.
+ */
+ @Override
+ public Statistic getStatistic() {
+ return Statistics.UNKNOWN;
+ }
+
+ /**
+ * all sources are treated as TABLE in Beam SQL.
+ */
+ @Override
+ public Schema.TableType getJdbcTableType() {
+ return Schema.TableType.TABLE;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java
new file mode 100644
index 0000000..21e02a7
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.example;
+
+import java.sql.Types;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.extensions.sql.BeamSql;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowCoder;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
+
+/**
+ * This is a quick example, which uses Beam SQL DSL to create a data pipeline.
+ *
+ * <p>Run the example with
+ * <pre>
+ * mvn -pl dsls/sql compile exec:java \
+ * -Dexec.mainClass=BeamSqlExample \
+ * -Dexec.args="--runner=DirectRunner" -Pdirect-runner
+ * </pre>
+ *
+ */
+class BeamSqlExample {
+ public static void main(String[] args) throws Exception {
+ PipelineOptions options = PipelineOptionsFactory.fromArgs(args).as(PipelineOptions.class);
+ Pipeline p = Pipeline.create(options);
+
+ //define the input row format
+ List<String> fieldNames = Arrays.asList("c1", "c2", "c3");
+ List<Integer> fieldTypes = Arrays.asList(Types.INTEGER, Types.VARCHAR, Types.DOUBLE);
+ BeamSqlRowType type = BeamSqlRowType.create(fieldNames, fieldTypes);
+ BeamSqlRow row = new BeamSqlRow(type);
+ row.addField(0, 1);
+ row.addField(1, "row");
+ row.addField(2, 1.0);
+
+ //create a source PCollection with Create.of();
+ PCollection<BeamSqlRow> inputTable = PBegin.in(p).apply(Create.of(row)
+ .withCoder(new BeamSqlRowCoder(type)));
+
+ //Case 1. run a simple SQL query over input PCollection with BeamSql.simpleQuery;
+ PCollection<BeamSqlRow> outputStream = inputTable.apply(
+ BeamSql.simpleQuery("select c1, c2, c3 from PCOLLECTION where c1=1"));
+
+ //print the output record of case 1;
+ outputStream.apply("log_result",
+ MapElements.<BeamSqlRow, Void>via(new SimpleFunction<BeamSqlRow, Void>() {
+ public Void apply(BeamSqlRow input) {
+ System.out.println("PCOLLECTION: " + input);
+ return null;
+ }
+ }));
+
+ //Case 2. run the query with BeamSql.query over result PCollection of case 1.
+ PCollection<BeamSqlRow> outputStream2 =
+ PCollectionTuple.of(new TupleTag<BeamSqlRow>("CASE1_RESULT"), outputStream)
+ .apply(BeamSql.query("select c2, c3 from CASE1_RESULT where c1=1"));
+
+ //print the output record of case 2;
+ outputStream2.apply("log_result",
+ MapElements.<BeamSqlRow, Void>via(new SimpleFunction<BeamSqlRow, Void>() {
+ @Override
+ public Void apply(BeamSqlRow input) {
+ System.out.println("TABLE_B: " + input);
+ return null;
+ }
+ }));
+
+ p.run().waitUntilFinish();
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/package-info.java
new file mode 100644
index 0000000..f156917
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * examples on how to use BeamSQL.
+ *
+ */
+package org.apache.beam.sdk.extensions.sql.example;
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/BeamSqlExpressionExecutor.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/BeamSqlExpressionExecutor.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/BeamSqlExpressionExecutor.java
new file mode 100644
index 0000000..28f83e4
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/BeamSqlExpressionExecutor.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.interpreter;
+
+import java.io.Serializable;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+
+/**
+ * {@code BeamSqlExpressionExecutor} fills the gap between relational
+ * expressions in Calcite SQL and executable code.
+ *
+ */
+public interface BeamSqlExpressionExecutor extends Serializable {
+
+ /**
+ * invoked before data processing.
+ */
+ void prepare();
+
+ /**
+ * apply transformation to input record {@link BeamSqlRow}.
+ *
+ */
+ List<Object> execute(BeamSqlRow inputRow);
+
+ void close();
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/BeamSqlFnExecutor.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/BeamSqlFnExecutor.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/BeamSqlFnExecutor.java
new file mode 100644
index 0000000..3084cd5
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/BeamSqlFnExecutor.java
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.interpreter;
+
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlCaseExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlCastExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlInputRefExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlReinterpretExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlUdfExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlWindowEndExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlWindowExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlWindowStartExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.arithmetic.BeamSqlDivideExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.arithmetic.BeamSqlMinusExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.arithmetic.BeamSqlModExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.arithmetic.BeamSqlMultiplyExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.arithmetic.BeamSqlPlusExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.comparison.BeamSqlEqualsExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.comparison.BeamSqlGreaterThanExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.comparison.BeamSqlGreaterThanOrEqualsExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.comparison.BeamSqlIsNotNullExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.comparison.BeamSqlIsNullExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.comparison.BeamSqlLessThanExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.comparison.BeamSqlLessThanOrEqualsExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.comparison.BeamSqlNotEqualsExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.date.BeamSqlCurrentDateExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.date.BeamSqlCurrentTimeExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.date.BeamSqlCurrentTimestampExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.date.BeamSqlDateCeilExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.date.BeamSqlDateFloorExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.date.BeamSqlExtractExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.logical.BeamSqlAndExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.logical.BeamSqlNotExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.logical.BeamSqlOrExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.math.BeamSqlAbsExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.math.BeamSqlAcosExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.math.BeamSqlAsinExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.math.BeamSqlAtan2Expression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.math.BeamSqlAtanExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.math.BeamSqlCeilExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.math.BeamSqlCosExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.math.BeamSqlCotExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.math.BeamSqlDegreesExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.math.BeamSqlExpExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.math.BeamSqlFloorExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.math.BeamSqlLnExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.math.BeamSqlLogExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.math.BeamSqlPiExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.math.BeamSqlPowerExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.math.BeamSqlRadiansExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.math.BeamSqlRandExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.math.BeamSqlRandIntegerExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.math.BeamSqlRoundExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.math.BeamSqlSignExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.math.BeamSqlSinExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.math.BeamSqlTanExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.math.BeamSqlTruncateExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.string.BeamSqlCharLengthExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.string.BeamSqlConcatExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.string.BeamSqlInitCapExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.string.BeamSqlLowerExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.string.BeamSqlOverlayExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.string.BeamSqlPositionExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.string.BeamSqlSubstringExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.string.BeamSqlTrimExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.string.BeamSqlUpperExpression;
+import org.apache.beam.sdk.extensions.sql.rel.BeamFilterRel;
+import org.apache.beam.sdk.extensions.sql.rel.BeamProjectRel;
+import org.apache.beam.sdk.extensions.sql.rel.BeamRelNode;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.schema.impl.ScalarFunctionImpl;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.sql.validate.SqlUserDefinedFunction;
+import org.apache.calcite.util.NlsString;
+
+/**
+ * Executor based on {@link BeamSqlExpression} and {@link BeamSqlPrimitive}.
+ * {@code BeamSqlFnExecutor} converts a {@link BeamRelNode} to a {@link BeamSqlExpression},
+ * which can be evaluated against the {@link BeamSqlRow}.
+ *
+ */
+public class BeamSqlFnExecutor implements BeamSqlExpressionExecutor {
+ protected List<BeamSqlExpression> exps;
+
+ public BeamSqlFnExecutor(BeamRelNode relNode) {
+ this.exps = new ArrayList<>();
+ if (relNode instanceof BeamFilterRel) {
+ BeamFilterRel filterNode = (BeamFilterRel) relNode;
+ RexNode condition = filterNode.getCondition();
+ exps.add(buildExpression(condition));
+ } else if (relNode instanceof BeamProjectRel) {
+ BeamProjectRel projectNode = (BeamProjectRel) relNode;
+ List<RexNode> projects = projectNode.getProjects();
+ for (RexNode rexNode : projects) {
+ exps.add(buildExpression(rexNode));
+ }
+ } else {
+ throw new UnsupportedOperationException(
+ String.format("%s is not supported yet!", relNode.getClass().toString()));
+ }
+ }
+
+ /**
+ * {@link #buildExpression(RexNode)} visits the operands of {@link RexNode} recursively,
+ * and represent each {@link SqlOperator} with a corresponding {@link BeamSqlExpression}.
+ */
+ static BeamSqlExpression buildExpression(RexNode rexNode) {
+ BeamSqlExpression ret = null;
+ if (rexNode instanceof RexLiteral) {
+ RexLiteral node = (RexLiteral) rexNode;
+ SqlTypeName type = node.getTypeName();
+ Object value = node.getValue();
+
+ if (SqlTypeName.CHAR_TYPES.contains(type)
+ && node.getValue() instanceof NlsString) {
+ // NlsString is not serializable, we need to convert
+ // it to string explicitly.
+ return BeamSqlPrimitive.of(type, ((NlsString) value).getValue());
+ } else if (type == SqlTypeName.DATE && value instanceof Calendar) {
+ // does this actually make sense?
+ // Calcite actually treat Calendar as the java type of Date Literal
+ return BeamSqlPrimitive.of(type, ((Calendar) value).getTime());
+ } else {
+ // node.getType().getSqlTypeName() and node.getSqlTypeName() can be different
+ // e.g. sql: "select 1"
+ // here the literal 1 will be parsed as a RexLiteral where:
+ // node.getType().getSqlTypeName() = INTEGER (the display type)
+ // node.getSqlTypeName() = DECIMAL (the actual internal storage format)
+ // So we need to do a convert here.
+ // check RexBuilder#makeLiteral for more information.
+ SqlTypeName realType = node.getType().getSqlTypeName();
+ Object realValue = value;
+ if (type == SqlTypeName.DECIMAL) {
+ BigDecimal rawValue = (BigDecimal) value;
+ switch (realType) {
+ case TINYINT:
+ realValue = (byte) rawValue.intValue();
+ break;
+ case SMALLINT:
+ realValue = (short) rawValue.intValue();
+ break;
+ case INTEGER:
+ realValue = rawValue.intValue();
+ break;
+ case BIGINT:
+ realValue = rawValue.longValue();
+ break;
+ case DECIMAL:
+ realValue = rawValue;
+ break;
+ default:
+ throw new IllegalStateException("type/realType mismatch: "
+ + type + " VS " + realType);
+ }
+ } else if (type == SqlTypeName.DOUBLE) {
+ Double rawValue = (Double) value;
+ if (realType == SqlTypeName.FLOAT) {
+ realValue = rawValue.floatValue();
+ }
+ }
+ return BeamSqlPrimitive.of(realType, realValue);
+ }
+ } else if (rexNode instanceof RexInputRef) {
+ RexInputRef node = (RexInputRef) rexNode;
+ ret = new BeamSqlInputRefExpression(node.getType().getSqlTypeName(), node.getIndex());
+ } else if (rexNode instanceof RexCall) {
+ RexCall node = (RexCall) rexNode;
+ String opName = node.op.getName();
+ List<BeamSqlExpression> subExps = new ArrayList<>();
+ for (RexNode subNode : node.getOperands()) {
+ subExps.add(buildExpression(subNode));
+ }
+ switch (opName) {
+ // logical operators
+ case "AND":
+ ret = new BeamSqlAndExpression(subExps);
+ break;
+ case "OR":
+ ret = new BeamSqlOrExpression(subExps);
+ break;
+ case "NOT":
+ ret = new BeamSqlNotExpression(subExps);
+ break;
+ case "=":
+ ret = new BeamSqlEqualsExpression(subExps);
+ break;
+ case "<>":
+ ret = new BeamSqlNotEqualsExpression(subExps);
+ break;
+ case ">":
+ ret = new BeamSqlGreaterThanExpression(subExps);
+ break;
+ case ">=":
+ ret = new BeamSqlGreaterThanOrEqualsExpression(subExps);
+ break;
+ case "<":
+ ret = new BeamSqlLessThanExpression(subExps);
+ break;
+ case "<=":
+ ret = new BeamSqlLessThanOrEqualsExpression(subExps);
+ break;
+
+ // arithmetic operators
+ case "+":
+ ret = new BeamSqlPlusExpression(subExps);
+ break;
+ case "-":
+ ret = new BeamSqlMinusExpression(subExps);
+ break;
+ case "*":
+ ret = new BeamSqlMultiplyExpression(subExps);
+ break;
+ case "/":
+ case "/INT":
+ ret = new BeamSqlDivideExpression(subExps);
+ break;
+ case "MOD":
+ ret = new BeamSqlModExpression(subExps);
+ break;
+
+ case "ABS":
+ ret = new BeamSqlAbsExpression(subExps);
+ break;
+ case "ROUND":
+ ret = new BeamSqlRoundExpression(subExps);
+ break;
+ case "LN":
+ ret = new BeamSqlLnExpression(subExps);
+ break;
+ case "LOG10":
+ ret = new BeamSqlLogExpression(subExps);
+ break;
+ case "EXP":
+ ret = new BeamSqlExpExpression(subExps);
+ break;
+ case "ACOS":
+ ret = new BeamSqlAcosExpression(subExps);
+ break;
+ case "ASIN":
+ ret = new BeamSqlAsinExpression(subExps);
+ break;
+ case "ATAN":
+ ret = new BeamSqlAtanExpression(subExps);
+ break;
+ case "COT":
+ ret = new BeamSqlCotExpression(subExps);
+ break;
+ case "DEGREES":
+ ret = new BeamSqlDegreesExpression(subExps);
+ break;
+ case "RADIANS":
+ ret = new BeamSqlRadiansExpression(subExps);
+ break;
+ case "COS":
+ ret = new BeamSqlCosExpression(subExps);
+ break;
+ case "SIN":
+ ret = new BeamSqlSinExpression(subExps);
+ break;
+ case "TAN":
+ ret = new BeamSqlTanExpression(subExps);
+ break;
+ case "SIGN":
+ ret = new BeamSqlSignExpression(subExps);
+ break;
+ case "POWER":
+ ret = new BeamSqlPowerExpression(subExps);
+ break;
+ case "PI":
+ ret = new BeamSqlPiExpression();
+ break;
+ case "ATAN2":
+ ret = new BeamSqlAtan2Expression(subExps);
+ break;
+ case "TRUNCATE":
+ ret = new BeamSqlTruncateExpression(subExps);
+ break;
+ case "RAND":
+ ret = new BeamSqlRandExpression(subExps);
+ break;
+ case "RAND_INTEGER":
+ ret = new BeamSqlRandIntegerExpression(subExps);
+ break;
+
+ // string operators
+ case "||":
+ ret = new BeamSqlConcatExpression(subExps);
+ break;
+ case "POSITION":
+ ret = new BeamSqlPositionExpression(subExps);
+ break;
+ case "CHAR_LENGTH":
+ case "CHARACTER_LENGTH":
+ ret = new BeamSqlCharLengthExpression(subExps);
+ break;
+ case "UPPER":
+ ret = new BeamSqlUpperExpression(subExps);
+ break;
+ case "LOWER":
+ ret = new BeamSqlLowerExpression(subExps);
+ break;
+ case "TRIM":
+ ret = new BeamSqlTrimExpression(subExps);
+ break;
+ case "SUBSTRING":
+ ret = new BeamSqlSubstringExpression(subExps);
+ break;
+ case "OVERLAY":
+ ret = new BeamSqlOverlayExpression(subExps);
+ break;
+ case "INITCAP":
+ ret = new BeamSqlInitCapExpression(subExps);
+ break;
+
+ // date functions
+ case "Reinterpret":
+ return new BeamSqlReinterpretExpression(subExps, node.type.getSqlTypeName());
+ case "CEIL":
+ if (SqlTypeName.NUMERIC_TYPES.contains(node.type.getSqlTypeName())) {
+ return new BeamSqlCeilExpression(subExps);
+ } else {
+ return new BeamSqlDateCeilExpression(subExps);
+ }
+ case "FLOOR":
+ if (SqlTypeName.NUMERIC_TYPES.contains(node.type.getSqlTypeName())) {
+ return new BeamSqlFloorExpression(subExps);
+ } else {
+ return new BeamSqlDateFloorExpression(subExps);
+ }
+ case "EXTRACT_DATE":
+ case "EXTRACT":
+ return new BeamSqlExtractExpression(subExps);
+
+ case "LOCALTIME":
+ case "CURRENT_TIME":
+ return new BeamSqlCurrentTimeExpression(subExps);
+
+ case "CURRENT_TIMESTAMP":
+ case "LOCALTIMESTAMP":
+ return new BeamSqlCurrentTimestampExpression(subExps);
+
+ case "CURRENT_DATE":
+ return new BeamSqlCurrentDateExpression();
+
+
+ case "CASE":
+ ret = new BeamSqlCaseExpression(subExps);
+ break;
+ case "CAST":
+ ret = new BeamSqlCastExpression(subExps, node.type.getSqlTypeName());
+ break;
+
+ case "IS NULL":
+ ret = new BeamSqlIsNullExpression(subExps.get(0));
+ break;
+ case "IS NOT NULL":
+ ret = new BeamSqlIsNotNullExpression(subExps.get(0));
+ break;
+
+ case "HOP":
+ case "TUMBLE":
+ case "SESSION":
+ ret = new BeamSqlWindowExpression(subExps, node.type.getSqlTypeName());
+ break;
+ case "HOP_START":
+ case "TUMBLE_START":
+ case "SESSION_START":
+ ret = new BeamSqlWindowStartExpression();
+ break;
+ case "HOP_END":
+ case "TUMBLE_END":
+ case "SESSION_END":
+ ret = new BeamSqlWindowEndExpression();
+ break;
+ default:
+ //handle UDF
+ if (((RexCall) rexNode).getOperator() instanceof SqlUserDefinedFunction) {
+ SqlUserDefinedFunction udf = (SqlUserDefinedFunction) ((RexCall) rexNode).getOperator();
+ ScalarFunctionImpl fn = (ScalarFunctionImpl) udf.getFunction();
+ ret = new BeamSqlUdfExpression(fn.method, subExps,
+ ((RexCall) rexNode).type.getSqlTypeName());
+ } else {
+ throw new UnsupportedOperationException("Operator: " + opName + " is not supported yet!");
+ }
+ }
+ } else {
+ throw new UnsupportedOperationException(
+ String.format("%s is not supported yet!", rexNode.getClass().toString()));
+ }
+
+ if (ret != null && !ret.accept()) {
+ throw new IllegalStateException(ret.getClass().getSimpleName()
+ + " does not accept the operands.(" + rexNode + ")");
+ }
+
+ return ret;
+ }
+
+ @Override
+ public void prepare() {
+ }
+
+ @Override
+ public List<Object> execute(BeamSqlRow inputRow) {
+ List<Object> results = new ArrayList<>();
+ for (BeamSqlExpression exp : exps) {
+ results.add(exp.evaluate(inputRow).getValue());
+ }
+ return results;
+ }
+
+ @Override
+ public void close() {
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlCaseExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlCaseExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlCaseExpression.java
new file mode 100644
index 0000000..bfbb33e
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlCaseExpression.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.interpreter.operator;
+
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlCaseExpression} represents CASE, NULLIF, COALESCE in SQL.
+ */
+public class BeamSqlCaseExpression extends BeamSqlExpression {
+ public BeamSqlCaseExpression(List<BeamSqlExpression> operands) {
+ // the return type of CASE is the type of the `else` condition
+ super(operands, operands.get(operands.size() - 1).getOutputType());
+ }
+
+ @Override public boolean accept() {
+ // `when`-`then` pair + `else`
+ if (operands.size() % 2 != 1) {
+ return false;
+ }
+
+ for (int i = 0; i < operands.size() - 1; i += 2) {
+ if (opType(i) != SqlTypeName.BOOLEAN) {
+ return false;
+ } else if (opType(i + 1) != outputType) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+ for (int i = 0; i < operands.size() - 1; i += 2) {
+ if (opValueEvaluated(i, inputRow)) {
+ return BeamSqlPrimitive.of(
+ outputType,
+ opValueEvaluated(i + 1, inputRow)
+ );
+ }
+ }
+ return BeamSqlPrimitive.of(outputType,
+ opValueEvaluated(operands.size() - 1, inputRow));
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlCastExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlCastExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlCastExpression.java
new file mode 100644
index 0000000..08abcc6
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlCastExpression.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.interpreter.operator;
+
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.calcite.runtime.SqlFunctions;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+import org.joda.time.format.DateTimeFormatterBuilder;
+import org.joda.time.format.DateTimeParser;
+
+/**
+ * Base class to support 'CAST' operations for all {@link SqlTypeName}.
+ */
+public class BeamSqlCastExpression extends BeamSqlExpression {
+
+ private static final int index = 0;
+ private static final String outputTimestampFormat = "yyyy-MM-dd HH:mm:ss";
+ private static final String outputDateFormat = "yyyy-MM-dd";
+ /**
+ * Date and Timestamp formats used to parse
+ * {@link SqlTypeName#DATE}, {@link SqlTypeName#TIMESTAMP}.
+ */
+ private static final DateTimeFormatter dateTimeFormatter = new DateTimeFormatterBuilder()
+ .append(null/*printer*/, new DateTimeParser[] {
+ // date formats
+ DateTimeFormat.forPattern("yy-MM-dd").getParser(),
+ DateTimeFormat.forPattern("yy/MM/dd").getParser(),
+ DateTimeFormat.forPattern("yy.MM.dd").getParser(),
+ DateTimeFormat.forPattern("yyMMdd").getParser(),
+ DateTimeFormat.forPattern("yyyyMMdd").getParser(),
+ DateTimeFormat.forPattern("yyyy-MM-dd").getParser(),
+ DateTimeFormat.forPattern("yyyy/MM/dd").getParser(),
+ DateTimeFormat.forPattern("yyyy.MM.dd").getParser(),
+ // datetime formats
+ DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss").getParser(),
+ DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ssz").getParser(),
+ DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss z").getParser(),
+ DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSSSSSSSS").getParser(),
+ DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSSSSSSSSz").getParser(),
+ DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSSSSSSSS z").getParser() }).toFormatter()
+ .withPivotYear(2020);
+
+ public BeamSqlCastExpression(List<BeamSqlExpression> operands, SqlTypeName castType) {
+ super(operands, castType);
+ }
+
+ @Override
+ public boolean accept() {
+ return numberOfOperands() == 1;
+ }
+
+ @Override
+ public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+ SqlTypeName castOutputType = getOutputType();
+ switch (castOutputType) {
+ case INTEGER:
+ return BeamSqlPrimitive
+ .of(SqlTypeName.INTEGER, SqlFunctions.toInt(opValueEvaluated(index, inputRow)));
+ case DOUBLE:
+ return BeamSqlPrimitive
+ .of(SqlTypeName.DOUBLE, SqlFunctions.toDouble(opValueEvaluated(index, inputRow)));
+ case SMALLINT:
+ return BeamSqlPrimitive
+ .of(SqlTypeName.SMALLINT, SqlFunctions.toShort(opValueEvaluated(index, inputRow)));
+ case TINYINT:
+ return BeamSqlPrimitive
+ .of(SqlTypeName.TINYINT, SqlFunctions.toByte(opValueEvaluated(index, inputRow)));
+ case BIGINT:
+ return BeamSqlPrimitive
+ .of(SqlTypeName.BIGINT, SqlFunctions.toLong(opValueEvaluated(index, inputRow)));
+ case DECIMAL:
+ return BeamSqlPrimitive.of(SqlTypeName.DECIMAL,
+ SqlFunctions.toBigDecimal(opValueEvaluated(index, inputRow)));
+ case FLOAT:
+ return BeamSqlPrimitive
+ .of(SqlTypeName.FLOAT, SqlFunctions.toFloat(opValueEvaluated(index, inputRow)));
+ case CHAR:
+ case VARCHAR:
+ return BeamSqlPrimitive
+ .of(SqlTypeName.VARCHAR, opValueEvaluated(index, inputRow).toString());
+ case DATE:
+ return BeamSqlPrimitive
+ .of(SqlTypeName.DATE, toDate(opValueEvaluated(index, inputRow), outputDateFormat));
+ case TIMESTAMP:
+ return BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP,
+ toTimeStamp(opValueEvaluated(index, inputRow), outputTimestampFormat));
+ }
+ throw new UnsupportedOperationException(
+ String.format("Cast to type %s not supported", castOutputType));
+ }
+
+ private Date toDate(Object inputDate, String outputFormat) {
+ try {
+ return Date
+ .valueOf(dateTimeFormatter.parseLocalDate(inputDate.toString()).toString(outputFormat));
+ } catch (IllegalArgumentException | UnsupportedOperationException e) {
+ throw new UnsupportedOperationException("Can't be cast to type 'Date'");
+ }
+ }
+
+ private Timestamp toTimeStamp(Object inputTimestamp, String outputFormat) {
+ try {
+ return Timestamp.valueOf(
+ dateTimeFormatter.parseDateTime(inputTimestamp.toString()).secondOfMinute()
+ .roundCeilingCopy().toString(outputFormat));
+ } catch (IllegalArgumentException | UnsupportedOperationException e) {
+ throw new UnsupportedOperationException("Can't be cast to type 'Timestamp'");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlExpression.java
new file mode 100644
index 0000000..cb8baac
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlExpression.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.interpreter.operator;
+
+import java.io.Serializable;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlExpression} is an equivalent expression in BeamSQL, of {@link RexNode} in Calcite.
+ *
+ * <p>An implementation of {@link BeamSqlExpression} takes one or more {@code BeamSqlExpression}
+ * as its operands, and return a value with type {@link SqlTypeName}.
+ *
+ */
+public abstract class BeamSqlExpression implements Serializable {
+ protected List<BeamSqlExpression> operands;
+ protected SqlTypeName outputType;
+
+ protected BeamSqlExpression(){}
+
+ public BeamSqlExpression(List<BeamSqlExpression> operands, SqlTypeName outputType) {
+ this.operands = operands;
+ this.outputType = outputType;
+ }
+
+ public BeamSqlExpression op(int idx) {
+ return operands.get(idx);
+ }
+
+ public SqlTypeName opType(int idx) {
+ return op(idx).getOutputType();
+ }
+
+ public <T> T opValueEvaluated(int idx, BeamSqlRow row) {
+ return (T) op(idx).evaluate(row).getValue();
+ }
+
+ /**
+ * assertion to make sure the input and output are supported in this expression.
+ */
+ public abstract boolean accept();
+
+ /**
+ * Apply input record {@link BeamSqlRow} to this expression,
+ * the output value is wrapped with {@link BeamSqlPrimitive}.
+ */
+ public abstract BeamSqlPrimitive evaluate(BeamSqlRow inputRow);
+
+ public List<BeamSqlExpression> getOperands() {
+ return operands;
+ }
+
+ public SqlTypeName getOutputType() {
+ return outputType;
+ }
+
+ public int numberOfOperands() {
+ return operands.size();
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlInputRefExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlInputRefExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlInputRefExpression.java
new file mode 100644
index 0000000..7ba4a46
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlInputRefExpression.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.interpreter.operator;
+
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * An primitive operation for direct field extraction.
+ */
+public class BeamSqlInputRefExpression extends BeamSqlExpression {
+ private int inputRef;
+
+ public BeamSqlInputRefExpression(SqlTypeName sqlTypeName, int inputRef) {
+ super(null, sqlTypeName);
+ this.inputRef = inputRef;
+ }
+
+ @Override
+ public boolean accept() {
+ return true;
+ }
+
+ @Override
+ public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+ return BeamSqlPrimitive.of(outputType, inputRow.getFieldValue(inputRef));
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlPrimitive.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlPrimitive.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlPrimitive.java
new file mode 100644
index 0000000..6a8216b
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlPrimitive.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.interpreter.operator;
+
+import java.math.BigDecimal;
+import java.util.Date;
+import java.util.GregorianCalendar;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.util.NlsString;
+
+/**
+ * {@link BeamSqlPrimitive} is a special, self-reference {@link BeamSqlExpression}.
+ * It holds the value, and return it directly during {@link #evaluate(BeamSqlRow)}.
+ *
+ */
+public class BeamSqlPrimitive<T> extends BeamSqlExpression {
+ private T value;
+
+ private BeamSqlPrimitive() {
+ }
+
+ private BeamSqlPrimitive(List<BeamSqlExpression> operands, SqlTypeName outputType) {
+ super(operands, outputType);
+ }
+
+ /**
+ * A builder function to create from Type and value directly.
+ */
+ public static <T> BeamSqlPrimitive<T> of(SqlTypeName outputType, T value){
+ BeamSqlPrimitive<T> exp = new BeamSqlPrimitive<>();
+ exp.outputType = outputType;
+ exp.value = value;
+ if (!exp.accept()) {
+ throw new IllegalArgumentException(
+ String.format("value [%s] doesn't match type [%s].", value, outputType));
+ }
+ return exp;
+ }
+
+ public SqlTypeName getOutputType() {
+ return outputType;
+ }
+
+ public T getValue() {
+ return value;
+ }
+
+ public long getLong() {
+ return (Long) getValue();
+ }
+
+ public double getDouble() {
+ return (Double) getValue();
+ }
+
+ public float getFloat() {
+ return (Float) getValue();
+ }
+
+ public int getInteger() {
+ return (Integer) getValue();
+ }
+
+ public short getShort() {
+ return (Short) getValue();
+ }
+
+ public byte getByte() {
+ return (Byte) getValue();
+ }
+ public boolean getBoolean() {
+ return (Boolean) getValue();
+ }
+
+ public String getString() {
+ return (String) getValue();
+ }
+
+ public Date getDate() {
+ return (Date) getValue();
+ }
+
+ public BigDecimal getDecimal() {
+ return (BigDecimal) getValue();
+ }
+
+ @Override
+ public boolean accept() {
+ if (value == null) {
+ return true;
+ }
+
+ switch (outputType) {
+ case BIGINT:
+ return value instanceof Long;
+ case DECIMAL:
+ return value instanceof BigDecimal;
+ case DOUBLE:
+ return value instanceof Double;
+ case FLOAT:
+ return value instanceof Float;
+ case INTEGER:
+ return value instanceof Integer;
+ case SMALLINT:
+ return value instanceof Short;
+ case TINYINT:
+ return value instanceof Byte;
+ case BOOLEAN:
+ return value instanceof Boolean;
+ case CHAR:
+ case VARCHAR:
+ return value instanceof String || value instanceof NlsString;
+ case TIME:
+ return value instanceof GregorianCalendar;
+ case TIMESTAMP:
+ case DATE:
+ return value instanceof Date;
+ case INTERVAL_HOUR:
+ return value instanceof BigDecimal;
+ case INTERVAL_MINUTE:
+ return value instanceof BigDecimal;
+ case SYMBOL:
+ // for SYMBOL, it supports anything...
+ return true;
+ default:
+ throw new UnsupportedOperationException(outputType.name());
+ }
+ }
+
+ @Override
+ public BeamSqlPrimitive<T> evaluate(BeamSqlRow inputRow) {
+ return this;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlReinterpretExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlReinterpretExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlReinterpretExpression.java
new file mode 100644
index 0000000..7b4894a
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlReinterpretExpression.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.interpreter.operator;
+
+import java.util.Date;
+import java.util.GregorianCalendar;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlExpression} for REINTERPRET.
+ *
+ * <p>Currently only converting from {@link SqlTypeName#DATETIME_TYPES}
+ * to {@code BIGINT} is supported.
+ */
+public class BeamSqlReinterpretExpression extends BeamSqlExpression {
+ public BeamSqlReinterpretExpression(List<BeamSqlExpression> operands, SqlTypeName outputType) {
+ super(operands, outputType);
+ }
+
+ @Override public boolean accept() {
+ return getOperands().size() == 1
+ && outputType == SqlTypeName.BIGINT
+ && SqlTypeName.DATETIME_TYPES.contains(opType(0));
+ }
+
+ @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+ if (opType(0) == SqlTypeName.TIME) {
+ GregorianCalendar date = opValueEvaluated(0, inputRow);
+ return BeamSqlPrimitive.of(outputType, date.getTimeInMillis());
+
+ } else {
+ Date date = opValueEvaluated(0, inputRow);
+ return BeamSqlPrimitive.of(outputType, date.getTime());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlUdfExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlUdfExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlUdfExpression.java
new file mode 100644
index 0000000..42e511d
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlUdfExpression.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.interpreter.operator;
+
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * invoke a UDF function.
+ */
+public class BeamSqlUdfExpression extends BeamSqlExpression {
+ //as Method is not Serializable, need to keep class/method information, and rebuild it.
+ private transient Method method;
+ private String className;
+ private String methodName;
+ private List<String> paraClassName = new ArrayList<>();
+
+ public BeamSqlUdfExpression(Method method, List<BeamSqlExpression> subExps,
+ SqlTypeName sqlTypeName) {
+ super(subExps, sqlTypeName);
+ this.method = method;
+
+ this.className = method.getDeclaringClass().getName();
+ this.methodName = method.getName();
+ for (Class<?> c : method.getParameterTypes()) {
+ paraClassName.add(c.getName());
+ }
+ }
+
+ @Override
+ public boolean accept() {
+ return true;
+ }
+
+ @Override
+ public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+ if (method == null) {
+ reConstructMethod();
+ }
+ try {
+ List<Object> paras = new ArrayList<>();
+ for (BeamSqlExpression e : getOperands()) {
+ paras.add(e.evaluate(inputRow).getValue());
+ }
+
+ return BeamSqlPrimitive.of(getOutputType(),
+ method.invoke(null, paras.toArray(new Object[]{})));
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ /**
+ * re-construct method from class/method.
+ */
+ private void reConstructMethod() {
+ try {
+ List<Class<?>> paraClass = new ArrayList<>();
+ for (String pc : paraClassName) {
+ paraClass.add(Class.forName(pc));
+ }
+ method = Class.forName(className).getMethod(methodName, paraClass.toArray(new Class<?>[] {}));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlWindowEndExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlWindowEndExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlWindowEndExpression.java
new file mode 100644
index 0000000..76f602c
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlWindowEndExpression.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.interpreter.operator;
+
+import java.util.Date;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlExpression} for {@code HOP_END}, {@code TUMBLE_END}, {@code SESSION_END} operation.
+ *
+ * <p>These operators returns the <em>end</em> timestamp of window.
+ */
+public class BeamSqlWindowEndExpression extends BeamSqlExpression {
+
+ @Override
+ public boolean accept() {
+ return true;
+ }
+
+ @Override
+ public BeamSqlPrimitive<Date> evaluate(BeamSqlRow inputRow) {
+ return BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP,
+ new Date(inputRow.getWindowEnd().getMillis()));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlWindowExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlWindowExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlWindowExpression.java
new file mode 100644
index 0000000..21ec6dc
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlWindowExpression.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.interpreter.operator;
+
+import java.util.Date;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlExpression} for {@code HOP}, {@code TUMBLE}, {@code SESSION} operation.
+ *
+ * <p>These functions don't change the timestamp field, instead it's used to indicate
+ * the event_timestamp field, and how the window is defined.
+ */
+public class BeamSqlWindowExpression extends BeamSqlExpression {
+
+ public BeamSqlWindowExpression(List<BeamSqlExpression> operands, SqlTypeName outputType) {
+ super(operands, outputType);
+ }
+
+ @Override
+ public boolean accept() {
+ return operands.get(0).getOutputType().equals(SqlTypeName.DATE)
+ || operands.get(0).getOutputType().equals(SqlTypeName.TIME)
+ || operands.get(0).getOutputType().equals(SqlTypeName.TIMESTAMP);
+ }
+
+ @Override
+ public BeamSqlPrimitive<Date> evaluate(BeamSqlRow inputRow) {
+ return BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP,
+ (Date) operands.get(0).evaluate(inputRow).getValue());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlWindowStartExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlWindowStartExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlWindowStartExpression.java
new file mode 100644
index 0000000..a38fd12
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlWindowStartExpression.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.interpreter.operator;
+
+import java.util.Date;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlExpression} for {@code HOP_START}, {@code TUMBLE_START},
+ * {@code SESSION_START} operation.
+ *
+ * <p>These operators returns the <em>start</em> timestamp of window.
+ */
+public class BeamSqlWindowStartExpression extends BeamSqlExpression {
+
+ @Override
+ public boolean accept() {
+ return true;
+ }
+
+ @Override
+ public BeamSqlPrimitive<Date> evaluate(BeamSqlRow inputRow) {
+ return BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP,
+ new Date(inputRow.getWindowStart().getMillis()));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java
new file mode 100644
index 0000000..67a35fc
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.interpreter.operator.arithmetic;
+
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * Base class for all arithmetic operators.
+ */
+public abstract class BeamSqlArithmeticExpression extends BeamSqlExpression {
+ private static final List<SqlTypeName> ORDERED_APPROX_TYPES = new ArrayList<>();
+ static {
+ ORDERED_APPROX_TYPES.add(SqlTypeName.TINYINT);
+ ORDERED_APPROX_TYPES.add(SqlTypeName.SMALLINT);
+ ORDERED_APPROX_TYPES.add(SqlTypeName.INTEGER);
+ ORDERED_APPROX_TYPES.add(SqlTypeName.BIGINT);
+ ORDERED_APPROX_TYPES.add(SqlTypeName.FLOAT);
+ ORDERED_APPROX_TYPES.add(SqlTypeName.DOUBLE);
+ ORDERED_APPROX_TYPES.add(SqlTypeName.DECIMAL);
+ }
+
+ protected BeamSqlArithmeticExpression(List<BeamSqlExpression> operands) {
+ super(operands, deduceOutputType(operands.get(0).getOutputType(),
+ operands.get(1).getOutputType()));
+ }
+
+ protected BeamSqlArithmeticExpression(List<BeamSqlExpression> operands, SqlTypeName outputType) {
+ super(operands, outputType);
+ }
+
+ @Override public BeamSqlPrimitive<? extends Number> evaluate(BeamSqlRow inputRow) {
+ BigDecimal left = BigDecimal.valueOf(
+ Double.valueOf(opValueEvaluated(0, inputRow).toString()));
+ BigDecimal right = BigDecimal.valueOf(
+ Double.valueOf(opValueEvaluated(1, inputRow).toString()));
+
+ BigDecimal result = calc(left, right);
+ return getCorrectlyTypedResult(result);
+ }
+
+ protected abstract BigDecimal calc(BigDecimal left, BigDecimal right);
+
+ protected static SqlTypeName deduceOutputType(SqlTypeName left, SqlTypeName right) {
+ int leftIndex = ORDERED_APPROX_TYPES.indexOf(left);
+ int rightIndex = ORDERED_APPROX_TYPES.indexOf(right);
+ if ((left == SqlTypeName.FLOAT || right == SqlTypeName.FLOAT)
+ && (left == SqlTypeName.DECIMAL || right == SqlTypeName.DECIMAL)) {
+ return SqlTypeName.DOUBLE;
+ }
+
+ if (leftIndex < rightIndex) {
+ return right;
+ } else if (leftIndex > rightIndex) {
+ return left;
+ } else {
+ return left;
+ }
+ }
+
+ @Override public boolean accept() {
+ if (operands.size() != 2) {
+ return false;
+ }
+
+ for (BeamSqlExpression operand : operands) {
+ if (!SqlTypeName.NUMERIC_TYPES.contains(operand.getOutputType())) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ protected BeamSqlPrimitive<? extends Number> getCorrectlyTypedResult(BigDecimal rawResult) {
+ Number actualValue;
+ switch (outputType) {
+ case TINYINT:
+ actualValue = rawResult.byteValue();
+ break;
+ case SMALLINT:
+ actualValue = rawResult.shortValue();
+ break;
+ case INTEGER:
+ actualValue = rawResult.intValue();
+ break;
+ case BIGINT:
+ actualValue = rawResult.longValue();
+ break;
+ case FLOAT:
+ actualValue = rawResult.floatValue();
+ break;
+ case DOUBLE:
+ actualValue = rawResult.doubleValue();
+ break;
+ case DECIMAL:
+ default:
+ actualValue = rawResult;
+ }
+ return BeamSqlPrimitive.of(outputType, actualValue);
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/arithmetic/BeamSqlDivideExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/arithmetic/BeamSqlDivideExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/arithmetic/BeamSqlDivideExpression.java
new file mode 100644
index 0000000..fbe3fc4
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/arithmetic/BeamSqlDivideExpression.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.interpreter.operator.arithmetic;
+
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
+
+/**
+ * '/' operator.
+ */
+public class BeamSqlDivideExpression extends BeamSqlArithmeticExpression {
+ public BeamSqlDivideExpression(List<BeamSqlExpression> operands) {
+ super(operands);
+ }
+
+ @Override protected BigDecimal calc(BigDecimal left, BigDecimal right) {
+ return left.divide(right, 10, RoundingMode.HALF_EVEN);
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/arithmetic/BeamSqlMinusExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/arithmetic/BeamSqlMinusExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/arithmetic/BeamSqlMinusExpression.java
new file mode 100644
index 0000000..0241574
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/arithmetic/BeamSqlMinusExpression.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.interpreter.operator.arithmetic;
+
+import java.math.BigDecimal;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
+
+/**
+ * '-' operator.
+ */
+public class BeamSqlMinusExpression extends BeamSqlArithmeticExpression {
+ public BeamSqlMinusExpression(List<BeamSqlExpression> operands) {
+ super(operands);
+ }
+
+ @Override protected BigDecimal calc(BigDecimal left, BigDecimal right) {
+ return left.subtract(right);
+ }
+}