Posted to commits@beam.apache.org by ta...@apache.org on 2017/08/02 05:08:34 UTC

[11/59] beam git commit: move dsls/sql to sdks/java/extensions/sql

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamAggregationTransformTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamAggregationTransformTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamAggregationTransformTest.java
deleted file mode 100644
index 5d5d4fc..0000000
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamAggregationTransformTest.java
+++ /dev/null
@@ -1,453 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.dsls.sql.schema.transform;
-
-import java.text.ParseException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import org.apache.beam.dsls.sql.planner.BeamQueryPlanner;
-import org.apache.beam.dsls.sql.schema.BeamSqlRow;
-import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
-import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
-import org.apache.beam.dsls.sql.transform.BeamAggregationTransforms;
-import org.apache.beam.dsls.sql.utils.CalciteUtils;
-import org.apache.beam.sdk.coders.IterableCoder;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.WithKeys;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.calcite.rel.core.AggregateCall;
-import org.apache.calcite.rel.type.RelDataTypeFactory.FieldInfoBuilder;
-import org.apache.calcite.rel.type.RelDataTypeSystem;
-import org.apache.calcite.sql.SqlKind;
-import org.apache.calcite.sql.fun.SqlAvgAggFunction;
-import org.apache.calcite.sql.fun.SqlCountAggFunction;
-import org.apache.calcite.sql.fun.SqlMinMaxAggFunction;
-import org.apache.calcite.sql.fun.SqlSumAggFunction;
-import org.apache.calcite.sql.type.BasicSqlType;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.apache.calcite.util.ImmutableBitSet;
-import org.junit.Rule;
-import org.junit.Test;
-
-/**
- * Unit tests for {@link BeamAggregationTransforms}.
- *
- */
-public class BeamAggregationTransformTest extends BeamTransformBaseTest {
-
-  @Rule
-  public TestPipeline p = TestPipeline.create();
-
-  private List<AggregateCall> aggCalls;
-
-  private BeamSqlRowType keyType;
-  private BeamSqlRowType aggPartType;
-  private BeamSqlRowType outputType;
-
-  private BeamSqlRowCoder inRecordCoder;
-  private BeamSqlRowCoder keyCoder;
-  private BeamSqlRowCoder aggCoder;
-  private BeamSqlRowCoder outRecordCoder;
-
-  /**
-   * This step is equivalent to the query below.
-   * <pre>
-   * SELECT `f_int`
-   * , COUNT(*) AS `size`
-   * , SUM(`f_long`) AS `sum1`, AVG(`f_long`) AS `avg1`
-   * , MAX(`f_long`) AS `max1`, MIN(`f_long`) AS `min1`
-   * , SUM(`f_short`) AS `sum2`, AVG(`f_short`) AS `avg2`
-   * , MAX(`f_short`) AS `max2`, MIN(`f_short`) AS `min2`
-   * , SUM(`f_byte`) AS `sum3`, AVG(`f_byte`) AS `avg3`
-   * , MAX(`f_byte`) AS `max3`, MIN(`f_byte`) AS `min3`
-   * , SUM(`f_float`) AS `sum4`, AVG(`f_float`) AS `avg4`
-   * , MAX(`f_float`) AS `max4`, MIN(`f_float`) AS `min4`
-   * , SUM(`f_double`) AS `sum5`, AVG(`f_double`) AS `avg5`
-   * , MAX(`f_double`) AS `max5`, MIN(`f_double`) AS `min5`
-   * , MAX(`f_timestamp`) AS `max7`, MIN(`f_timestamp`) AS `min7`
-   * , SUM(`f_int2`) AS `sum8`, AVG(`f_int2`) AS `avg8`
-   * , MAX(`f_int2`) AS `max8`, MIN(`f_int2`) AS `min8`
-   * FROM TABLE_NAME
-   * GROUP BY `f_int`
-   * </pre>
-   * @throws ParseException
-   */
-  @Test
-  public void testCountPerElementBasic() throws ParseException {
-    setupEnvironment();
-
-    PCollection<BeamSqlRow> input = p.apply(Create.of(inputRows));
-
-    //1. extract fields in group-by key part
-    PCollection<KV<BeamSqlRow, BeamSqlRow>> exGroupByStream = input.apply("exGroupBy",
-        WithKeys
-            .of(new BeamAggregationTransforms.AggregationGroupByKeyFn(-1, ImmutableBitSet.of(0))))
-        .setCoder(KvCoder.<BeamSqlRow, BeamSqlRow>of(keyCoder, inRecordCoder));
-
-    //2. apply a GroupByKey.
-    PCollection<KV<BeamSqlRow, Iterable<BeamSqlRow>>> groupedStream = exGroupByStream
-        .apply("groupBy", GroupByKey.<BeamSqlRow, BeamSqlRow>create())
-        .setCoder(KvCoder.<BeamSqlRow, Iterable<BeamSqlRow>>of(keyCoder,
-            IterableCoder.<BeamSqlRow>of(inRecordCoder)));
-
-    //3. run aggregation functions
-    PCollection<KV<BeamSqlRow, BeamSqlRow>> aggregatedStream = groupedStream.apply("aggregation",
-        Combine.<BeamSqlRow, BeamSqlRow, BeamSqlRow>groupedValues(
-            new BeamAggregationTransforms.AggregationAdaptor(aggCalls, inputRowType)))
-        .setCoder(KvCoder.<BeamSqlRow, BeamSqlRow>of(keyCoder, aggCoder));
-
-    //4. flat KV to a single record
-    PCollection<BeamSqlRow> mergedStream = aggregatedStream.apply("mergeRecord",
-        ParDo.of(new BeamAggregationTransforms.MergeAggregationRecord(outputType, aggCalls, -1)));
-    mergedStream.setCoder(outRecordCoder);
-
-    //assert function BeamAggregationTransform.AggregationGroupByKeyFn
-    PAssert.that(exGroupByStream).containsInAnyOrder(prepareResultOfAggregationGroupByKeyFn());
-
-    //assert BeamAggregationTransform.AggregationCombineFn
-    PAssert.that(aggregatedStream).containsInAnyOrder(prepareResultOfAggregationCombineFn());
-
-    //assert BeamAggregationTransform.MergeAggregationRecord
-    PAssert.that(mergedStream).containsInAnyOrder(prepareResultOfMergeAggregationRecord());
-
-    p.run();
-  }
-
-  private void setupEnvironment() {
-    prepareAggregationCalls();
-    prepareTypeAndCoder();
-  }
-
-  /**
-   * Creates the list of all {@link AggregateCall}s.
-   */
-  @SuppressWarnings("deprecation")
-  private void prepareAggregationCalls() {
-    //aggregations for all data types
-    aggCalls = new ArrayList<>();
-    aggCalls.add(
-        new AggregateCall(new SqlCountAggFunction(), false,
-            Arrays.<Integer>asList(),
-            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.BIGINT),
-            "count")
-        );
-    aggCalls.add(
-        new AggregateCall(new SqlSumAggFunction(
-            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.BIGINT)), false,
-            Arrays.<Integer>asList(1),
-            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.BIGINT),
-            "sum1")
-        );
-    aggCalls.add(
-        new AggregateCall(new SqlAvgAggFunction(SqlKind.AVG), false,
-            Arrays.<Integer>asList(1),
-            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.BIGINT),
-            "avg1")
-        );
-    aggCalls.add(
-        new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MAX), false,
-            Arrays.<Integer>asList(1),
-            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.BIGINT),
-            "max1")
-        );
-    aggCalls.add(
-        new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MIN), false,
-            Arrays.<Integer>asList(1),
-            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.BIGINT),
-            "min1")
-        );
-
-    aggCalls.add(
-        new AggregateCall(new SqlSumAggFunction(
-            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.SMALLINT)), false,
-            Arrays.<Integer>asList(2),
-            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.SMALLINT),
-            "sum2")
-        );
-    aggCalls.add(
-        new AggregateCall(new SqlAvgAggFunction(SqlKind.AVG), false,
-            Arrays.<Integer>asList(2),
-            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.SMALLINT),
-            "avg2")
-        );
-    aggCalls.add(
-        new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MAX), false,
-            Arrays.<Integer>asList(2),
-            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.SMALLINT),
-            "max2")
-        );
-    aggCalls.add(
-        new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MIN), false,
-            Arrays.<Integer>asList(2),
-            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.SMALLINT),
-            "min2")
-        );
-
-    aggCalls.add(
-        new AggregateCall(
-            new SqlSumAggFunction(new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.TINYINT)),
-            false,
-            Arrays.<Integer>asList(3),
-            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.TINYINT),
-            "sum3")
-        );
-    aggCalls.add(
-        new AggregateCall(new SqlAvgAggFunction(SqlKind.AVG), false,
-            Arrays.<Integer>asList(3),
-            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.TINYINT),
-            "avg3")
-        );
-    aggCalls.add(
-        new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MAX), false,
-            Arrays.<Integer>asList(3),
-            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.TINYINT),
-            "max3")
-        );
-    aggCalls.add(
-        new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MIN), false,
-            Arrays.<Integer>asList(3),
-            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.TINYINT),
-            "min3")
-        );
-
-    aggCalls.add(
-        new AggregateCall(
-            new SqlSumAggFunction(new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.FLOAT)),
-            false,
-            Arrays.<Integer>asList(4),
-            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.FLOAT),
-            "sum4")
-        );
-    aggCalls.add(
-        new AggregateCall(new SqlAvgAggFunction(SqlKind.AVG), false,
-            Arrays.<Integer>asList(4),
-            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.FLOAT),
-            "avg4")
-        );
-    aggCalls.add(
-        new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MAX), false,
-            Arrays.<Integer>asList(4),
-            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.FLOAT),
-            "max4")
-        );
-    aggCalls.add(
-        new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MIN), false,
-            Arrays.<Integer>asList(4),
-            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.FLOAT),
-            "min4")
-        );
-
-    aggCalls.add(
-        new AggregateCall(
-            new SqlSumAggFunction(new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.DOUBLE)),
-            false,
-            Arrays.<Integer>asList(5),
-            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.DOUBLE),
-            "sum5")
-        );
-    aggCalls.add(
-        new AggregateCall(new SqlAvgAggFunction(SqlKind.AVG), false,
-            Arrays.<Integer>asList(5),
-            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.DOUBLE),
-            "avg5")
-        );
-    aggCalls.add(
-        new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MAX), false,
-            Arrays.<Integer>asList(5),
-            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.DOUBLE),
-            "max5")
-        );
-    aggCalls.add(
-        new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MIN), false,
-            Arrays.<Integer>asList(5),
-            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.DOUBLE),
-            "min5")
-        );
-
-    aggCalls.add(
-        new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MAX), false,
-            Arrays.<Integer>asList(7),
-            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.TIMESTAMP),
-            "max7")
-        );
-    aggCalls.add(
-        new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MIN), false,
-            Arrays.<Integer>asList(7),
-            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.TIMESTAMP),
-            "min7")
-        );
-
-    aggCalls.add(
-        new AggregateCall(
-            new SqlSumAggFunction(new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.INTEGER)),
-            false,
-            Arrays.<Integer>asList(8),
-            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.INTEGER),
-            "sum8")
-        );
-    aggCalls.add(
-        new AggregateCall(new SqlAvgAggFunction(SqlKind.AVG), false,
-            Arrays.<Integer>asList(8),
-            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.INTEGER),
-            "avg8")
-        );
-    aggCalls.add(
-        new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MAX), false,
-            Arrays.<Integer>asList(8),
-            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.INTEGER),
-            "max8")
-        );
-    aggCalls.add(
-        new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MIN), false,
-            Arrays.<Integer>asList(8),
-            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.INTEGER),
-            "min8")
-        );
-  }
-
-  /**
-   * Coders used in aggregation steps.
-   */
-  private void prepareTypeAndCoder() {
-    inRecordCoder = new BeamSqlRowCoder(inputRowType);
-
-    keyType = initTypeOfSqlRow(Arrays.asList(KV.of("f_int", SqlTypeName.INTEGER)));
-    keyCoder = new BeamSqlRowCoder(keyType);
-
-    aggPartType = initTypeOfSqlRow(
-        Arrays.asList(KV.of("count", SqlTypeName.BIGINT),
-
-            KV.of("sum1", SqlTypeName.BIGINT), KV.of("avg1", SqlTypeName.BIGINT),
-            KV.of("max1", SqlTypeName.BIGINT), KV.of("min1", SqlTypeName.BIGINT),
-
-            KV.of("sum2", SqlTypeName.SMALLINT), KV.of("avg2", SqlTypeName.SMALLINT),
-            KV.of("max2", SqlTypeName.SMALLINT), KV.of("min2", SqlTypeName.SMALLINT),
-
-            KV.of("sum3", SqlTypeName.TINYINT), KV.of("avg3", SqlTypeName.TINYINT),
-            KV.of("max3", SqlTypeName.TINYINT), KV.of("min3", SqlTypeName.TINYINT),
-
-            KV.of("sum4", SqlTypeName.FLOAT), KV.of("avg4", SqlTypeName.FLOAT),
-            KV.of("max4", SqlTypeName.FLOAT), KV.of("min4", SqlTypeName.FLOAT),
-
-            KV.of("sum5", SqlTypeName.DOUBLE), KV.of("avg5", SqlTypeName.DOUBLE),
-            KV.of("max5", SqlTypeName.DOUBLE), KV.of("min5", SqlTypeName.DOUBLE),
-
-            KV.of("max7", SqlTypeName.TIMESTAMP), KV.of("min7", SqlTypeName.TIMESTAMP),
-
-            KV.of("sum8", SqlTypeName.INTEGER), KV.of("avg8", SqlTypeName.INTEGER),
-            KV.of("max8", SqlTypeName.INTEGER), KV.of("min8", SqlTypeName.INTEGER)
-            ));
-    aggCoder = new BeamSqlRowCoder(aggPartType);
-
-    outputType = prepareFinalRowType();
-    outRecordCoder = new BeamSqlRowCoder(outputType);
-  }
-
-  /**
-   * Expected results after {@link BeamAggregationTransforms.AggregationGroupByKeyFn}.
-   */
-  private List<KV<BeamSqlRow, BeamSqlRow>> prepareResultOfAggregationGroupByKeyFn() {
-    return Arrays.asList(
-        KV.of(new BeamSqlRow(keyType, Arrays.<Object>asList(inputRows.get(0).getInteger(0))),
-            inputRows.get(0)),
-        KV.of(new BeamSqlRow(keyType, Arrays.<Object>asList(inputRows.get(1).getInteger(0))),
-            inputRows.get(1)),
-        KV.of(new BeamSqlRow(keyType, Arrays.<Object>asList(inputRows.get(2).getInteger(0))),
-            inputRows.get(2)),
-        KV.of(new BeamSqlRow(keyType, Arrays.<Object>asList(inputRows.get(3).getInteger(0))),
-            inputRows.get(3)));
-  }
-
-  /**
-   * Expected results after {@link BeamAggregationTransforms.AggregationCombineFn}.
-   */
-  private List<KV<BeamSqlRow, BeamSqlRow>> prepareResultOfAggregationCombineFn()
-      throws ParseException {
-    return Arrays.asList(
-            KV.of(new BeamSqlRow(keyType, Arrays.<Object>asList(inputRows.get(0).getInteger(0))),
-                new BeamSqlRow(aggPartType, Arrays.<Object>asList(
-                    4L,
-                    10000L, 2500L, 4000L, 1000L,
-                    (short) 10, (short) 2, (short) 4, (short) 1,
-                    (byte) 10, (byte) 2, (byte) 4, (byte) 1,
-                    10.0F, 2.5F, 4.0F, 1.0F,
-                    10.0, 2.5, 4.0, 1.0,
-                    format.parse("2017-01-01 02:04:03"), format.parse("2017-01-01 01:01:03"),
-                    10, 2, 4, 1
-                    )))
-            );
-  }
-
-  /**
-   * Row type of the final output row.
-   */
-  private BeamSqlRowType prepareFinalRowType() {
-    FieldInfoBuilder builder = BeamQueryPlanner.TYPE_FACTORY.builder();
-    List<KV<String, SqlTypeName>> columnMetadata =
-        Arrays.asList(KV.of("f_int", SqlTypeName.INTEGER), KV.of("count", SqlTypeName.BIGINT),
-
-        KV.of("sum1", SqlTypeName.BIGINT), KV.of("avg1", SqlTypeName.BIGINT),
-        KV.of("max1", SqlTypeName.BIGINT), KV.of("min1", SqlTypeName.BIGINT),
-
-        KV.of("sum2", SqlTypeName.SMALLINT), KV.of("avg2", SqlTypeName.SMALLINT),
-        KV.of("max2", SqlTypeName.SMALLINT), KV.of("min2", SqlTypeName.SMALLINT),
-
-        KV.of("sum3", SqlTypeName.TINYINT), KV.of("avg3", SqlTypeName.TINYINT),
-        KV.of("max3", SqlTypeName.TINYINT), KV.of("min3", SqlTypeName.TINYINT),
-
-        KV.of("sum4", SqlTypeName.FLOAT), KV.of("avg4", SqlTypeName.FLOAT),
-        KV.of("max4", SqlTypeName.FLOAT), KV.of("min4", SqlTypeName.FLOAT),
-
-        KV.of("sum5", SqlTypeName.DOUBLE), KV.of("avg5", SqlTypeName.DOUBLE),
-        KV.of("max5", SqlTypeName.DOUBLE), KV.of("min5", SqlTypeName.DOUBLE),
-
-        KV.of("max7", SqlTypeName.TIMESTAMP), KV.of("min7", SqlTypeName.TIMESTAMP),
-
-        KV.of("sum8", SqlTypeName.INTEGER), KV.of("avg8", SqlTypeName.INTEGER),
-        KV.of("max8", SqlTypeName.INTEGER), KV.of("min8", SqlTypeName.INTEGER)
-        );
-    for (KV<String, SqlTypeName> cm : columnMetadata) {
-      builder.add(cm.getKey(), cm.getValue());
-    }
-    return CalciteUtils.toBeamRowType(builder.build());
-  }
-
-  /**
-   * Expected results after {@link BeamAggregationTransforms.MergeAggregationRecord}.
-   */
-  private BeamSqlRow prepareResultOfMergeAggregationRecord() throws ParseException {
-    return new BeamSqlRow(outputType, Arrays.<Object>asList(
-        1, 4L,
-        10000L, 2500L, 4000L, 1000L,
-        (short) 10, (short) 2, (short) 4, (short) 1,
-        (byte) 10, (byte) 2, (byte) 4, (byte) 1,
-        10.0F, 2.5F, 4.0F, 1.0F,
-        10.0, 2.5, 4.0, 1.0,
-        format.parse("2017-01-01 02:04:03"), format.parse("2017-01-01 01:01:03"),
-        10, 2, 4, 1
-        ));
-  }
-}
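
For orientation: the four hand-wired stages in the test above (extract keys,
GroupByKey, Combine.groupedValues, merge) are what the planner generates for
the GROUP BY query quoted in the test's javadoc. A sketch of the same
aggregation expressed through the DSL introduced by this commit, assuming an
input PCollection<BeamSqlRow> named inputTable with the schema from
BeamTransformBaseTest:

    PCollection<BeamSqlRow> aggregated = inputTable.apply(
        BeamSql.simpleQuery(
            "select f_int, COUNT(*) as size, SUM(f_long) as sum1, AVG(f_long) as avg1 "
                + "from PCOLLECTION group by f_int"));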

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamTransformBaseTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamTransformBaseTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamTransformBaseTest.java
deleted file mode 100644
index 4045bc8..0000000
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamTransformBaseTest.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.dsls.sql.schema.transform;
-
-import java.text.DateFormat;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.Arrays;
-import java.util.List;
-import org.apache.beam.dsls.sql.planner.BeamQueryPlanner;
-import org.apache.beam.dsls.sql.schema.BeamSqlRow;
-import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
-import org.apache.beam.dsls.sql.utils.CalciteUtils;
-import org.apache.beam.sdk.values.KV;
-import org.apache.calcite.rel.type.RelDataTypeFactory.FieldInfoBuilder;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.junit.BeforeClass;
-
-/**
- * Shared methods to test PTransforms which execute Beam SQL steps.
- *
- */
-public class BeamTransformBaseTest {
-  public static DateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
-
-  public static BeamSqlRowType inputRowType;
-  public static List<BeamSqlRow> inputRows;
-
-  @BeforeClass
-  public static void prepareInput() throws NumberFormatException, ParseException {
-    List<KV<String, SqlTypeName>> columnMetadata = Arrays.asList(
-        KV.of("f_int", SqlTypeName.INTEGER), KV.of("f_long", SqlTypeName.BIGINT),
-        KV.of("f_short", SqlTypeName.SMALLINT), KV.of("f_byte", SqlTypeName.TINYINT),
-        KV.of("f_float", SqlTypeName.FLOAT), KV.of("f_double", SqlTypeName.DOUBLE),
-        KV.of("f_string", SqlTypeName.VARCHAR), KV.of("f_timestamp", SqlTypeName.TIMESTAMP),
-        KV.of("f_int2", SqlTypeName.INTEGER)
-        );
-    inputRowType = initTypeOfSqlRow(columnMetadata);
-    inputRows = Arrays.asList(
-        initBeamSqlRow(columnMetadata,
-            Arrays.<Object>asList(1, 1000L, Short.valueOf("1"), Byte.valueOf("1"), 1.0F, 1.0,
-                "string_row1", format.parse("2017-01-01 01:01:03"), 1)),
-        initBeamSqlRow(columnMetadata,
-            Arrays.<Object>asList(1, 2000L, Short.valueOf("2"), Byte.valueOf("2"), 2.0F, 2.0,
-                "string_row2", format.parse("2017-01-01 01:02:03"), 2)),
-        initBeamSqlRow(columnMetadata,
-            Arrays.<Object>asList(1, 3000L, Short.valueOf("3"), Byte.valueOf("3"), 3.0F, 3.0,
-                "string_row3", format.parse("2017-01-01 01:03:03"), 3)),
-        initBeamSqlRow(columnMetadata, Arrays.<Object>asList(1, 4000L, Short.valueOf("4"),
-            Byte.valueOf("4"), 4.0F, 4.0, "string_row4", format.parse("2017-01-01 02:04:03"), 4)));
-  }
-
-  /**
-   * Creates a {@code BeamSqlRowType} for the given column metadata.
-   */
-  public static BeamSqlRowType initTypeOfSqlRow(List<KV<String, SqlTypeName>> columnMetadata) {
-    FieldInfoBuilder builder = BeamQueryPlanner.TYPE_FACTORY.builder();
-    for (KV<String, SqlTypeName> cm : columnMetadata) {
-      builder.add(cm.getKey(), cm.getValue());
-    }
-    return CalciteUtils.toBeamRowType(builder.build());
-  }
-
-  /**
-   * Creates an empty row with the given column metadata.
-   */
-  public static BeamSqlRow initBeamSqlRow(List<KV<String, SqlTypeName>> columnMetadata) {
-    return initBeamSqlRow(columnMetadata, Arrays.asList());
-  }
-
-  /**
-   * Creates a row with the given column metadata and values for each column.
-   *
-   */
-  public static BeamSqlRow initBeamSqlRow(List<KV<String, SqlTypeName>> columnMetadata,
-      List<Object> rowValues) {
-    BeamSqlRowType rowType = initTypeOfSqlRow(columnMetadata);
-
-    return new BeamSqlRow(rowType, rowValues);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index a86a680..803d30c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -180,7 +180,6 @@
     <module>sdks/java/build-tools</module>
     <module>sdks</module>
     <module>runners</module>
-    <module>dsls</module>
     <module>examples</module>
     <!-- sdks/java/javadoc builds project-wide Javadoc. It has to run last. -->
     <module>sdks/java/javadoc</module>

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/pom.xml b/sdks/java/extensions/pom.xml
index 1222476..5465cf0 100644
--- a/sdks/java/extensions/pom.xml
+++ b/sdks/java/extensions/pom.xml
@@ -37,6 +37,7 @@
     <module>join-library</module>
     <module>protobuf</module>
     <module>sorter</module>
+    <module>sql</module>
   </modules>
 
 </project>

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/pom.xml b/sdks/java/extensions/sql/pom.xml
new file mode 100644
index 0000000..b4aa223
--- /dev/null
+++ b/sdks/java/extensions/sql/pom.xml
@@ -0,0 +1,226 @@
+<?xml version="1.0"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
+  xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.beam</groupId>
+    <artifactId>beam-sdks-java-extensions-parent</artifactId>
+    <version>2.2.0-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>beam-sdks-java-extensions-sql</artifactId>
+  <name>Apache Beam :: SDKs :: Java :: Extensions :: SQL</name>
+  <description>Beam SQL provides a new interface to generate a Beam pipeline from a SQL statement</description>
+
+  <packaging>jar</packaging>
+
+  <properties>
+    <timestamp>${maven.build.timestamp}</timestamp>
+    <maven.build.timestamp.format>yyyy-MM-dd HH:mm</maven.build.timestamp.format>
+    <calcite.version>1.13.0</calcite.version>
+    <avatica.version>1.10.0</avatica.version>
+  </properties>
+
+  <profiles>
+    <!--
+      The direct runner is available by default.
+      You can also include it on the classpath explicitly with -P direct-runner
+    -->
+    <profile>
+      <id>direct-runner</id>
+      <activation>
+        <activeByDefault>true</activeByDefault>
+      </activation>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.beam</groupId>
+          <artifactId>beam-runners-direct-java</artifactId>
+          <scope>runtime</scope>
+        </dependency>
+      </dependencies>
+    </profile>
+  </profiles>
+
+  <build>
+    <resources>
+      <resource>
+        <directory>src/main/resources</directory>
+        <filtering>true</filtering>
+      </resource>
+    </resources>
+
+    <pluginManagement>
+      <plugins>
+        <plugin>
+          <groupId>org.apache.maven.plugins</groupId>
+          <artifactId>maven-checkstyle-plugin</artifactId>
+          <configuration>
+            <!-- Set testSourceDirectory in order to exclude generated-test-sources -->
+            <testSourceDirectory>${project.basedir}/src/test/</testSourceDirectory>
+          </configuration>
+        </plugin>
+      </plugins>
+    </pluginManagement>
+
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <configuration>
+        <argLine>-da</argLine> <!-- disable assert in Calcite converter validation -->
+        </configuration>
+      </plugin>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-shade-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>bundle-and-repackage</id>
+            <phase>package</phase>
+            <goals>
+              <goal>shade</goal>
+            </goals>
+            <configuration>
+              <shadeTestJar>true</shadeTestJar>
+              <artifactSet>
+                <includes>
+                  <include>com.google.guava:guava</include>
+                </includes>
+              </artifactSet>
+              <filters>
+                <filter>
+                  <artifact>*:*</artifact>
+                  <excludes>
+                    <exclude>META-INF/*.SF</exclude>
+                    <exclude>META-INF/*.DSA</exclude>
+                    <exclude>META-INF/*.RSA</exclude>
+                  </excludes>
+                </filter>
+              </filters>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+
+      <!-- Coverage analysis for unit tests. -->
+      <plugin>
+        <groupId>org.jacoco</groupId>
+        <artifactId>jacoco-maven-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.calcite</groupId>
+      <artifactId>calcite-core</artifactId>
+      <version>${calcite.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.calcite</groupId>
+      <artifactId>calcite-linq4j</artifactId>
+      <version>${calcite.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.calcite.avatica</groupId>
+      <artifactId>avatica-core</artifactId>
+      <version>${avatica.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-core</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>com.google.protobuf</groupId>
+          <artifactId>protobuf-lite</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-jdk14</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-csv</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>joda-time</groupId>
+      <artifactId>joda-time</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-extensions-join-library</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.auto.value</groupId>
+      <artifactId>auto-value</artifactId>
+      <!-- this is a hack to make it available at compile time but not bundled.-->
+      <scope>provided</scope>
+    </dependency>
+    
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-io-kafka</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-clients</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    
+    <!-- for tests  -->
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java
new file mode 100644
index 0000000..d902f42
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql;
+
+import com.google.auto.value.AutoValue;
+import org.apache.beam.dsls.sql.rel.BeamRelNode;
+import org.apache.beam.dsls.sql.schema.BeamPCollectionTable;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
+import org.apache.beam.dsls.sql.schema.BeamSqlUdaf;
+import org.apache.beam.dsls.sql.schema.BeamSqlUdf;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlSelect;
+import org.apache.calcite.sql.parser.SqlParseException;
+import org.apache.calcite.tools.RelConversionException;
+import org.apache.calcite.tools.ValidationException;
+
+/**
+ * {@code BeamSql} is the DSL interface of BeamSQL. It translates a SQL query into a
+ * {@link PTransform}, so developers can use standard SQL queries in a Beam pipeline.
+ *
+ * <h1>Beam SQL DSL usage:</h1>
+ * A typical pipeline with Beam SQL DSL is:
+ * <pre>
+ *{@code
+PipelineOptions options = PipelineOptionsFactory.create();
+Pipeline p = Pipeline.create(options);
+
+//create table from TextIO;
+PCollection<BeamSqlRow> inputTableA = p.apply(TextIO.read().from("/my/input/patha"))
+    .apply(...);
+PCollection<BeamSqlRow> inputTableB = p.apply(TextIO.read().from("/my/input/pathb"))
+    .apply(...);
+
+//run a simple query, and register the output as a table in BeamSql;
+String sql1 = "select MY_FUNC(c1), c2 from PCOLLECTION";
+PCollection<BeamSqlRow> outputTableA = inputTableA.apply(
+    BeamSql.simpleQuery(sql1)
+    .withUdf("MY_FUNC", MY_FUNC.class));
+
+//run a JOIN with one table from TextIO, and one table from another query
+PCollection<BeamSqlRow> outputTableB = PCollectionTuple.of(
+    new TupleTag<BeamSqlRow>("TABLE_O_A"), outputTableA)
+                .and(new TupleTag<BeamSqlRow>("TABLE_B"), inputTableB)
+    .apply(BeamSql.query("select * from TABLE_O_A JOIN TABLE_B where ..."));
+
+//output the final result with TextIO
+outputTableB.apply(...).apply(TextIO.write().to("/my/output/path"));
+
+p.run().waitUntilFinish();
+ * }
+ * </pre>
+ */
+@Experimental
+public class BeamSql {
+  /**
+   * Transforms a SQL query into a {@link PTransform} representing an equivalent execution plan.
+   *
+   * <p>The returned {@link PTransform} can be applied to a {@link PCollectionTuple} representing
+   * all the input tables and results in a {@code PCollection<BeamSqlRow>} representing the output
+   * table. The {@link PCollectionTuple} contains the mapping from {@code table names} to
+   * {@code PCollection<BeamSqlRow>}, each representing an input table.
+   *
+   * <p>It is an error to apply it to a {@link PCollectionTuple} that is missing any {@code table names}
+   * referenced within the query.
+   */
+  public static QueryTransform query(String sqlQuery) {
+    return QueryTransform.builder()
+        .setSqlEnv(new BeamSqlEnv())
+        .setSqlQuery(sqlQuery)
+        .build();
+  }
+
+  /**
+   * Transforms a SQL query into a {@link PTransform} representing an equivalent execution plan.
+   *
+   * <p>This is a simplified form of {@link #query(String)} where the query must reference
+   * a single input table.
+   *
+   * <p>Make sure to reference the single input table with the fixed table name <em>PCOLLECTION</em>.
+   */
+  public static SimpleQueryTransform simpleQuery(String sqlQuery) throws Exception {
+    return SimpleQueryTransform.builder()
+        .setSqlEnv(new BeamSqlEnv())
+        .setSqlQuery(sqlQuery)
+        .build();
+  }
+
+  /**
+   * A {@link PTransform} representing an execution plan for a SQL query.
+   */
+  @AutoValue
+  public abstract static class QueryTransform extends
+      PTransform<PCollectionTuple, PCollection<BeamSqlRow>> {
+    abstract BeamSqlEnv getSqlEnv();
+    abstract String getSqlQuery();
+
+    static Builder builder() {
+      return new AutoValue_BeamSql_QueryTransform.Builder();
+    }
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setSqlQuery(String sqlQuery);
+      abstract Builder setSqlEnv(BeamSqlEnv sqlEnv);
+      abstract QueryTransform build();
+    }
+
+    /**
+     * Register a UDF used in this query.
+     */
+    public QueryTransform withUdf(String functionName, Class<? extends BeamSqlUdf> clazz) {
+      getSqlEnv().registerUdf(functionName, clazz);
+      return this;
+    }
+
+    /**
+     * Register a UDAF used in this query.
+     */
+    public QueryTransform withUdaf(String functionName, Class<? extends BeamSqlUdaf> clazz) {
+      getSqlEnv().registerUdaf(functionName, clazz);
+      return this;
+    }
+
+    @Override
+    public PCollection<BeamSqlRow> expand(PCollectionTuple input) {
+      registerTables(input);
+
+      BeamRelNode beamRelNode = null;
+      try {
+        beamRelNode = getSqlEnv().planner.convertToBeamRel(getSqlQuery());
+      } catch (ValidationException | RelConversionException | SqlParseException e) {
+        throw new IllegalStateException(e);
+      }
+
+      try {
+        return beamRelNode.buildBeamPipeline(input, getSqlEnv());
+      } catch (Exception e) {
+        throw new IllegalStateException(e);
+      }
+    }
+
+    //register the tables associated with the input PCollections.
+    private void registerTables(PCollectionTuple input) {
+      for (TupleTag<?> sourceTag : input.getAll().keySet()) {
+        PCollection<BeamSqlRow> sourceStream = (PCollection<BeamSqlRow>) input.get(sourceTag);
+        BeamSqlRowCoder sourceCoder = (BeamSqlRowCoder) sourceStream.getCoder();
+
+        getSqlEnv().registerTable(sourceTag.getId(),
+            new BeamPCollectionTable(sourceStream, sourceCoder.getTableSchema()));
+      }
+    }
+  }
+
+  /**
+   * A {@link PTransform} representing an execution plan for a SQL query referencing
+   * a single table.
+   */
+  @AutoValue
+  public abstract static class SimpleQueryTransform
+      extends PTransform<PCollection<BeamSqlRow>, PCollection<BeamSqlRow>> {
+    private static final String PCOLLECTION_TABLE_NAME = "PCOLLECTION";
+    abstract BeamSqlEnv getSqlEnv();
+    abstract String getSqlQuery();
+
+    static Builder builder() {
+      return new AutoValue_BeamSql_SimpleQueryTransform.Builder();
+    }
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setSqlQuery(String sqlQuery);
+      abstract Builder setSqlEnv(BeamSqlEnv sqlEnv);
+      abstract SimpleQueryTransform build();
+    }
+
+    /**
+     * Register a UDF used in this query.
+     */
+    public SimpleQueryTransform withUdf(String functionName, Class<? extends BeamSqlUdf> clazz) {
+      getSqlEnv().registerUdf(functionName, clazz);
+      return this;
+    }
+
+    /**
+     * Register a UDAF used in this query.
+     */
+    public SimpleQueryTransform withUdaf(String functionName, Class<? extends BeamSqlUdaf> clazz) {
+      getSqlEnv().registerUdaf(functionName, clazz);
+      return this;
+    }
+
+    private void validateQuery() {
+      SqlNode sqlNode;
+      try {
+        sqlNode = getSqlEnv().planner.parseQuery(getSqlQuery());
+        getSqlEnv().planner.getPlanner().close();
+      } catch (SqlParseException e) {
+        throw new IllegalStateException(e);
+      }
+
+      if (sqlNode instanceof SqlSelect) {
+        SqlSelect select = (SqlSelect) sqlNode;
+        String tableName = select.getFrom().toString();
+        if (!tableName.equalsIgnoreCase(PCOLLECTION_TABLE_NAME)) {
+          throw new IllegalStateException("Use fixed table name " + PCOLLECTION_TABLE_NAME);
+        }
+      } else {
+        throw new UnsupportedOperationException(
+            "Sql operation: " + sqlNode.toString() + " is not supported!");
+      }
+    }
+
+    @Override
+    public PCollection<BeamSqlRow> expand(PCollection<BeamSqlRow> input) {
+      validateQuery();
+      return PCollectionTuple.of(new TupleTag<BeamSqlRow>(PCOLLECTION_TABLE_NAME), input)
+          .apply(QueryTransform.builder()
+              .setSqlEnv(getSqlEnv())
+              .setSqlQuery(getSqlQuery())
+              .build());
+    }
+  }
+}
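
A minimal sketch of the UDF registration path above. CubicInteger and the
query are illustrative; the only contract from BeamSqlUdf is a public static
method whose name matches BeamSqlUdf.UDF_METHOD (assumed to be "eval" here):

    /** Hypothetical UDF: a public static method named by BeamSqlUdf.UDF_METHOD. */
    public static class CubicInteger implements BeamSqlUdf {
      public static Integer eval(Integer input) {
        return input * input * input;
      }
    }

    // Used through simpleQuery against the fixed PCOLLECTION table name.
    PCollection<BeamSqlRow> result = inputTable.apply(
        BeamSql.simpleQuery("select CUBIC(c1) from PCOLLECTION")
            .withUdf("CUBIC", CubicInteger.class));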

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlCli.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlCli.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlCli.java
new file mode 100644
index 0000000..50da244
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlCli.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql;
+
+import org.apache.beam.dsls.sql.rel.BeamRelNode;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.calcite.plan.RelOptUtil;
+
+/**
+ * {@link BeamSqlCli} provides methods to execute Beam SQL with an interactive client.
+ */
+@Experimental
+public class BeamSqlCli {
+  /**
+   * Returns a human-readable representation of the query execution plan.
+   */
+  public static String explainQuery(String sqlString, BeamSqlEnv sqlEnv) throws Exception {
+    BeamRelNode exeTree = sqlEnv.planner.convertToBeamRel(sqlString);
+    String beamPlan = RelOptUtil.toString(exeTree);
+    return beamPlan;
+  }
+
+  /**
+   * Compiles a SQL statement on a newly created {@link Pipeline} and returns the output {@code PCollection}.
+   */
+  public static PCollection<BeamSqlRow> compilePipeline(String sqlStatement, BeamSqlEnv sqlEnv)
+      throws Exception {
+    PipelineOptions options = PipelineOptionsFactory.fromArgs(new String[] {}).withValidation()
+        .as(PipelineOptions.class);
+    options.setJobName("BeamPlanCreator");
+    Pipeline pipeline = Pipeline.create(options);
+
+    return compilePipeline(sqlStatement, pipeline, sqlEnv);
+  }
+
+  /**
+   * Compiles a SQL statement against the given {@link Pipeline} and returns the output {@code PCollection}.
+   */
+  public static PCollection<BeamSqlRow> compilePipeline(String sqlStatement, Pipeline basePipeline,
+      BeamSqlEnv sqlEnv) throws Exception {
+    PCollection<BeamSqlRow> resultStream =
+        sqlEnv.planner.compileBeamPipeline(sqlStatement, basePipeline, sqlEnv);
+    return resultStream;
+  }
+}
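
A sketch of how the two BeamSqlCli entry points compose; ORDER_DETAILS and
ordersTable are illustrative names, and ordersTable is assumed to be some
BaseBeamTable implementation registered elsewhere:

    BeamSqlEnv env = new BeamSqlEnv();
    env.registerTable("ORDER_DETAILS", ordersTable);

    // Print the optimized Beam relational plan without executing anything.
    System.out.println(BeamSqlCli.explainQuery("select * from ORDER_DETAILS", env));

    // Compile the same statement into a PCollection on a freshly created pipeline.
    PCollection<BeamSqlRow> rows =
        BeamSqlCli.compilePipeline("select * from ORDER_DETAILS", env);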

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlEnv.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlEnv.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlEnv.java
new file mode 100644
index 0000000..0e1ac98
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlEnv.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql;
+
+import java.io.Serializable;
+
+import org.apache.beam.dsls.sql.planner.BeamQueryPlanner;
+import org.apache.beam.dsls.sql.schema.BaseBeamTable;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
+import org.apache.beam.dsls.sql.schema.BeamSqlUdaf;
+import org.apache.beam.dsls.sql.schema.BeamSqlUdf;
+import org.apache.beam.dsls.sql.utils.CalciteUtils;
+import org.apache.calcite.DataContext;
+import org.apache.calcite.linq4j.Enumerable;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.schema.ScannableTable;
+import org.apache.calcite.schema.Schema;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.Statistic;
+import org.apache.calcite.schema.Statistics;
+import org.apache.calcite.schema.impl.AggregateFunctionImpl;
+import org.apache.calcite.schema.impl.ScalarFunctionImpl;
+import org.apache.calcite.tools.Frameworks;
+
+/**
+ * {@link BeamSqlEnv} prepares the execution context for {@link BeamSql} and {@link BeamSqlCli}.
+ *
+ * <p>It contains a {@link SchemaPlus} which holds the metadata of tables/UDF functions, and
+ * a {@link BeamQueryPlanner} which parses, validates, optimizes, and translates input SQL queries.
+ */
+public class BeamSqlEnv implements Serializable {
+  transient SchemaPlus schema;
+  transient BeamQueryPlanner planner;
+
+  public BeamSqlEnv() {
+    schema = Frameworks.createRootSchema(true);
+    planner = new BeamQueryPlanner(schema);
+  }
+
+  /**
+   * Register a UDF function which can be used in SQL expression.
+   */
+  public void registerUdf(String functionName, Class<? extends BeamSqlUdf> clazz) {
+    schema.add(functionName, ScalarFunctionImpl.create(clazz, BeamSqlUdf.UDF_METHOD));
+  }
+
+  /**
+   * Register a UDAF function which can be used in GROUP-BY expression.
+   * See {@link BeamSqlUdaf} on how to implement a UDAF.
+   */
+  public void registerUdaf(String functionName, Class<? extends BeamSqlUdaf> clazz) {
+    schema.add(functionName, AggregateFunctionImpl.create(clazz));
+  }
+
+  /**
+   * Registers a {@link BaseBeamTable} which can be used for all subsequent queries.
+   *
+   */
+  public void registerTable(String tableName, BaseBeamTable table) {
+    schema.add(tableName, new BeamCalciteTable(table.getRowType()));
+    planner.getSourceTables().put(tableName, table);
+  }
+
+  /**
+   * Finds a {@link BaseBeamTable} by table name.
+   */
+  public BaseBeamTable findTable(String tableName) {
+    return planner.getSourceTables().get(tableName);
+  }
+
+  private static class BeamCalciteTable implements ScannableTable, Serializable {
+    private BeamSqlRowType beamSqlRowType;
+    public BeamCalciteTable(BeamSqlRowType beamSqlRowType) {
+      this.beamSqlRowType = beamSqlRowType;
+    }
+    @Override
+    public RelDataType getRowType(RelDataTypeFactory typeFactory) {
+      return CalciteUtils.toCalciteRowType(this.beamSqlRowType)
+          .apply(BeamQueryPlanner.TYPE_FACTORY);
+    }
+
+    @Override
+    public Enumerable<Object[]> scan(DataContext root) {
+      // not used as Beam SQL uses its own execution engine
+      return null;
+    }
+
+    /**
+     * The {@link Statistic} is not used to optimize the plan.
+     */
+    @Override
+    public Statistic getStatistic() {
+      return Statistics.UNKNOWN;
+    }
+
+    /**
+     * All sources are treated as TABLE in Beam SQL.
+     */
+    @Override
+    public Schema.TableType getJdbcTableType() {
+      return Schema.TableType.TABLE;
+    }
+  }
+}
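
A sketch of registering an existing PCollection as a queryable table by hand,
mirroring what QueryTransform.registerTables does internally; MY_TABLE and
rows are illustrative, and the coder of rows must be a BeamSqlRowCoder so the
table schema can be recovered from it:

    BeamSqlEnv env = new BeamSqlEnv();
    BeamSqlRowCoder coder = (BeamSqlRowCoder) rows.getCoder();
    env.registerTable("MY_TABLE",
        new BeamPCollectionTable(rows, coder.getTableSchema()));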

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java
new file mode 100644
index 0000000..4e364e1
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.example;
+
+import java.sql.Types;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.dsls.sql.BeamSql;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
+
+/**
+ * This is a quick example which uses the Beam SQL DSL to create a data pipeline.
+ *
+ * <p>Run the example with
+ * <pre>
+ * mvn -pl sdks/java/extensions/sql compile exec:java \
+ *  -Dexec.mainClass=org.apache.beam.dsls.sql.example.BeamSqlExample \
+ *   -Dexec.args="--runner=DirectRunner" -Pdirect-runner
+ * </pre>
+ *
+ */
+class BeamSqlExample {
+  public static void main(String[] args) throws Exception {
+    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).as(PipelineOptions.class);
+    Pipeline p = Pipeline.create(options);
+
+    //define the input row format
+    List<String> fieldNames = Arrays.asList("c1", "c2", "c3");
+    List<Integer> fieldTypes = Arrays.asList(Types.INTEGER, Types.VARCHAR, Types.DOUBLE);
+    BeamSqlRowType type = BeamSqlRowType.create(fieldNames, fieldTypes);
+    BeamSqlRow row = new BeamSqlRow(type);
+    row.addField(0, 1);
+    row.addField(1, "row");
+    row.addField(2, 1.0);
+
+    //create a source PCollection with Create.of();
+    PCollection<BeamSqlRow> inputTable = PBegin.in(p).apply(Create.of(row)
+        .withCoder(new BeamSqlRowCoder(type)));
+
+    //Case 1. run a simple SQL query over input PCollection with BeamSql.simpleQuery;
+    PCollection<BeamSqlRow> outputStream = inputTable.apply(
+        BeamSql.simpleQuery("select c1, c2, c3 from PCOLLECTION where c1=1"));
+
+    //print the output record of case 1;
+    outputStream.apply("log_result",
+        MapElements.<BeamSqlRow, Void>via(new SimpleFunction<BeamSqlRow, Void>() {
+      public Void apply(BeamSqlRow input) {
+        System.out.println("PCOLLECTION: " + input);
+        return null;
+      }
+    }));
+
+    //Case 2. run the query with BeamSql.query over result PCollection of case 1.
+    PCollection<BeamSqlRow> outputStream2 =
+        PCollectionTuple.of(new TupleTag<BeamSqlRow>("CASE1_RESULT"), outputStream)
+        .apply(BeamSql.query("select c2, c3 from CASE1_RESULT where c1=1"));
+
+    //print the output record of case 2;
+    outputStream2.apply("log_result",
+        MapElements.<BeamSqlRow, Void>via(new SimpleFunction<BeamSqlRow, Void>() {
+      @Override
+      public Void apply(BeamSqlRow input) {
+        System.out.println("TABLE_B: " + input);
+        return null;
+      }
+    }));
+
+    p.run().waitUntilFinish();
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/example/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/example/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/example/package-info.java
new file mode 100644
index 0000000..52a9fce
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/example/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Examples on how to use Beam SQL.
+ *
+ */
+package org.apache.beam.dsls.sql.example;

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlExpressionExecutor.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlExpressionExecutor.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlExpressionExecutor.java
new file mode 100644
index 0000000..3732933
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlExpressionExecutor.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.interpreter;
+
+import java.io.Serializable;
+import java.util.List;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+
+/**
+ * {@code BeamSqlExpressionExecutor} fills the gap between relational
+ * expressions in Calcite SQL and executable code.
+ */
+public interface BeamSqlExpressionExecutor extends Serializable {
+
+  /**
+   * Invoked once before data processing starts.
+   */
+  void prepare();
+
+  /**
+   * Applies the expressions to an input record {@link BeamSqlRow},
+   * returning one evaluated value per expression.
+   */
+  List<Object> execute(BeamSqlRow inputRow);
+
+  void close();
+}
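
A minimal sketch of how a runtime caller might drive this contract from a DoFn
(the class name here is hypothetical; holding the executor as a field is safe
because the interface extends Serializable):

    private static class ExecutorFn extends DoFn<BeamSqlRow, BeamSqlRow> {
      private final BeamSqlExpressionExecutor executor;

      ExecutorFn(BeamSqlExpressionExecutor executor) {
        this.executor = executor;
      }

      @Setup
      public void setup() {
        executor.prepare(); // once, before any records are processed
      }

      @ProcessElement
      public void processElement(ProcessContext c) {
        // one evaluated value per expression held by the executor
        List<Object> results = executor.execute(c.element());
        // ... assemble and emit an output row from the results
      }

      @Teardown
      public void teardown() {
        executor.close();
      }
    }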

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutor.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutor.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutor.java
new file mode 100644
index 0000000..aee0e4a
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutor.java
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.interpreter;
+
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.List;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlCaseExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlCastExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlInputRefExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlReinterpretExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlUdfExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlWindowEndExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlWindowExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlWindowStartExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.arithmetic.BeamSqlDivideExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.arithmetic.BeamSqlMinusExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.arithmetic.BeamSqlModExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.arithmetic.BeamSqlMultiplyExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.arithmetic.BeamSqlPlusExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.comparison.BeamSqlEqualsExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.comparison.BeamSqlGreaterThanExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.comparison.BeamSqlGreaterThanOrEqualsExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.comparison.BeamSqlIsNotNullExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.comparison.BeamSqlIsNullExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.comparison.BeamSqlLessThanExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.comparison.BeamSqlLessThanOrEqualsExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.comparison.BeamSqlNotEqualsExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.date.BeamSqlCurrentDateExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.date.BeamSqlCurrentTimeExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.date.BeamSqlCurrentTimestampExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.date.BeamSqlDateCeilExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.date.BeamSqlDateFloorExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.date.BeamSqlExtractExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.logical.BeamSqlAndExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.logical.BeamSqlNotExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.logical.BeamSqlOrExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlAbsExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlAcosExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlAsinExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlAtan2Expression;
+import org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlAtanExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlCeilExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlCosExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlCotExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlDegreesExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlExpExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlFloorExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlLnExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlLogExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlPiExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlPowerExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlRadiansExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlRandExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlRandIntegerExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlRoundExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlSignExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlSinExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlTanExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlTruncateExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlCharLengthExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlConcatExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlInitCapExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlLowerExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlOverlayExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlPositionExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlSubstringExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlTrimExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlUpperExpression;
+import org.apache.beam.dsls.sql.rel.BeamFilterRel;
+import org.apache.beam.dsls.sql.rel.BeamProjectRel;
+import org.apache.beam.dsls.sql.rel.BeamRelNode;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.schema.impl.ScalarFunctionImpl;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.sql.validate.SqlUserDefinedFunction;
+import org.apache.calcite.util.NlsString;
+
+/**
+ * Executor based on {@link BeamSqlExpression} and {@link BeamSqlPrimitive}.
+ * {@code BeamSqlFnExecutor} converts a {@link BeamRelNode} to a {@link BeamSqlExpression},
+ * which can then be evaluated against each input {@link BeamSqlRow}.
+ */
+public class BeamSqlFnExecutor implements BeamSqlExpressionExecutor {
+  protected List<BeamSqlExpression> exps;
+
+  public BeamSqlFnExecutor(BeamRelNode relNode) {
+    this.exps = new ArrayList<>();
+    if (relNode instanceof BeamFilterRel) {
+      BeamFilterRel filterNode = (BeamFilterRel) relNode;
+      RexNode condition = filterNode.getCondition();
+      exps.add(buildExpression(condition));
+    } else if (relNode instanceof BeamProjectRel) {
+      BeamProjectRel projectNode = (BeamProjectRel) relNode;
+      List<RexNode> projects = projectNode.getProjects();
+      for (RexNode rexNode : projects) {
+        exps.add(buildExpression(rexNode));
+      }
+    } else {
+      throw new UnsupportedOperationException(
+          String.format("%s is not supported yet!", relNode.getClass().toString()));
+    }
+  }
+
+  /**
+   * {@link #buildExpression(RexNode)} visits the operands of {@link RexNode} recursively,
+   * and represents each {@link SqlOperator} with a corresponding {@link BeamSqlExpression}.
+   */
+  static BeamSqlExpression buildExpression(RexNode rexNode) {
+    BeamSqlExpression ret = null;
+    if (rexNode instanceof RexLiteral) {
+      RexLiteral node = (RexLiteral) rexNode;
+      SqlTypeName type = node.getTypeName();
+      Object value = node.getValue();
+
+      if (SqlTypeName.CHAR_TYPES.contains(type)
+          && node.getValue() instanceof NlsString) {
+        // NlsString is not serializable, so we convert
+        // it to a String explicitly.
+        return BeamSqlPrimitive.of(type, ((NlsString) value).getValue());
+      } else if (type == SqlTypeName.DATE && value instanceof Calendar) {
+        // TODO: check whether this conversion is correct;
+        // Calcite treats Calendar as the Java type of a DATE literal.
+        return BeamSqlPrimitive.of(type, ((Calendar) value).getTime());
+      } else {
+        // node.getType().getSqlTypeName() and node.getSqlTypeName() can be different
+        // e.g. sql: "select 1"
+        // here the literal 1 will be parsed as a RexLiteral where:
+        //     node.getType().getSqlTypeName() = INTEGER (the display type)
+        //     node.getSqlTypeName() = DECIMAL (the actual internal storage format)
+        // So we need to do a conversion here.
+        // See RexBuilder#makeLiteral for more information.
+        SqlTypeName realType = node.getType().getSqlTypeName();
+        Object realValue = value;
+        if (type == SqlTypeName.DECIMAL) {
+          BigDecimal rawValue = (BigDecimal) value;
+          switch (realType) {
+            case TINYINT:
+              realValue = (byte) rawValue.intValue();
+              break;
+            case SMALLINT:
+              realValue = (short) rawValue.intValue();
+              break;
+            case INTEGER:
+              realValue = rawValue.intValue();
+              break;
+            case BIGINT:
+              realValue = rawValue.longValue();
+              break;
+            case DECIMAL:
+              realValue = rawValue;
+              break;
+            default:
+              throw new IllegalStateException("type/realType mismatch: "
+                  + type + " VS " + realType);
+          }
+        } else if (type == SqlTypeName.DOUBLE) {
+          Double rawValue = (Double) value;
+          if (realType == SqlTypeName.FLOAT) {
+            realValue = rawValue.floatValue();
+          }
+        }
+        return BeamSqlPrimitive.of(realType, realValue);
+      }
+    } else if (rexNode instanceof RexInputRef) {
+      RexInputRef node = (RexInputRef) rexNode;
+      ret = new BeamSqlInputRefExpression(node.getType().getSqlTypeName(), node.getIndex());
+    } else if (rexNode instanceof RexCall) {
+      RexCall node = (RexCall) rexNode;
+      String opName = node.op.getName();
+      List<BeamSqlExpression> subExps = new ArrayList<>();
+      for (RexNode subNode : node.getOperands()) {
+        subExps.add(buildExpression(subNode));
+      }
+      switch (opName) {
+        // logical operators
+        case "AND":
+          ret = new BeamSqlAndExpression(subExps);
+          break;
+        case "OR":
+          ret = new BeamSqlOrExpression(subExps);
+          break;
+        case "NOT":
+          ret = new BeamSqlNotExpression(subExps);
+          break;
+        case "=":
+          ret = new BeamSqlEqualsExpression(subExps);
+          break;
+        case "<>":
+          ret = new BeamSqlNotEqualsExpression(subExps);
+          break;
+        case ">":
+          ret = new BeamSqlGreaterThanExpression(subExps);
+          break;
+        case ">=":
+          ret = new BeamSqlGreaterThanOrEqualsExpression(subExps);
+          break;
+        case "<":
+          ret = new BeamSqlLessThanExpression(subExps);
+          break;
+        case "<=":
+          ret = new BeamSqlLessThanOrEqualsExpression(subExps);
+          break;
+
+        // arithmetic operators
+        case "+":
+          ret = new BeamSqlPlusExpression(subExps);
+          break;
+        case "-":
+          ret = new BeamSqlMinusExpression(subExps);
+          break;
+        case "*":
+          ret = new BeamSqlMultiplyExpression(subExps);
+          break;
+        case "/":
+        case "/INT":
+          ret = new BeamSqlDivideExpression(subExps);
+          break;
+        case "MOD":
+          ret = new BeamSqlModExpression(subExps);
+          break;
+
+        case "ABS":
+          ret = new BeamSqlAbsExpression(subExps);
+          break;
+        case "ROUND":
+          ret = new BeamSqlRoundExpression(subExps);
+          break;
+        case "LN":
+          ret = new BeamSqlLnExpression(subExps);
+          break;
+        case "LOG10":
+          ret = new BeamSqlLogExpression(subExps);
+          break;
+        case "EXP":
+          ret = new BeamSqlExpExpression(subExps);
+          break;
+        case "ACOS":
+          ret = new BeamSqlAcosExpression(subExps);
+          break;
+        case "ASIN":
+          ret = new BeamSqlAsinExpression(subExps);
+          break;
+        case "ATAN":
+          ret = new BeamSqlAtanExpression(subExps);
+          break;
+        case "COT":
+          ret = new BeamSqlCotExpression(subExps);
+          break;
+        case "DEGREES":
+          ret = new BeamSqlDegreesExpression(subExps);
+          break;
+        case "RADIANS":
+          ret = new BeamSqlRadiansExpression(subExps);
+          break;
+        case "COS":
+          ret = new BeamSqlCosExpression(subExps);
+          break;
+        case "SIN":
+          ret = new BeamSqlSinExpression(subExps);
+          break;
+        case "TAN":
+          ret = new BeamSqlTanExpression(subExps);
+          break;
+        case "SIGN":
+          ret = new BeamSqlSignExpression(subExps);
+          break;
+        case "POWER":
+          ret = new BeamSqlPowerExpression(subExps);
+          break;
+        case "PI":
+          ret = new BeamSqlPiExpression();
+          break;
+        case "ATAN2":
+          ret = new BeamSqlAtan2Expression(subExps);
+          break;
+        case "TRUNCATE":
+          ret = new BeamSqlTruncateExpression(subExps);
+          break;
+        case "RAND":
+          ret = new BeamSqlRandExpression(subExps);
+          break;
+        case "RAND_INTEGER":
+          ret = new BeamSqlRandIntegerExpression(subExps);
+          break;
+
+        // string operators
+        case "||":
+          ret = new BeamSqlConcatExpression(subExps);
+          break;
+        case "POSITION":
+          ret = new BeamSqlPositionExpression(subExps);
+          break;
+        case "CHAR_LENGTH":
+        case "CHARACTER_LENGTH":
+          ret = new BeamSqlCharLengthExpression(subExps);
+          break;
+        case "UPPER":
+          ret = new BeamSqlUpperExpression(subExps);
+          break;
+        case "LOWER":
+          ret = new BeamSqlLowerExpression(subExps);
+          break;
+        case "TRIM":
+          ret = new BeamSqlTrimExpression(subExps);
+          break;
+        case "SUBSTRING":
+          ret = new BeamSqlSubstringExpression(subExps);
+          break;
+        case "OVERLAY":
+          ret = new BeamSqlOverlayExpression(subExps);
+          break;
+        case "INITCAP":
+          ret = new BeamSqlInitCapExpression(subExps);
+          break;
+
+        // date functions
+        case "Reinterpret":
+          return new BeamSqlReinterpretExpression(subExps, node.type.getSqlTypeName());
+        case "CEIL":
+          if (SqlTypeName.NUMERIC_TYPES.contains(node.type.getSqlTypeName())) {
+            return new BeamSqlCeilExpression(subExps);
+          } else {
+            return new BeamSqlDateCeilExpression(subExps);
+          }
+        case "FLOOR":
+          if (SqlTypeName.NUMERIC_TYPES.contains(node.type.getSqlTypeName())) {
+            return new BeamSqlFloorExpression(subExps);
+          } else {
+            return new BeamSqlDateFloorExpression(subExps);
+          }
+        case "EXTRACT_DATE":
+        case "EXTRACT":
+          return new BeamSqlExtractExpression(subExps);
+
+        case "LOCALTIME":
+        case "CURRENT_TIME":
+          return new BeamSqlCurrentTimeExpression(subExps);
+
+        case "CURRENT_TIMESTAMP":
+        case "LOCALTIMESTAMP":
+          return new BeamSqlCurrentTimestampExpression(subExps);
+
+        case "CURRENT_DATE":
+          return new BeamSqlCurrentDateExpression();
+
+        case "CASE":
+          ret = new BeamSqlCaseExpression(subExps);
+          break;
+        case "CAST":
+          ret = new BeamSqlCastExpression(subExps, node.type.getSqlTypeName());
+          break;
+
+        case "IS NULL":
+          ret = new BeamSqlIsNullExpression(subExps.get(0));
+          break;
+        case "IS NOT NULL":
+          ret = new BeamSqlIsNotNullExpression(subExps.get(0));
+          break;
+
+        case "HOP":
+        case "TUMBLE":
+        case "SESSION":
+          ret = new BeamSqlWindowExpression(subExps, node.type.getSqlTypeName());
+          break;
+        case "HOP_START":
+        case "TUMBLE_START":
+        case "SESSION_START":
+          ret = new BeamSqlWindowStartExpression();
+          break;
+        case "HOP_END":
+        case "TUMBLE_END":
+        case "SESSION_END":
+          ret = new BeamSqlWindowEndExpression();
+          break;
+        default:
+          //handle UDF
+          if (node.getOperator() instanceof SqlUserDefinedFunction) {
+            SqlUserDefinedFunction udf = (SqlUserDefinedFunction) node.getOperator();
+            ScalarFunctionImpl fn = (ScalarFunctionImpl) udf.getFunction();
+            ret = new BeamSqlUdfExpression(fn.method, subExps,
+                node.type.getSqlTypeName());
+          } else {
+            throw new UnsupportedOperationException(
+                "Operator: " + opName + " is not supported yet!");
+          }
+      }
+    } else {
+      throw new UnsupportedOperationException(
+          String.format("%s is not supported yet!", rexNode.getClass().toString()));
+    }
+
+    if (ret != null && !ret.accept()) {
+      throw new IllegalStateException(ret.getClass().getSimpleName()
+          + " does not accept the operands.(" + rexNode + ")");
+    }
+
+    return ret;
+  }
+
+  @Override
+  public void prepare() {
+  }
+
+  @Override
+  public List<Object> execute(BeamSqlRow inputRow) {
+    List<Object> results = new ArrayList<>();
+    for (BeamSqlExpression exp : exps) {
+      results.add(exp.evaluate(inputRow).getValue());
+    }
+    return results;
+  }
+
+  @Override
+  public void close() {
+  }
+
+}
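
To make the recursive translation concrete: a filter condition such as c1 = 1
(with c1 an INTEGER at field index 0) is turned by buildExpression into a tree
equivalent to this hand-built sketch (the row is assumed to match that schema):

    BeamSqlExpression condition = new BeamSqlEqualsExpression(
        Arrays.<BeamSqlExpression>asList(
            new BeamSqlInputRefExpression(SqlTypeName.INTEGER, 0), // reads field 0
            BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1)));         // the literal 1
    boolean matches = (Boolean) condition.evaluate(row).getValue();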

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCaseExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCaseExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCaseExpression.java
new file mode 100644
index 0000000..a30916b
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCaseExpression.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator;
+
+import java.util.List;
+
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlCaseExpression} represents CASE, NULLIF and COALESCE in SQL;
+ * Calcite rewrites the latter two into CASE calls, so one expression covers all three.
+ */
+public class BeamSqlCaseExpression extends BeamSqlExpression {
+  public BeamSqlCaseExpression(List<BeamSqlExpression> operands) {
+    // the return type of CASE is the type of the `else` branch (the last operand)
+    super(operands, operands.get(operands.size() - 1).getOutputType());
+  }
+
+  @Override public boolean accept() {
+    // operands come in `when`/`then` pairs plus a trailing `else`, so the count must be odd
+    if (operands.size() % 2 != 1) {
+      return false;
+    }
+
+    for (int i = 0; i < operands.size() - 1; i += 2) {
+      if (opType(i) != SqlTypeName.BOOLEAN) {
+        return false;
+      } else if (opType(i + 1) != outputType) {
+        return false;
+      }
+    }
+
+    return true;
+  }
+
+  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+    for (int i = 0; i < operands.size() - 1; i += 2) {
+      if (opValueEvaluated(i, inputRow)) {
+        return BeamSqlPrimitive.of(
+            outputType,
+            opValueEvaluated(i + 1, inputRow)
+        );
+      }
+    }
+    return BeamSqlPrimitive.of(outputType,
+        opValueEvaluated(operands.size() - 1, inputRow));
+  }
+}
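
The operand layout is flat: CASE WHEN b1 THEN v1 WHEN b2 THEN v2 ELSE v3 END
arrives as [b1, v1, b2, v2, v3]. A minimal evaluation sketch with constant
operands (the row argument is required by the signature but never read here):

    List<BeamSqlExpression> ops = Arrays.<BeamSqlExpression>asList(
        BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, false),   // WHEN: no match
        BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "then"),  // THEN
        BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "else")); // ELSE
    Object result = new BeamSqlCaseExpression(ops).evaluate(row).getValue(); // "else"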

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCastExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCastExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCastExpression.java
new file mode 100644
index 0000000..524d1df
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCastExpression.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator;
+
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.util.List;
+
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.calcite.runtime.SqlFunctions;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+import org.joda.time.format.DateTimeFormatterBuilder;
+import org.joda.time.format.DateTimeParser;
+
+/**
+ * {@code BeamSqlCastExpression} implements SQL 'CAST' for the supported {@link SqlTypeName}s.
+ */
+public class BeamSqlCastExpression extends BeamSqlExpression {
+
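+  // index of the single operand of CAST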
+  private static final int index = 0;
+  private static final String outputTimestampFormat = "yyyy-MM-dd HH:mm:ss";
+  private static final String outputDateFormat = "yyyy-MM-dd";
+  /**
+   * Date and Timestamp formats used to parse
+   * {@link SqlTypeName#DATE}, {@link SqlTypeName#TIMESTAMP}.
+   */
+  private static final DateTimeFormatter dateTimeFormatter = new DateTimeFormatterBuilder()
+      .append(null/*printer*/, new DateTimeParser[] {
+          // date formats
+          DateTimeFormat.forPattern("yy-MM-dd").getParser(),
+          DateTimeFormat.forPattern("yy/MM/dd").getParser(),
+          DateTimeFormat.forPattern("yy.MM.dd").getParser(),
+          DateTimeFormat.forPattern("yyMMdd").getParser(),
+          DateTimeFormat.forPattern("yyyyMMdd").getParser(),
+          DateTimeFormat.forPattern("yyyy-MM-dd").getParser(),
+          DateTimeFormat.forPattern("yyyy/MM/dd").getParser(),
+          DateTimeFormat.forPattern("yyyy.MM.dd").getParser(),
+          // datetime formats
+          DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss").getParser(),
+          DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ssz").getParser(),
+          DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss z").getParser(),
+          DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSSSSSSSS").getParser(),
+          DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSSSSSSSSz").getParser(),
+          DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSSSSSSSS z").getParser() }).toFormatter()
+      .withPivotYear(2020);
+
+  public BeamSqlCastExpression(List<BeamSqlExpression> operands, SqlTypeName castType) {
+    super(operands, castType);
+  }
+
+  @Override
+  public boolean accept() {
+    return numberOfOperands() == 1;
+  }
+
+  @Override
+  public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+    SqlTypeName castOutputType = getOutputType();
+    switch (castOutputType) {
+      case INTEGER:
+        return BeamSqlPrimitive
+            .of(SqlTypeName.INTEGER, SqlFunctions.toInt(opValueEvaluated(index, inputRow)));
+      case DOUBLE:
+        return BeamSqlPrimitive
+            .of(SqlTypeName.DOUBLE, SqlFunctions.toDouble(opValueEvaluated(index, inputRow)));
+      case SMALLINT:
+        return BeamSqlPrimitive
+            .of(SqlTypeName.SMALLINT, SqlFunctions.toShort(opValueEvaluated(index, inputRow)));
+      case TINYINT:
+        return BeamSqlPrimitive
+            .of(SqlTypeName.TINYINT, SqlFunctions.toByte(opValueEvaluated(index, inputRow)));
+      case BIGINT:
+        return BeamSqlPrimitive
+            .of(SqlTypeName.BIGINT, SqlFunctions.toLong(opValueEvaluated(index, inputRow)));
+      case DECIMAL:
+        return BeamSqlPrimitive.of(SqlTypeName.DECIMAL,
+            SqlFunctions.toBigDecimal(opValueEvaluated(index, inputRow)));
+      case FLOAT:
+        return BeamSqlPrimitive
+            .of(SqlTypeName.FLOAT, SqlFunctions.toFloat(opValueEvaluated(index, inputRow)));
+      case CHAR:
+      case VARCHAR:
+        return BeamSqlPrimitive
+            .of(SqlTypeName.VARCHAR, opValueEvaluated(index, inputRow).toString());
+      case DATE:
+        return BeamSqlPrimitive
+            .of(SqlTypeName.DATE, toDate(opValueEvaluated(index, inputRow), outputDateFormat));
+      case TIMESTAMP:
+        return BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP,
+            toTimeStamp(opValueEvaluated(index, inputRow), outputTimestampFormat));
+    }
+    throw new UnsupportedOperationException(
+        String.format("Cast to type %s not supported", castOutputType));
+  }
+
+  private Date toDate(Object inputDate, String outputFormat) {
+    try {
+      return Date
+          .valueOf(dateTimeFormatter.parseLocalDate(inputDate.toString()).toString(outputFormat));
+    } catch (IllegalArgumentException | UnsupportedOperationException e) {
+      throw new UnsupportedOperationException(
+          "Can't cast '" + inputDate + "' to type 'Date'", e);
+    }
+  }
+
+  private Timestamp toTimeStamp(Object inputTimestamp, String outputFormat) {
+    try {
+      return Timestamp.valueOf(
+          dateTimeFormatter.parseDateTime(inputTimestamp.toString()).secondOfMinute()
+              .roundCeilingCopy().toString(outputFormat));
+    } catch (IllegalArgumentException | UnsupportedOperationException e) {
+      throw new UnsupportedOperationException(
+          "Can't cast '" + inputTimestamp + "' to type 'Timestamp'", e);
+    }
+  }
+}
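
A minimal sketch of a standalone CAST evaluation using one of the string formats
accepted by the parser above (the constant operand never reads the row):

    BeamSqlExpression operand =
        BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "2017-01-01 01:02:03");
    BeamSqlCastExpression cast = new BeamSqlCastExpression(
        Arrays.<BeamSqlExpression>asList(operand), SqlTypeName.TIMESTAMP);
    Timestamp ts = (Timestamp) cast.evaluate(row).getValue(); // 2017-01-01 01:02:03.0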