You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ta...@apache.org on 2017/08/02 05:08:34 UTC
[11/59] beam git commit: move dsls/sql to sdks/java/extensions/sql
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamAggregationTransformTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamAggregationTransformTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamAggregationTransformTest.java
deleted file mode 100644
index 5d5d4fc..0000000
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamAggregationTransformTest.java
+++ /dev/null
@@ -1,453 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.dsls.sql.schema.transform;
-
-import java.text.ParseException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import org.apache.beam.dsls.sql.planner.BeamQueryPlanner;
-import org.apache.beam.dsls.sql.schema.BeamSqlRow;
-import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
-import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
-import org.apache.beam.dsls.sql.transform.BeamAggregationTransforms;
-import org.apache.beam.dsls.sql.utils.CalciteUtils;
-import org.apache.beam.sdk.coders.IterableCoder;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.WithKeys;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.calcite.rel.core.AggregateCall;
-import org.apache.calcite.rel.type.RelDataTypeFactory.FieldInfoBuilder;
-import org.apache.calcite.rel.type.RelDataTypeSystem;
-import org.apache.calcite.sql.SqlKind;
-import org.apache.calcite.sql.fun.SqlAvgAggFunction;
-import org.apache.calcite.sql.fun.SqlCountAggFunction;
-import org.apache.calcite.sql.fun.SqlMinMaxAggFunction;
-import org.apache.calcite.sql.fun.SqlSumAggFunction;
-import org.apache.calcite.sql.type.BasicSqlType;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.apache.calcite.util.ImmutableBitSet;
-import org.junit.Rule;
-import org.junit.Test;
-
-/**
- * Unit tests for {@link BeamAggregationTransforms}.
- *
- */
-public class BeamAggregationTransformTest extends BeamTransformBaseTest{
-
- @Rule
- public TestPipeline p = TestPipeline.create();
-
- private List<AggregateCall> aggCalls;
-
- private BeamSqlRowType keyType;
- private BeamSqlRowType aggPartType;
- private BeamSqlRowType outputType;
-
- private BeamSqlRowCoder inRecordCoder;
- private BeamSqlRowCoder keyCoder;
- private BeamSqlRowCoder aggCoder;
- private BeamSqlRowCoder outRecordCoder;
-
- /**
- * This step equals to below query.
- * <pre>
- * SELECT `f_int`
- * , COUNT(*) AS `size`
- * , SUM(`f_long`) AS `sum1`, AVG(`f_long`) AS `avg1`
- * , MAX(`f_long`) AS `max1`, MIN(`f_long`) AS `min1`
- * , SUM(`f_short`) AS `sum2`, AVG(`f_short`) AS `avg2`
- * , MAX(`f_short`) AS `max2`, MIN(`f_short`) AS `min2`
- * , SUM(`f_byte`) AS `sum3`, AVG(`f_byte`) AS `avg3`
- * , MAX(`f_byte`) AS `max3`, MIN(`f_byte`) AS `min3`
- * , SUM(`f_float`) AS `sum4`, AVG(`f_float`) AS `avg4`
- * , MAX(`f_float`) AS `max4`, MIN(`f_float`) AS `min4`
- * , SUM(`f_double`) AS `sum5`, AVG(`f_double`) AS `avg5`
- * , MAX(`f_double`) AS `max5`, MIN(`f_double`) AS `min5`
- * , MAX(`f_timestamp`) AS `max7`, MIN(`f_timestamp`) AS `min7`
- * ,SUM(`f_int2`) AS `sum8`, AVG(`f_int2`) AS `avg8`
- * , MAX(`f_int2`) AS `max8`, MIN(`f_int2`) AS `min8`
- * FROM TABLE_NAME
- * GROUP BY `f_int`
- * </pre>
- * @throws ParseException
- */
- @Test
- public void testCountPerElementBasic() throws ParseException {
- setupEnvironment();
-
- PCollection<BeamSqlRow> input = p.apply(Create.of(inputRows));
-
- //1. extract fields in group-by key part
- PCollection<KV<BeamSqlRow, BeamSqlRow>> exGroupByStream = input.apply("exGroupBy",
- WithKeys
- .of(new BeamAggregationTransforms.AggregationGroupByKeyFn(-1, ImmutableBitSet.of(0))))
- .setCoder(KvCoder.<BeamSqlRow, BeamSqlRow>of(keyCoder, inRecordCoder));
-
- //2. apply a GroupByKey.
- PCollection<KV<BeamSqlRow, Iterable<BeamSqlRow>>> groupedStream = exGroupByStream
- .apply("groupBy", GroupByKey.<BeamSqlRow, BeamSqlRow>create())
- .setCoder(KvCoder.<BeamSqlRow, Iterable<BeamSqlRow>>of(keyCoder,
- IterableCoder.<BeamSqlRow>of(inRecordCoder)));
-
- //3. run aggregation functions
- PCollection<KV<BeamSqlRow, BeamSqlRow>> aggregatedStream = groupedStream.apply("aggregation",
- Combine.<BeamSqlRow, BeamSqlRow, BeamSqlRow>groupedValues(
- new BeamAggregationTransforms.AggregationAdaptor(aggCalls, inputRowType)))
- .setCoder(KvCoder.<BeamSqlRow, BeamSqlRow>of(keyCoder, aggCoder));
-
- //4. flat KV to a single record
- PCollection<BeamSqlRow> mergedStream = aggregatedStream.apply("mergeRecord",
- ParDo.of(new BeamAggregationTransforms.MergeAggregationRecord(outputType, aggCalls, -1)));
- mergedStream.setCoder(outRecordCoder);
-
- //assert function BeamAggregationTransform.AggregationGroupByKeyFn
- PAssert.that(exGroupByStream).containsInAnyOrder(prepareResultOfAggregationGroupByKeyFn());
-
- //assert BeamAggregationTransform.AggregationCombineFn
- PAssert.that(aggregatedStream).containsInAnyOrder(prepareResultOfAggregationCombineFn());
-
- //assert BeamAggregationTransform.MergeAggregationRecord
- PAssert.that(mergedStream).containsInAnyOrder(prepareResultOfMergeAggregationRecord());
-
- p.run();
-}
-
- private void setupEnvironment() {
- prepareAggregationCalls();
- prepareTypeAndCoder();
- }
-
- /**
- * create list of all {@link AggregateCall}.
- */
- @SuppressWarnings("deprecation")
- private void prepareAggregationCalls() {
- //aggregations for all data type
- aggCalls = new ArrayList<>();
- aggCalls.add(
- new AggregateCall(new SqlCountAggFunction(), false,
- Arrays.<Integer>asList(),
- new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.BIGINT),
- "count")
- );
- aggCalls.add(
- new AggregateCall(new SqlSumAggFunction(
- new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.BIGINT)), false,
- Arrays.<Integer>asList(1),
- new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.BIGINT),
- "sum1")
- );
- aggCalls.add(
- new AggregateCall(new SqlAvgAggFunction(SqlKind.AVG), false,
- Arrays.<Integer>asList(1),
- new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.BIGINT),
- "avg1")
- );
- aggCalls.add(
- new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MAX), false,
- Arrays.<Integer>asList(1),
- new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.BIGINT),
- "max1")
- );
- aggCalls.add(
- new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MIN), false,
- Arrays.<Integer>asList(1),
- new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.BIGINT),
- "min1")
- );
-
- aggCalls.add(
- new AggregateCall(new SqlSumAggFunction(
- new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.SMALLINT)), false,
- Arrays.<Integer>asList(2),
- new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.SMALLINT),
- "sum2")
- );
- aggCalls.add(
- new AggregateCall(new SqlAvgAggFunction(SqlKind.AVG), false,
- Arrays.<Integer>asList(2),
- new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.SMALLINT),
- "avg2")
- );
- aggCalls.add(
- new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MAX), false,
- Arrays.<Integer>asList(2),
- new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.SMALLINT),
- "max2")
- );
- aggCalls.add(
- new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MIN), false,
- Arrays.<Integer>asList(2),
- new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.SMALLINT),
- "min2")
- );
-
- aggCalls.add(
- new AggregateCall(
- new SqlSumAggFunction(new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.TINYINT)),
- false,
- Arrays.<Integer>asList(3),
- new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.TINYINT),
- "sum3")
- );
- aggCalls.add(
- new AggregateCall(new SqlAvgAggFunction(SqlKind.AVG), false,
- Arrays.<Integer>asList(3),
- new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.TINYINT),
- "avg3")
- );
- aggCalls.add(
- new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MAX), false,
- Arrays.<Integer>asList(3),
- new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.TINYINT),
- "max3")
- );
- aggCalls.add(
- new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MIN), false,
- Arrays.<Integer>asList(3),
- new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.TINYINT),
- "min3")
- );
-
- aggCalls.add(
- new AggregateCall(
- new SqlSumAggFunction(new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.FLOAT)),
- false,
- Arrays.<Integer>asList(4),
- new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.FLOAT),
- "sum4")
- );
- aggCalls.add(
- new AggregateCall(new SqlAvgAggFunction(SqlKind.AVG), false,
- Arrays.<Integer>asList(4),
- new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.FLOAT),
- "avg4")
- );
- aggCalls.add(
- new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MAX), false,
- Arrays.<Integer>asList(4),
- new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.FLOAT),
- "max4")
- );
- aggCalls.add(
- new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MIN), false,
- Arrays.<Integer>asList(4),
- new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.FLOAT),
- "min4")
- );
-
- aggCalls.add(
- new AggregateCall(
- new SqlSumAggFunction(new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.DOUBLE)),
- false,
- Arrays.<Integer>asList(5),
- new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.DOUBLE),
- "sum5")
- );
- aggCalls.add(
- new AggregateCall(new SqlAvgAggFunction(SqlKind.AVG), false,
- Arrays.<Integer>asList(5),
- new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.DOUBLE),
- "avg5")
- );
- aggCalls.add(
- new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MAX), false,
- Arrays.<Integer>asList(5),
- new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.DOUBLE),
- "max5")
- );
- aggCalls.add(
- new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MIN), false,
- Arrays.<Integer>asList(5),
- new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.DOUBLE),
- "min5")
- );
-
- aggCalls.add(
- new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MAX), false,
- Arrays.<Integer>asList(7),
- new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.TIMESTAMP),
- "max7")
- );
- aggCalls.add(
- new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MIN), false,
- Arrays.<Integer>asList(7),
- new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.TIMESTAMP),
- "min7")
- );
-
- aggCalls.add(
- new AggregateCall(
- new SqlSumAggFunction(new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.INTEGER)),
- false,
- Arrays.<Integer>asList(8),
- new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.INTEGER),
- "sum8")
- );
- aggCalls.add(
- new AggregateCall(new SqlAvgAggFunction(SqlKind.AVG), false,
- Arrays.<Integer>asList(8),
- new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.INTEGER),
- "avg8")
- );
- aggCalls.add(
- new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MAX), false,
- Arrays.<Integer>asList(8),
- new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.INTEGER),
- "max8")
- );
- aggCalls.add(
- new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MIN), false,
- Arrays.<Integer>asList(8),
- new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.INTEGER),
- "min8")
- );
- }
-
- /**
- * Coders used in aggregation steps.
- */
- private void prepareTypeAndCoder() {
- inRecordCoder = new BeamSqlRowCoder(inputRowType);
-
- keyType = initTypeOfSqlRow(Arrays.asList(KV.of("f_int", SqlTypeName.INTEGER)));
- keyCoder = new BeamSqlRowCoder(keyType);
-
- aggPartType = initTypeOfSqlRow(
- Arrays.asList(KV.of("count", SqlTypeName.BIGINT),
-
- KV.of("sum1", SqlTypeName.BIGINT), KV.of("avg1", SqlTypeName.BIGINT),
- KV.of("max1", SqlTypeName.BIGINT), KV.of("min1", SqlTypeName.BIGINT),
-
- KV.of("sum2", SqlTypeName.SMALLINT), KV.of("avg2", SqlTypeName.SMALLINT),
- KV.of("max2", SqlTypeName.SMALLINT), KV.of("min2", SqlTypeName.SMALLINT),
-
- KV.of("sum3", SqlTypeName.TINYINT), KV.of("avg3", SqlTypeName.TINYINT),
- KV.of("max3", SqlTypeName.TINYINT), KV.of("min3", SqlTypeName.TINYINT),
-
- KV.of("sum4", SqlTypeName.FLOAT), KV.of("avg4", SqlTypeName.FLOAT),
- KV.of("max4", SqlTypeName.FLOAT), KV.of("min4", SqlTypeName.FLOAT),
-
- KV.of("sum5", SqlTypeName.DOUBLE), KV.of("avg5", SqlTypeName.DOUBLE),
- KV.of("max5", SqlTypeName.DOUBLE), KV.of("min5", SqlTypeName.DOUBLE),
-
- KV.of("max7", SqlTypeName.TIMESTAMP), KV.of("min7", SqlTypeName.TIMESTAMP),
-
- KV.of("sum8", SqlTypeName.INTEGER), KV.of("avg8", SqlTypeName.INTEGER),
- KV.of("max8", SqlTypeName.INTEGER), KV.of("min8", SqlTypeName.INTEGER)
- ));
- aggCoder = new BeamSqlRowCoder(aggPartType);
-
- outputType = prepareFinalRowType();
- outRecordCoder = new BeamSqlRowCoder(outputType);
- }
-
- /**
- * expected results after {@link BeamAggregationTransforms.AggregationGroupByKeyFn}.
- */
- private List<KV<BeamSqlRow, BeamSqlRow>> prepareResultOfAggregationGroupByKeyFn() {
- return Arrays.asList(
- KV.of(new BeamSqlRow(keyType, Arrays.<Object>asList(inputRows.get(0).getInteger(0))),
- inputRows.get(0)),
- KV.of(new BeamSqlRow(keyType, Arrays.<Object>asList(inputRows.get(1).getInteger(0))),
- inputRows.get(1)),
- KV.of(new BeamSqlRow(keyType, Arrays.<Object>asList(inputRows.get(2).getInteger(0))),
- inputRows.get(2)),
- KV.of(new BeamSqlRow(keyType, Arrays.<Object>asList(inputRows.get(3).getInteger(0))),
- inputRows.get(3)));
- }
-
- /**
- * expected results after {@link BeamAggregationTransforms.AggregationCombineFn}.
- */
- private List<KV<BeamSqlRow, BeamSqlRow>> prepareResultOfAggregationCombineFn()
- throws ParseException {
- return Arrays.asList(
- KV.of(new BeamSqlRow(keyType, Arrays.<Object>asList(inputRows.get(0).getInteger(0))),
- new BeamSqlRow(aggPartType, Arrays.<Object>asList(
- 4L,
- 10000L, 2500L, 4000L, 1000L,
- (short) 10, (short) 2, (short) 4, (short) 1,
- (byte) 10, (byte) 2, (byte) 4, (byte) 1,
- 10.0F, 2.5F, 4.0F, 1.0F,
- 10.0, 2.5, 4.0, 1.0,
- format.parse("2017-01-01 02:04:03"), format.parse("2017-01-01 01:01:03"),
- 10, 2, 4, 1
- )))
- );
- }
-
- /**
- * Row type of final output row.
- */
- private BeamSqlRowType prepareFinalRowType() {
- FieldInfoBuilder builder = BeamQueryPlanner.TYPE_FACTORY.builder();
- List<KV<String, SqlTypeName>> columnMetadata =
- Arrays.asList(KV.of("f_int", SqlTypeName.INTEGER), KV.of("count", SqlTypeName.BIGINT),
-
- KV.of("sum1", SqlTypeName.BIGINT), KV.of("avg1", SqlTypeName.BIGINT),
- KV.of("max1", SqlTypeName.BIGINT), KV.of("min1", SqlTypeName.BIGINT),
-
- KV.of("sum2", SqlTypeName.SMALLINT), KV.of("avg2", SqlTypeName.SMALLINT),
- KV.of("max2", SqlTypeName.SMALLINT), KV.of("min2", SqlTypeName.SMALLINT),
-
- KV.of("sum3", SqlTypeName.TINYINT), KV.of("avg3", SqlTypeName.TINYINT),
- KV.of("max3", SqlTypeName.TINYINT), KV.of("min3", SqlTypeName.TINYINT),
-
- KV.of("sum4", SqlTypeName.FLOAT), KV.of("avg4", SqlTypeName.FLOAT),
- KV.of("max4", SqlTypeName.FLOAT), KV.of("min4", SqlTypeName.FLOAT),
-
- KV.of("sum5", SqlTypeName.DOUBLE), KV.of("avg5", SqlTypeName.DOUBLE),
- KV.of("max5", SqlTypeName.DOUBLE), KV.of("min5", SqlTypeName.DOUBLE),
-
- KV.of("max7", SqlTypeName.TIMESTAMP), KV.of("min7", SqlTypeName.TIMESTAMP),
-
- KV.of("sum8", SqlTypeName.INTEGER), KV.of("avg8", SqlTypeName.INTEGER),
- KV.of("max8", SqlTypeName.INTEGER), KV.of("min8", SqlTypeName.INTEGER)
- );
- for (KV<String, SqlTypeName> cm : columnMetadata) {
- builder.add(cm.getKey(), cm.getValue());
- }
- return CalciteUtils.toBeamRowType(builder.build());
- }
-
- /**
- * expected results after {@link BeamAggregationTransforms.MergeAggregationRecord}.
- */
- private BeamSqlRow prepareResultOfMergeAggregationRecord() throws ParseException {
- return new BeamSqlRow(outputType, Arrays.<Object>asList(
- 1, 4L,
- 10000L, 2500L, 4000L, 1000L,
- (short) 10, (short) 2, (short) 4, (short) 1,
- (byte) 10, (byte) 2, (byte) 4, (byte) 1,
- 10.0F, 2.5F, 4.0F, 1.0F,
- 10.0, 2.5, 4.0, 1.0,
- format.parse("2017-01-01 02:04:03"), format.parse("2017-01-01 01:01:03"),
- 10, 2, 4, 1
- ));
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamTransformBaseTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamTransformBaseTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamTransformBaseTest.java
deleted file mode 100644
index 4045bc8..0000000
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamTransformBaseTest.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.dsls.sql.schema.transform;
-
-import java.text.DateFormat;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.Arrays;
-import java.util.List;
-import org.apache.beam.dsls.sql.planner.BeamQueryPlanner;
-import org.apache.beam.dsls.sql.schema.BeamSqlRow;
-import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
-import org.apache.beam.dsls.sql.utils.CalciteUtils;
-import org.apache.beam.sdk.values.KV;
-import org.apache.calcite.rel.type.RelDataTypeFactory.FieldInfoBuilder;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.junit.BeforeClass;
-
-/**
- * shared methods to test PTransforms which execute Beam SQL steps.
- *
- */
-public class BeamTransformBaseTest {
- public static DateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
-
- public static BeamSqlRowType inputRowType;
- public static List<BeamSqlRow> inputRows;
-
- @BeforeClass
- public static void prepareInput() throws NumberFormatException, ParseException{
- List<KV<String, SqlTypeName>> columnMetadata = Arrays.asList(
- KV.of("f_int", SqlTypeName.INTEGER), KV.of("f_long", SqlTypeName.BIGINT),
- KV.of("f_short", SqlTypeName.SMALLINT), KV.of("f_byte", SqlTypeName.TINYINT),
- KV.of("f_float", SqlTypeName.FLOAT), KV.of("f_double", SqlTypeName.DOUBLE),
- KV.of("f_string", SqlTypeName.VARCHAR), KV.of("f_timestamp", SqlTypeName.TIMESTAMP),
- KV.of("f_int2", SqlTypeName.INTEGER)
- );
- inputRowType = initTypeOfSqlRow(columnMetadata);
- inputRows = Arrays.asList(
- initBeamSqlRow(columnMetadata,
- Arrays.<Object>asList(1, 1000L, Short.valueOf("1"), Byte.valueOf("1"), 1.0F, 1.0,
- "string_row1", format.parse("2017-01-01 01:01:03"), 1)),
- initBeamSqlRow(columnMetadata,
- Arrays.<Object>asList(1, 2000L, Short.valueOf("2"), Byte.valueOf("2"), 2.0F, 2.0,
- "string_row2", format.parse("2017-01-01 01:02:03"), 2)),
- initBeamSqlRow(columnMetadata,
- Arrays.<Object>asList(1, 3000L, Short.valueOf("3"), Byte.valueOf("3"), 3.0F, 3.0,
- "string_row3", format.parse("2017-01-01 01:03:03"), 3)),
- initBeamSqlRow(columnMetadata, Arrays.<Object>asList(1, 4000L, Short.valueOf("4"),
- Byte.valueOf("4"), 4.0F, 4.0, "string_row4", format.parse("2017-01-01 02:04:03"), 4)));
- }
-
- /**
- * create a {@code BeamSqlRowType} for given column metadata.
- */
- public static BeamSqlRowType initTypeOfSqlRow(List<KV<String, SqlTypeName>> columnMetadata){
- FieldInfoBuilder builder = BeamQueryPlanner.TYPE_FACTORY.builder();
- for (KV<String, SqlTypeName> cm : columnMetadata) {
- builder.add(cm.getKey(), cm.getValue());
- }
- return CalciteUtils.toBeamRowType(builder.build());
- }
-
- /**
- * Create an empty row with given column metadata.
- */
- public static BeamSqlRow initBeamSqlRow(List<KV<String, SqlTypeName>> columnMetadata) {
- return initBeamSqlRow(columnMetadata, Arrays.asList());
- }
-
- /**
- * Create a row with given column metadata, and values for each column.
- *
- */
- public static BeamSqlRow initBeamSqlRow(List<KV<String, SqlTypeName>> columnMetadata,
- List<Object> rowValues){
- BeamSqlRowType rowType = initTypeOfSqlRow(columnMetadata);
-
- return new BeamSqlRow(rowType, rowValues);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index a86a680..803d30c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -180,7 +180,6 @@
<module>sdks/java/build-tools</module>
<module>sdks</module>
<module>runners</module>
- <module>dsls</module>
<module>examples</module>
<!-- sdks/java/javadoc builds project-wide Javadoc. It has to run last. -->
<module>sdks/java/javadoc</module>
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/pom.xml b/sdks/java/extensions/pom.xml
index 1222476..5465cf0 100644
--- a/sdks/java/extensions/pom.xml
+++ b/sdks/java/extensions/pom.xml
@@ -37,6 +37,7 @@
<module>join-library</module>
<module>protobuf</module>
<module>sorter</module>
+ <module>sql</module>
</modules>
</project>
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/pom.xml b/sdks/java/extensions/sql/pom.xml
new file mode 100644
index 0000000..b4aa223
--- /dev/null
+++ b/sdks/java/extensions/sql/pom.xml
@@ -0,0 +1,226 @@
+<?xml version="1.0"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
+ xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-sdks-java-extensions-parent</artifactId>
+ <version>2.2.0-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>beam-sdks-java-extensions-sql</artifactId>
+ <name>Apache Beam :: SDKs :: Java :: Extensions :: SQL</name>
+ <description>Beam SQL provides a new interface to generate a Beam pipeline from SQL statement</description>
+
+ <packaging>jar</packaging>
+
+ <properties>
+ <timestamp>${maven.build.timestamp}</timestamp>
+ <maven.build.timestamp.format>yyyy-MM-dd HH:mm</maven.build.timestamp.format>
+ <calcite.version>1.13.0</calcite.version>
+ <avatica.version>1.10.0</avatica.version>
+ </properties>
+
+ <profiles>
+ <!--
+ The direct runner is available by default.
+ You can also include it on the classpath explicitly with -P direct-runner
+ -->
+ <profile>
+ <id>direct-runner</id>
+ <activation>
+ <activeByDefault>true</activeByDefault>
+ </activation>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-runners-direct-java</artifactId>
+ <scope>runtime</scope>
+ </dependency>
+ </dependencies>
+ </profile>
+ </profiles>
+
+ <build>
+ <resources>
+ <resource>
+ <directory>src/main/resources</directory>
+ <filtering>true</filtering>
+ </resource>
+ </resources>
+
+ <pluginManagement>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-checkstyle-plugin</artifactId>
+ <configuration>
+ <!-- Set testSourceDirectory in order to exclude generated-test-sources -->
+ <testSourceDirectory>${project.basedir}/src/test/</testSourceDirectory>
+ </configuration>
+ </plugin>
+ </plugins>
+ </pluginManagement>
+
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <argLine>-da</argLine> <!-- disable assert in Calcite converter validation -->
+ </configuration>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>bundle-and-repackage</id>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <shadeTestJar>true</shadeTestJar>
+ <artifactSet>
+ <includes>
+ <include>com.google.guava:guava</include>
+ </includes>
+ </artifactSet>
+ <filters>
+ <filter>
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>META-INF/*.SF</exclude>
+ <exclude>META-INF/*.DSA</exclude>
+ <exclude>META-INF/*.RSA</exclude>
+ </excludes>
+ </filter>
+ </filters>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+ <!-- Coverage analysis for unit tests. -->
+ <plugin>
+ <groupId>org.jacoco</groupId>
+ <artifactId>jacoco-maven-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.calcite</groupId>
+ <artifactId>calcite-core</artifactId>
+ <version>${calcite.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.calcite</groupId>
+ <artifactId>calcite-linq4j</artifactId>
+ <version>${calcite.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.calcite.avatica</groupId>
+ <artifactId>avatica-core</artifactId>
+ <version>${avatica.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-sdks-java-core</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-lite</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-jdk14</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-csv</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>joda-time</groupId>
+ <artifactId>joda-time</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-sdks-java-extensions-join-library</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.auto.value</groupId>
+ <artifactId>auto-value</artifactId>
+ <!-- this is a hack to make it available at compile time but not bundled.-->
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-sdks-java-io-kafka</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <!-- for tests -->
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-all</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java
new file mode 100644
index 0000000..d902f42
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql;
+
+import com.google.auto.value.AutoValue;
+import org.apache.beam.dsls.sql.rel.BeamRelNode;
+import org.apache.beam.dsls.sql.schema.BeamPCollectionTable;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
+import org.apache.beam.dsls.sql.schema.BeamSqlUdaf;
+import org.apache.beam.dsls.sql.schema.BeamSqlUdf;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlSelect;
+import org.apache.calcite.sql.parser.SqlParseException;
+import org.apache.calcite.tools.RelConversionException;
+import org.apache.calcite.tools.ValidationException;
+
+/**
+ * {@code BeamSql} is the DSL interface of BeamSQL. It translates a SQL query into a
+ * {@link PTransform}, so developers can use standard SQL queries in a Beam pipeline.
+ *
+ * <h1>Beam SQL DSL usage:</h1>
+ * A typical pipeline with Beam SQL DSL is:
+ * <pre>
+ *{@code
+PipelineOptions options = PipelineOptionsFactory.create();
+Pipeline p = Pipeline.create(options);
+
+//create table from TextIO;
+PCollection<BeamSqlRow> inputTableA = p.apply(TextIO.read().from("/my/input/patha"))
+ .apply(...);
+PCollection<BeamSqlRow> inputTableB = p.apply(TextIO.read().from("/my/input/pathb"))
+ .apply(...);
+
+//run a simple query, and register the output as a table in BeamSql;
+String sql1 = "select MY_FUNC(c1), c2 from PCOLLECTION";
+PCollection<BeamSqlRow> outputTableA = inputTableA.apply(
+ BeamSql.simpleQuery(sql1)
+ .withUdf("MY_FUNC", MY_FUNC.class, "FUNC"));
+
+//run a JOIN with one table from TextIO, and one table from another query
+PCollection<BeamSqlRow> outputTableB = PCollectionTuple.of(
+ new TupleTag<BeamSqlRow>("TABLE_O_A"), outputTableA)
+ .and(new TupleTag<BeamSqlRow>("TABLE_B"), inputTableB)
+ .apply(BeamSql.query("select * from TABLE_O_A JOIN TABLE_B where ..."));
+
+//output the final result with TextIO
+outputTableB.apply(...).apply(TextIO.write().to("/my/output/path"));
+
+p.run().waitUntilFinish();
+ * }
+ * </pre>
+ */
+@Experimental
+public class BeamSql {
+ /**
+ * Transforms a SQL query into a {@link PTransform} representing an equivalent execution plan.
+ *
+ * <p>The returned {@link PTransform} can be applied to a {@link PCollectionTuple} representing
+ * all the input tables and results in a {@code PCollection<BeamSqlRow>} representing the output
+ * table. The {@link PCollectionTuple} contains the mapping from {@code table names} to
+ * {@code PCollection<BeamSqlRow>}, each representing an input table.
+ *
+ * <p>It is an error to apply a {@link PCollectionTuple} missing any {@code table names}
+ * referenced within the query.
+ */
+ public static QueryTransform query(String sqlQuery) {
+ return QueryTransform.builder()
+ .setSqlEnv(new BeamSqlEnv())
+ .setSqlQuery(sqlQuery)
+ .build();
+ }
+
+ /**
+ * Transforms a SQL query into a {@link PTransform} representing an equivalent execution plan.
+ *
+ * <p>This is a simplified form of {@link #query(String)} where the query must reference
+ * a single input table.
+ *
+ * <p>The input {@link PCollection} must be referenced via the fixed table name
+ * <em>PCOLLECTION</em>; any other table name is rejected at expansion time.
+ */
+ public static SimpleQueryTransform simpleQuery(String sqlQuery) throws Exception {
+ return SimpleQueryTransform.builder()
+ .setSqlEnv(new BeamSqlEnv())
+ .setSqlQuery(sqlQuery)
+ .build();
+ }
+
+ /**
+ * A {@link PTransform} representing an execution plan for a SQL query.
+ */
+ @AutoValue
+ public abstract static class QueryTransform extends
+ PTransform<PCollectionTuple, PCollection<BeamSqlRow>> {
+ abstract BeamSqlEnv getSqlEnv();
+ abstract String getSqlQuery();
+
+ static Builder builder() {
+ return new AutoValue_BeamSql_QueryTransform.Builder();
+ }
+
+ @AutoValue.Builder
+ abstract static class Builder {
+ abstract Builder setSqlQuery(String sqlQuery);
+ abstract Builder setSqlEnv(BeamSqlEnv sqlEnv);
+ abstract QueryTransform build();
+ }
+
+ /**
+ * Registers a UDF (user-defined function) that can be referenced by name in this query.
+ */
+ public QueryTransform withUdf(String functionName, Class<? extends BeamSqlUdf> clazz){
+ getSqlEnv().registerUdf(functionName, clazz);
+ return this;
+ }
+
+ /**
+ * Registers a UDAF (user-defined aggregation function) for use in GROUP-BY clauses
+ * of this query.
+ */
+ public QueryTransform withUdaf(String functionName, Class<? extends BeamSqlUdaf> clazz){
+ getSqlEnv().registerUdaf(functionName, clazz);
+ return this;
+ }
+
+ @Override
+ public PCollection<BeamSqlRow> expand(PCollectionTuple input) {
+ registerTables(input);
+
+ // Parse, validate and optimize the SQL text into a Beam relational plan; planner
+ // failures are surfaced as unchecked IllegalStateException to callers.
+ BeamRelNode beamRelNode = null;
+ try {
+ beamRelNode = getSqlEnv().planner.convertToBeamRel(getSqlQuery());
+ } catch (ValidationException | RelConversionException | SqlParseException e) {
+ throw new IllegalStateException(e);
+ }
+
+ try {
+ return beamRelNode.buildBeamPipeline(input, getSqlEnv());
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ // Registers each input PCollection as a named table in the SQL environment, keyed by
+ // its TupleTag id; the table schema is taken from the PCollection's BeamSqlRowCoder.
+ private void registerTables(PCollectionTuple input){
+ for (TupleTag<?> sourceTag : input.getAll().keySet()) {
+ PCollection<BeamSqlRow> sourceStream = (PCollection<BeamSqlRow>) input.get(sourceTag);
+ BeamSqlRowCoder sourceCoder = (BeamSqlRowCoder) sourceStream.getCoder();
+
+ getSqlEnv().registerTable(sourceTag.getId(),
+ new BeamPCollectionTable(sourceStream, sourceCoder.getTableSchema()));
+ }
+ }
+ }
+
+ /**
+ * A {@link PTransform} representing an execution plan for a SQL query referencing
+ * a single table.
+ */
+ @AutoValue
+ public abstract static class SimpleQueryTransform
+ extends PTransform<PCollection<BeamSqlRow>, PCollection<BeamSqlRow>> {
+ private static final String PCOLLECTION_TABLE_NAME = "PCOLLECTION";
+ abstract BeamSqlEnv getSqlEnv();
+ abstract String getSqlQuery();
+
+ static Builder builder() {
+ return new AutoValue_BeamSql_SimpleQueryTransform.Builder();
+ }
+
+ @AutoValue.Builder
+ abstract static class Builder {
+ abstract Builder setSqlQuery(String sqlQuery);
+ abstract Builder setSqlEnv(BeamSqlEnv sqlEnv);
+ abstract SimpleQueryTransform build();
+ }
+
+ /**
+ * Registers a UDF (user-defined function) that can be referenced by name in this query.
+ */
+ public SimpleQueryTransform withUdf(String functionName, Class<? extends BeamSqlUdf> clazz){
+ getSqlEnv().registerUdf(functionName, clazz);
+ return this;
+ }
+
+ /**
+ * Registers a UDAF (user-defined aggregation function) for use in GROUP-BY clauses
+ * of this query.
+ */
+ public SimpleQueryTransform withUdaf(String functionName, Class<? extends BeamSqlUdaf> clazz){
+ getSqlEnv().registerUdaf(functionName, clazz);
+ return this;
+ }
+
+ // Parses the query and ensures it is a SELECT whose FROM clause is exactly the fixed
+ // table name PCOLLECTION (case-insensitive); any other statement/table is rejected.
+ private void validateQuery() {
+ SqlNode sqlNode;
+ try {
+ sqlNode = getSqlEnv().planner.parseQuery(getSqlQuery());
+ getSqlEnv().planner.getPlanner().close();
+ } catch (SqlParseException e) {
+ throw new IllegalStateException(e);
+ }
+
+ if (sqlNode instanceof SqlSelect) {
+ SqlSelect select = (SqlSelect) sqlNode;
+ String tableName = select.getFrom().toString();
+ if (!tableName.equalsIgnoreCase(PCOLLECTION_TABLE_NAME)) {
+ throw new IllegalStateException("Use fixed table name " + PCOLLECTION_TABLE_NAME);
+ }
+ } else {
+ throw new UnsupportedOperationException(
+ "Sql operation: " + sqlNode.toString() + " is not supported!");
+ }
+ }
+
+ @Override
+ public PCollection<BeamSqlRow> expand(PCollection<BeamSqlRow> input) {
+ validateQuery();
+ // Delegate to the general QueryTransform with the single input bound to PCOLLECTION.
+ return PCollectionTuple.of(new TupleTag<BeamSqlRow>(PCOLLECTION_TABLE_NAME), input)
+ .apply(QueryTransform.builder()
+ .setSqlEnv(getSqlEnv())
+ .setSqlQuery(getSqlQuery())
+ .build());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlCli.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlCli.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlCli.java
new file mode 100644
index 0000000..50da244
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlCli.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql;
+
+import org.apache.beam.dsls.sql.rel.BeamRelNode;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.calcite.plan.RelOptUtil;
+
+/**
+ * {@link BeamSqlCli} provides methods to execute Beam SQL with an interactive client.
+ */
+@Experimental
+public class BeamSqlCli {
+ /**
+ * Returns a human-readable representation of the query execution plan.
+ */
+ public static String explainQuery(String sqlString, BeamSqlEnv sqlEnv) throws Exception {
+ BeamRelNode exeTree = sqlEnv.planner.convertToBeamRel(sqlString);
+ String beamPlan = RelOptUtil.toString(exeTree);
+ return beamPlan;
+ }
+
+ /**
+ * Compiles the SQL statement against a freshly created {@link Pipeline} and returns the
+ * resulting output {@link PCollection} of rows.
+ */
+ public static PCollection<BeamSqlRow> compilePipeline(String sqlStatement, BeamSqlEnv sqlEnv)
+ throws Exception{
+ PipelineOptions options = PipelineOptionsFactory.fromArgs(new String[] {}).withValidation()
+ .as(PipelineOptions.class); // NOTE(review): stale hint "FlinkPipelineOptions.class" removed
+ options.setJobName("BeamPlanCreator");
+ Pipeline pipeline = Pipeline.create(options);
+
+ return compilePipeline(sqlStatement, pipeline, sqlEnv);
+ }
+
+ /**
+ * Compiles the SQL statement against the given {@code basePipeline} and returns the
+ * resulting output {@link PCollection} of rows.
+ */
+ public static PCollection<BeamSqlRow> compilePipeline(String sqlStatement, Pipeline basePipeline
+ , BeamSqlEnv sqlEnv) throws Exception{
+ PCollection<BeamSqlRow> resultStream =
+ sqlEnv.planner.compileBeamPipeline(sqlStatement, basePipeline, sqlEnv);
+ return resultStream;
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlEnv.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlEnv.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlEnv.java
new file mode 100644
index 0000000..0e1ac98
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlEnv.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql;
+
+import java.io.Serializable;
+
+import org.apache.beam.dsls.sql.planner.BeamQueryPlanner;
+import org.apache.beam.dsls.sql.schema.BaseBeamTable;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
+import org.apache.beam.dsls.sql.schema.BeamSqlUdaf;
+import org.apache.beam.dsls.sql.schema.BeamSqlUdf;
+import org.apache.beam.dsls.sql.utils.CalciteUtils;
+import org.apache.calcite.DataContext;
+import org.apache.calcite.linq4j.Enumerable;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.schema.ScannableTable;
+import org.apache.calcite.schema.Schema;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.Statistic;
+import org.apache.calcite.schema.Statistics;
+import org.apache.calcite.schema.impl.AggregateFunctionImpl;
+import org.apache.calcite.schema.impl.ScalarFunctionImpl;
+import org.apache.calcite.tools.Frameworks;
+
+/**
+ * {@link BeamSqlEnv} prepares the execution context for {@link BeamSql} and {@link BeamSqlCli}.
+ *
+ * <p>It contains a {@link SchemaPlus} which holds the metadata of tables/UDF functions, and
+ * a {@link BeamQueryPlanner} which parses/validates/optimizes/translates input SQL queries.
+ */
+public class BeamSqlEnv implements Serializable{
+ // Both fields are transient: the Calcite schema and planner are rebuilt per instance
+ // and are not part of the serialized form.
+ transient SchemaPlus schema;
+ transient BeamQueryPlanner planner;
+
+ public BeamSqlEnv() {
+ schema = Frameworks.createRootSchema(true);
+ planner = new BeamQueryPlanner(schema);
+ }
+
+ /**
+ * Registers a UDF which can be used in SQL expressions; the implementation is the
+ * {@code BeamSqlUdf.UDF_METHOD} method of the given class.
+ */
+ public void registerUdf(String functionName, Class<? extends BeamSqlUdf> clazz) {
+ schema.add(functionName, ScalarFunctionImpl.create(clazz, BeamSqlUdf.UDF_METHOD));
+ }
+
+ /**
+ * Registers a UDAF which can be used in GROUP-BY expressions.
+ * See {@link BeamSqlUdaf} on how to implement a UDAF.
+ */
+ public void registerUdaf(String functionName, Class<? extends BeamSqlUdaf> clazz) {
+ schema.add(functionName, AggregateFunctionImpl.create(clazz));
+ }
+
+ /**
+ * Registers a {@link BaseBeamTable} which can be used for all subsequent queries.
+ */
+ public void registerTable(String tableName, BaseBeamTable table) {
+ schema.add(tableName, new BeamCalciteTable(table.getRowType()));
+ planner.getSourceTables().put(tableName, table);
+ }
+
+ /**
+ * Finds a registered {@link BaseBeamTable} by table name, or null if none was registered.
+ */
+ public BaseBeamTable findTable(String tableName){
+ return planner.getSourceTables().get(tableName);
+ }
+
+ /**
+ * A Calcite table adapter that exposes only the row type of a Beam table for
+ * parsing/validation; it is never actually scanned, because Beam SQL executes
+ * queries on its own engine rather than through Calcite.
+ */
+ private static class BeamCalciteTable implements ScannableTable, Serializable {
+ private BeamSqlRowType beamSqlRowType;
+ public BeamCalciteTable(BeamSqlRowType beamSqlRowType) {
+ this.beamSqlRowType = beamSqlRowType;
+ }
+ @Override
+ public RelDataType getRowType(RelDataTypeFactory typeFactory) {
+ return CalciteUtils.toCalciteRowType(this.beamSqlRowType)
+ .apply(BeamQueryPlanner.TYPE_FACTORY);
+ }
+
+ @Override
+ public Enumerable<Object[]> scan(DataContext root) {
+ // not used as Beam SQL uses its own execution engine
+ return null;
+ }
+
+ /**
+ * No {@link Statistic} is provided to the optimizer; plans are built without
+ * table statistics.
+ */
+ @Override
+ public Statistic getStatistic() {
+ return Statistics.UNKNOWN;
+ }
+
+ /**
+ * All sources are treated as TABLE in Beam SQL.
+ */
+ @Override
+ public Schema.TableType getJdbcTableType() {
+ return Schema.TableType.TABLE;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java
new file mode 100644
index 0000000..4e364e1
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.example;
+
+import java.sql.Types;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.dsls.sql.BeamSql;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
+
+/**
+ * This is a quick example, which uses Beam SQL DSL to create a data pipeline.
+ *
+ * <p>Run the example with
+ * <pre>
+ * mvn -pl sdks/java/extensions/sql compile exec:java \
+ * -Dexec.mainClass=org.apache.beam.dsls.sql.example.BeamSqlExample \
+ * -Dexec.args="--runner=DirectRunner" -Pdirect-runner
+ * </pre>
+ *
+ */
+class BeamSqlExample {
+ public static void main(String[] args) throws Exception {
+ PipelineOptions options = PipelineOptionsFactory.fromArgs(args).as(PipelineOptions.class);
+ Pipeline p = Pipeline.create(options);
+
+ // Define the input row format: three columns (c1 INTEGER, c2 VARCHAR, c3 DOUBLE).
+ List<String> fieldNames = Arrays.asList("c1", "c2", "c3");
+ List<Integer> fieldTypes = Arrays.asList(Types.INTEGER, Types.VARCHAR, Types.DOUBLE);
+ BeamSqlRowType type = BeamSqlRowType.create(fieldNames, fieldTypes);
+ BeamSqlRow row = new BeamSqlRow(type);
+ row.addField(0, 1);
+ row.addField(1, "row");
+ row.addField(2, 1.0);
+
+ // Create a single-row source PCollection with Create.of().
+ PCollection<BeamSqlRow> inputTable = PBegin.in(p).apply(Create.of(row)
+ .withCoder(new BeamSqlRowCoder(type)));
+
+ // Case 1: run a simple SQL query over the input PCollection with BeamSql.simpleQuery,
+ // which requires the fixed table name PCOLLECTION.
+ PCollection<BeamSqlRow> outputStream = inputTable.apply(
+ BeamSql.simpleQuery("select c1, c2, c3 from PCOLLECTION where c1=1"));
+
+ // Print each output record of case 1.
+ outputStream.apply("log_result",
+ MapElements.<BeamSqlRow, Void>via(new SimpleFunction<BeamSqlRow, Void>() {
+ // NOTE(review): missing @Override here (present on the case-2 counterpart below).
+ public Void apply(BeamSqlRow input) {
+ System.out.println("PCOLLECTION: " + input);
+ return null;
+ }
+ }));
+
+ // Case 2: run BeamSql.query over the result PCollection of case 1, registered under
+ // the explicit table name CASE1_RESULT via a PCollectionTuple.
+ PCollection<BeamSqlRow> outputStream2 =
+ PCollectionTuple.of(new TupleTag<BeamSqlRow>("CASE1_RESULT"), outputStream)
+ .apply(BeamSql.query("select c2, c3 from CASE1_RESULT where c1=1"));
+
+ // Print each output record of case 2.
+ outputStream2.apply("log_result",
+ MapElements.<BeamSqlRow, Void>via(new SimpleFunction<BeamSqlRow, Void>() {
+ @Override
+ public Void apply(BeamSqlRow input) {
+ System.out.println("TABLE_B: " + input);
+ return null;
+ }
+ }));
+
+ p.run().waitUntilFinish();
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/example/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/example/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/example/package-info.java
new file mode 100644
index 0000000..52a9fce
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/example/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * examples on how to use BeamSQL.
+ *
+ */
+package org.apache.beam.dsls.sql.example;
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlExpressionExecutor.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlExpressionExecutor.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlExpressionExecutor.java
new file mode 100644
index 0000000..3732933
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlExpressionExecutor.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.interpreter;
+
+import java.io.Serializable;
+import java.util.List;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+
+/**
+ * {@code BeamSqlExpressionExecutor} fills the gap between relational
+ * expressions in Calcite SQL and executable code.
+ *
+ */
+public interface BeamSqlExpressionExecutor extends Serializable {
+
+ /**
+ * Invoked once before any records are processed; performs any required setup.
+ */
+ void prepare();
+
+ /**
+ * Applies the expression(s) to the input record {@link BeamSqlRow} and returns the
+ * evaluated results, one element per expression.
+ */
+ List<Object> execute(BeamSqlRow inputRow);
+
+ /**
+ * Invoked after processing completes; releases any resources acquired in {@link #prepare()}.
+ */
+ void close();
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutor.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutor.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutor.java
new file mode 100644
index 0000000..aee0e4a
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutor.java
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.interpreter;
+
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.List;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlCaseExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlCastExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlInputRefExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlReinterpretExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlUdfExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlWindowEndExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlWindowExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlWindowStartExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.arithmetic.BeamSqlDivideExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.arithmetic.BeamSqlMinusExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.arithmetic.BeamSqlModExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.arithmetic.BeamSqlMultiplyExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.arithmetic.BeamSqlPlusExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.comparison.BeamSqlEqualsExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.comparison.BeamSqlGreaterThanExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.comparison.BeamSqlGreaterThanOrEqualsExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.comparison.BeamSqlIsNotNullExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.comparison.BeamSqlIsNullExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.comparison.BeamSqlLessThanExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.comparison.BeamSqlLessThanOrEqualsExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.comparison.BeamSqlNotEqualsExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.date.BeamSqlCurrentDateExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.date.BeamSqlCurrentTimeExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.date.BeamSqlCurrentTimestampExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.date.BeamSqlDateCeilExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.date.BeamSqlDateFloorExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.date.BeamSqlExtractExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.logical.BeamSqlAndExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.logical.BeamSqlNotExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.logical.BeamSqlOrExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlAbsExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlAcosExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlAsinExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlAtan2Expression;
+import org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlAtanExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlCeilExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlCosExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlCotExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlDegreesExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlExpExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlFloorExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlLnExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlLogExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlPiExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlPowerExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlRadiansExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlRandExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlRandIntegerExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlRoundExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlSignExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlSinExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlTanExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlTruncateExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlCharLengthExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlConcatExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlInitCapExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlLowerExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlOverlayExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlPositionExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlSubstringExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlTrimExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlUpperExpression;
+import org.apache.beam.dsls.sql.rel.BeamFilterRel;
+import org.apache.beam.dsls.sql.rel.BeamProjectRel;
+import org.apache.beam.dsls.sql.rel.BeamRelNode;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.schema.impl.ScalarFunctionImpl;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.sql.validate.SqlUserDefinedFunction;
+import org.apache.calcite.util.NlsString;
+
+/**
+ * Executor based on {@link BeamSqlExpression} and {@link BeamSqlPrimitive}.
+ * {@code BeamSqlFnExecutor} converts a {@link BeamRelNode} to a {@link BeamSqlExpression},
+ * which can be evaluated against the {@link BeamSqlRow}.
+ * Only {@link BeamFilterRel} and {@link BeamProjectRel} nodes are supported.
+ */
+public class BeamSqlFnExecutor implements BeamSqlExpressionExecutor {
+  protected List<BeamSqlExpression> exps;
+
+  public BeamSqlFnExecutor(BeamRelNode relNode) {
+    this.exps = new ArrayList<>();
+    if (relNode instanceof BeamFilterRel) {
+      BeamFilterRel filterNode = (BeamFilterRel) relNode;
+      RexNode condition = filterNode.getCondition();
+      exps.add(buildExpression(condition));
+    } else if (relNode instanceof BeamProjectRel) {
+      BeamProjectRel projectNode = (BeamProjectRel) relNode;
+      List<RexNode> projects = projectNode.getProjects();
+      for (RexNode rexNode : projects) {
+        exps.add(buildExpression(rexNode));
+      }
+    } else {
+      throw new UnsupportedOperationException(
+          String.format("%s is not supported yet!", relNode.getClass().toString()));
+    }
+  }
+
+  /**
+   * Recursively visits the operands of a {@link RexNode} and represents each
+   * {@link SqlOperator} with a corresponding {@link BeamSqlExpression}.
+   */
+  static BeamSqlExpression buildExpression(RexNode rexNode) {
+    BeamSqlExpression ret = null;
+    if (rexNode instanceof RexLiteral) {
+      RexLiteral node = (RexLiteral) rexNode;
+      SqlTypeName type = node.getTypeName();
+      Object value = node.getValue();
+
+      if (SqlTypeName.CHAR_TYPES.contains(type)
+          && node.getValue() instanceof NlsString) {
+        // NlsString is not serializable; convert it to a plain
+        // String explicitly before wrapping it in a primitive.
+        return BeamSqlPrimitive.of(type, ((NlsString) value).getValue());
+      } else if (type == SqlTypeName.DATE && value instanceof Calendar) {
+        // Calcite uses java.util.Calendar as the Java type of a DATE
+        // literal; convert to java.util.Date for serializability.
+        return BeamSqlPrimitive.of(type, ((Calendar) value).getTime());
+      } else {
+        // node.getType().getSqlTypeName() and node.getSqlTypeName() can be different
+        // e.g. sql: "select 1"
+        // here the literal 1 will be parsed as a RexLiteral where:
+        //   node.getType().getSqlTypeName() = INTEGER (the display type)
+        //   node.getSqlTypeName() = DECIMAL (the actual internal storage format)
+        // So we need to convert here.
+        // check RexBuilder#makeLiteral for more information.
+        SqlTypeName realType = node.getType().getSqlTypeName();
+        Object realValue = value;
+        if (type == SqlTypeName.DECIMAL) {
+          BigDecimal rawValue = (BigDecimal) value;
+          switch (realType) {
+            case TINYINT:
+              realValue = (byte) rawValue.intValue();
+              break;
+            case SMALLINT:
+              realValue = (short) rawValue.intValue();
+              break;
+            case INTEGER:
+              realValue = rawValue.intValue();
+              break;
+            case BIGINT:
+              realValue = rawValue.longValue();
+              break;
+            case DECIMAL:
+              realValue = rawValue;
+              break;
+            default:
+              throw new IllegalStateException("type/realType mismatch: "
+                  + type + " VS " + realType);
+          }
+        } else if (type == SqlTypeName.DOUBLE) {
+          Double rawValue = (Double) value;
+          if (realType == SqlTypeName.FLOAT) {
+            realValue = rawValue.floatValue();
+          }
+        }
+        return BeamSqlPrimitive.of(realType, realValue);
+      }
+    } else if (rexNode instanceof RexInputRef) {
+      RexInputRef node = (RexInputRef) rexNode;
+      ret = new BeamSqlInputRefExpression(node.getType().getSqlTypeName(), node.getIndex());
+    } else if (rexNode instanceof RexCall) {
+      RexCall node = (RexCall) rexNode;
+      String opName = node.op.getName();
+      List<BeamSqlExpression> subExps = new ArrayList<>();
+      for (RexNode subNode : node.getOperands()) {
+        subExps.add(buildExpression(subNode));
+      }
+      switch (opName) {
+        // logical operators
+      case "AND":
+        ret = new BeamSqlAndExpression(subExps);
+        break;
+      case "OR":
+        ret = new BeamSqlOrExpression(subExps);
+        break;
+      case "NOT":
+        ret = new BeamSqlNotExpression(subExps);
+        break;
+      case "=":
+        ret = new BeamSqlEqualsExpression(subExps);
+        break;
+      case "<>":
+        ret = new BeamSqlNotEqualsExpression(subExps);
+        break;
+      case ">":
+        ret = new BeamSqlGreaterThanExpression(subExps);
+        break;
+      case ">=":
+        ret = new BeamSqlGreaterThanOrEqualsExpression(subExps);
+        break;
+      case "<":
+        ret = new BeamSqlLessThanExpression(subExps);
+        break;
+      case "<=":
+        ret = new BeamSqlLessThanOrEqualsExpression(subExps);
+        break;
+
+        // arithmetic operators
+      case "+":
+        ret = new BeamSqlPlusExpression(subExps);
+        break;
+      case "-":
+        ret = new BeamSqlMinusExpression(subExps);
+        break;
+      case "*":
+        ret = new BeamSqlMultiplyExpression(subExps);
+        break;
+      case "/":
+      case "/INT":
+        ret = new BeamSqlDivideExpression(subExps);
+        break;
+      case "MOD":
+        ret = new BeamSqlModExpression(subExps);
+        break;
+
+      case "ABS":
+        ret = new BeamSqlAbsExpression(subExps);
+        break;
+      case "ROUND":
+        ret = new BeamSqlRoundExpression(subExps);
+        break;
+      case "LN":
+        ret = new BeamSqlLnExpression(subExps);
+        break;
+      case "LOG10":
+        ret = new BeamSqlLogExpression(subExps);
+        break;
+      case "EXP":
+        ret = new BeamSqlExpExpression(subExps);
+        break;
+      case "ACOS":
+        ret = new BeamSqlAcosExpression(subExps);
+        break;
+      case "ASIN":
+        ret = new BeamSqlAsinExpression(subExps);
+        break;
+      case "ATAN":
+        ret = new BeamSqlAtanExpression(subExps);
+        break;
+      case "COT":
+        ret = new BeamSqlCotExpression(subExps);
+        break;
+      case "DEGREES":
+        ret = new BeamSqlDegreesExpression(subExps);
+        break;
+      case "RADIANS":
+        ret = new BeamSqlRadiansExpression(subExps);
+        break;
+      case "COS":
+        ret = new BeamSqlCosExpression(subExps);
+        break;
+      case "SIN":
+        ret = new BeamSqlSinExpression(subExps);
+        break;
+      case "TAN":
+        ret = new BeamSqlTanExpression(subExps);
+        break;
+      case "SIGN":
+        ret = new BeamSqlSignExpression(subExps);
+        break;
+      case "POWER":
+        ret = new BeamSqlPowerExpression(subExps);
+        break;
+      case "PI":
+        ret = new BeamSqlPiExpression();
+        break;
+      case "ATAN2":
+        ret = new BeamSqlAtan2Expression(subExps);
+        break;
+      case "TRUNCATE":
+        ret = new BeamSqlTruncateExpression(subExps);
+        break;
+      case "RAND":
+        ret = new BeamSqlRandExpression(subExps);
+        break;
+      case "RAND_INTEGER":
+        ret = new BeamSqlRandIntegerExpression(subExps);
+        break;
+
+        // string operators
+      case "||":
+        ret = new BeamSqlConcatExpression(subExps);
+        break;
+      case "POSITION":
+        ret = new BeamSqlPositionExpression(subExps);
+        break;
+      case "CHAR_LENGTH":
+      case "CHARACTER_LENGTH":
+        ret = new BeamSqlCharLengthExpression(subExps);
+        break;
+      case "UPPER":
+        ret = new BeamSqlUpperExpression(subExps);
+        break;
+      case "LOWER":
+        ret = new BeamSqlLowerExpression(subExps);
+        break;
+      case "TRIM":
+        ret = new BeamSqlTrimExpression(subExps);
+        break;
+      case "SUBSTRING":
+        ret = new BeamSqlSubstringExpression(subExps);
+        break;
+      case "OVERLAY":
+        ret = new BeamSqlOverlayExpression(subExps);
+        break;
+      case "INITCAP":
+        ret = new BeamSqlInitCapExpression(subExps);
+        break;
+
+        // date functions
+      case "Reinterpret":
+        return new BeamSqlReinterpretExpression(subExps, node.type.getSqlTypeName());
+      case "CEIL":
+        if (SqlTypeName.NUMERIC_TYPES.contains(node.type.getSqlTypeName())) {
+          return new BeamSqlCeilExpression(subExps);
+        } else {
+          return new BeamSqlDateCeilExpression(subExps);
+        }
+      case "FLOOR":
+        if (SqlTypeName.NUMERIC_TYPES.contains(node.type.getSqlTypeName())) {
+          return new BeamSqlFloorExpression(subExps);
+        } else {
+          return new BeamSqlDateFloorExpression(subExps);
+        }
+      case "EXTRACT_DATE":
+      case "EXTRACT":
+        return new BeamSqlExtractExpression(subExps);
+
+      case "LOCALTIME":
+      case "CURRENT_TIME":
+        return new BeamSqlCurrentTimeExpression(subExps);
+
+      case "CURRENT_TIMESTAMP":
+      case "LOCALTIMESTAMP":
+        return new BeamSqlCurrentTimestampExpression(subExps);
+
+      case "CURRENT_DATE":
+        return new BeamSqlCurrentDateExpression();
+
+
+      case "CASE":
+        ret = new BeamSqlCaseExpression(subExps);
+        break;
+      case "CAST":
+        ret = new BeamSqlCastExpression(subExps, node.type.getSqlTypeName());
+        break;
+
+      case "IS NULL":
+        ret = new BeamSqlIsNullExpression(subExps.get(0));
+        break;
+      case "IS NOT NULL":
+        ret = new BeamSqlIsNotNullExpression(subExps.get(0));
+        break;
+
+      case "HOP":
+      case "TUMBLE":
+      case "SESSION":
+        ret = new BeamSqlWindowExpression(subExps, node.type.getSqlTypeName());
+        break;
+      case "HOP_START":
+      case "TUMBLE_START":
+      case "SESSION_START":
+        ret = new BeamSqlWindowStartExpression();
+        break;
+      case "HOP_END":
+      case "TUMBLE_END":
+      case "SESSION_END":
+        ret = new BeamSqlWindowEndExpression();
+        break;
+      default:
+        // handle user-defined functions (UDF)
+        if (((RexCall) rexNode).getOperator() instanceof SqlUserDefinedFunction) {
+          SqlUserDefinedFunction udf = (SqlUserDefinedFunction) ((RexCall) rexNode).getOperator();
+          ScalarFunctionImpl fn = (ScalarFunctionImpl) udf.getFunction();
+          ret = new BeamSqlUdfExpression(fn.method, subExps,
+            ((RexCall) rexNode).type.getSqlTypeName());
+        } else {
+          throw new UnsupportedOperationException("Operator: " + opName + " is not supported yet!");
+        }
+      }
+    } else {
+      throw new UnsupportedOperationException(
+          String.format("%s is not supported yet!", rexNode.getClass().toString()));
+    }
+
+    if (ret != null && !ret.accept()) {
+      throw new IllegalStateException(ret.getClass().getSimpleName()
+          + " does not accept the operands.(" + rexNode + ")");
+    }
+
+    return ret;
+  }
+
+  @Override
+  public void prepare() {
+  }
+
+  @Override
+  public List<Object> execute(BeamSqlRow inputRow) {
+    List<Object> results = new ArrayList<>();
+    for (BeamSqlExpression exp : exps) {
+      results.add(exp.evaluate(inputRow).getValue());
+    }
+    return results;
+  }
+
+  @Override
+  public void close() {
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCaseExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCaseExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCaseExpression.java
new file mode 100644
index 0000000..a30916b
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCaseExpression.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator;
+
+import java.util.List;
+
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * Represents SQL CASE (also NULLIF/COALESCE): (when, then) pairs plus a trailing else.
+ */
+public class BeamSqlCaseExpression extends BeamSqlExpression {
+  public BeamSqlCaseExpression(List<BeamSqlExpression> operands) {
+    // the output type of CASE is the type of the trailing `else` operand
+    super(operands, operands.get(operands.size() - 1).getOutputType());
+  }
+
+  @Override public boolean accept() {
+    // operand count must be odd: (when, then) pairs plus the final else
+    if (operands.size() % 2 != 1) {
+      return false;
+    }
+
+    for (int i = 0; i < operands.size() - 1; i += 2) {
+      if (opType(i) != SqlTypeName.BOOLEAN) {
+        return false;
+      } else if (opType(i + 1) != outputType) {
+        return false;
+      }
+    }
+
+    return true;
+  }
+
+  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+    for (int i = 0; i < operands.size() - 1; i += 2) {
+      if (opValueEvaluated(i, inputRow)) {
+        return BeamSqlPrimitive.of(
+            outputType,
+            opValueEvaluated(i + 1, inputRow)
+        );
+      }
+    }
+    return BeamSqlPrimitive.of(outputType,
+        opValueEvaluated(operands.size() - 1, inputRow));
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCastExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCastExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCastExpression.java
new file mode 100644
index 0000000..524d1df
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCastExpression.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator;
+
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.util.List;
+
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.calcite.runtime.SqlFunctions;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+import org.joda.time.format.DateTimeFormatterBuilder;
+import org.joda.time.format.DateTimeParser;
+
+/**
+ * Implements 'CAST' to the {@link SqlTypeName}s handled below; others throw.
+ */
+public class BeamSqlCastExpression extends BeamSqlExpression {
+
+  private static final int index = 0; // CAST takes exactly one operand
+  private static final String outputTimestampFormat = "yyyy-MM-dd HH:mm:ss";
+  private static final String outputDateFormat = "yyyy-MM-dd";
+  /**
+   * Input layouts accepted when parsing {@link SqlTypeName#DATE} and
+   * {@link SqlTypeName#TIMESTAMP} strings; two-digit years pivot around 2020.
+   */
+  private static final DateTimeFormatter dateTimeFormatter = new DateTimeFormatterBuilder()
+      .append(null/*printer*/, new DateTimeParser[] {
+          // date formats
+          DateTimeFormat.forPattern("yy-MM-dd").getParser(),
+          DateTimeFormat.forPattern("yy/MM/dd").getParser(),
+          DateTimeFormat.forPattern("yy.MM.dd").getParser(),
+          DateTimeFormat.forPattern("yyMMdd").getParser(),
+          DateTimeFormat.forPattern("yyyyMMdd").getParser(),
+          DateTimeFormat.forPattern("yyyy-MM-dd").getParser(),
+          DateTimeFormat.forPattern("yyyy/MM/dd").getParser(),
+          DateTimeFormat.forPattern("yyyy.MM.dd").getParser(),
+          // datetime formats
+          DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss").getParser(),
+          DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ssz").getParser(),
+          DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss z").getParser(),
+          DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSSSSSSSS").getParser(),
+          DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSSSSSSSSz").getParser(),
+          DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSSSSSSSS z").getParser() }).toFormatter()
+      .withPivotYear(2020);
+
+  public BeamSqlCastExpression(List<BeamSqlExpression> operands, SqlTypeName castType) {
+    super(operands, castType);
+  }
+
+  @Override
+  public boolean accept() {
+    return numberOfOperands() == 1;
+  }
+
+  @Override
+  public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+    SqlTypeName castOutputType = getOutputType();
+    switch (castOutputType) {
+      case INTEGER:
+        return BeamSqlPrimitive
+            .of(SqlTypeName.INTEGER, SqlFunctions.toInt(opValueEvaluated(index, inputRow)));
+      case DOUBLE:
+        return BeamSqlPrimitive
+            .of(SqlTypeName.DOUBLE, SqlFunctions.toDouble(opValueEvaluated(index, inputRow)));
+      case SMALLINT:
+        return BeamSqlPrimitive
+            .of(SqlTypeName.SMALLINT, SqlFunctions.toShort(opValueEvaluated(index, inputRow)));
+      case TINYINT:
+        return BeamSqlPrimitive
+            .of(SqlTypeName.TINYINT, SqlFunctions.toByte(opValueEvaluated(index, inputRow)));
+      case BIGINT:
+        return BeamSqlPrimitive
+            .of(SqlTypeName.BIGINT, SqlFunctions.toLong(opValueEvaluated(index, inputRow)));
+      case DECIMAL:
+        return BeamSqlPrimitive.of(SqlTypeName.DECIMAL,
+            SqlFunctions.toBigDecimal(opValueEvaluated(index, inputRow)));
+      case FLOAT:
+        return BeamSqlPrimitive
+            .of(SqlTypeName.FLOAT, SqlFunctions.toFloat(opValueEvaluated(index, inputRow)));
+      case CHAR:
+      case VARCHAR:
+        return BeamSqlPrimitive
+            .of(SqlTypeName.VARCHAR, opValueEvaluated(index, inputRow).toString());
+      case DATE:
+        return BeamSqlPrimitive
+            .of(SqlTypeName.DATE, toDate(opValueEvaluated(index, inputRow), outputDateFormat));
+      case TIMESTAMP:
+        return BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP,
+            toTimeStamp(opValueEvaluated(index, inputRow), outputTimestampFormat));
+    }
+    throw new UnsupportedOperationException(
+        String.format("Cast to type %s not supported", castOutputType));
+  }
+
+  private Date toDate(Object inputDate, String outputFormat) {
+    try {
+      return Date
+          .valueOf(dateTimeFormatter.parseLocalDate(inputDate.toString()).toString(outputFormat));
+    } catch (IllegalArgumentException | UnsupportedOperationException e) {
+      throw new UnsupportedOperationException("Can't be cast to type 'Date'");
+    }
+  }
+
+  private Timestamp toTimeStamp(Object inputTimestamp, String outputFormat) {
+    try {
+      return Timestamp.valueOf(
+          dateTimeFormatter.parseDateTime(inputTimestamp.toString()).secondOfMinute()
+              .roundCeilingCopy().toString(outputFormat));
+    } catch (IllegalArgumentException | UnsupportedOperationException e) {
+      throw new UnsupportedOperationException("Can't be cast to type 'Timestamp'");
+    }
+  }
+}