Posted to commits@beam.apache.org by ta...@apache.org on 2017/08/02 05:08:49 UTC

[26/59] beam git commit: rename package org.apache.beam.dsls.sql to org.apache.beam.sdk.extensions.sql

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamAggregationTransformTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamAggregationTransformTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamAggregationTransformTest.java
deleted file mode 100644
index 5d5d4fc..0000000
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamAggregationTransformTest.java
+++ /dev/null
@@ -1,453 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.dsls.sql.schema.transform;
-
-import java.text.ParseException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import org.apache.beam.dsls.sql.planner.BeamQueryPlanner;
-import org.apache.beam.dsls.sql.schema.BeamSqlRow;
-import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
-import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
-import org.apache.beam.dsls.sql.transform.BeamAggregationTransforms;
-import org.apache.beam.dsls.sql.utils.CalciteUtils;
-import org.apache.beam.sdk.coders.IterableCoder;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.WithKeys;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.calcite.rel.core.AggregateCall;
-import org.apache.calcite.rel.type.RelDataTypeFactory.FieldInfoBuilder;
-import org.apache.calcite.rel.type.RelDataTypeSystem;
-import org.apache.calcite.sql.SqlKind;
-import org.apache.calcite.sql.fun.SqlAvgAggFunction;
-import org.apache.calcite.sql.fun.SqlCountAggFunction;
-import org.apache.calcite.sql.fun.SqlMinMaxAggFunction;
-import org.apache.calcite.sql.fun.SqlSumAggFunction;
-import org.apache.calcite.sql.type.BasicSqlType;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.apache.calcite.util.ImmutableBitSet;
-import org.junit.Rule;
-import org.junit.Test;
-
-/**
- * Unit tests for {@link BeamAggregationTransforms}.
- *
- */
-public class BeamAggregationTransformTest extends BeamTransformBaseTest{
-
-  @Rule
-  public TestPipeline p = TestPipeline.create();
-
-  private List<AggregateCall> aggCalls;
-
-  private BeamSqlRowType keyType;
-  private BeamSqlRowType aggPartType;
-  private BeamSqlRowType outputType;
-
-  private BeamSqlRowCoder inRecordCoder;
-  private BeamSqlRowCoder keyCoder;
-  private BeamSqlRowCoder aggCoder;
-  private BeamSqlRowCoder outRecordCoder;
-
-  /**
-   * This step is equivalent to the query below.
-   * <pre>
-   * SELECT `f_int`
-   * , COUNT(*) AS `size`
-   * , SUM(`f_long`) AS `sum1`, AVG(`f_long`) AS `avg1`
-   * , MAX(`f_long`) AS `max1`, MIN(`f_long`) AS `min1`
-   * , SUM(`f_short`) AS `sum2`, AVG(`f_short`) AS `avg2`
-   * , MAX(`f_short`) AS `max2`, MIN(`f_short`) AS `min2`
-   * , SUM(`f_byte`) AS `sum3`, AVG(`f_byte`) AS `avg3`
-   * , MAX(`f_byte`) AS `max3`, MIN(`f_byte`) AS `min3`
-   * , SUM(`f_float`) AS `sum4`, AVG(`f_float`) AS `avg4`
-   * , MAX(`f_float`) AS `max4`, MIN(`f_float`) AS `min4`
-   * , SUM(`f_double`) AS `sum5`, AVG(`f_double`) AS `avg5`
-   * , MAX(`f_double`) AS `max5`, MIN(`f_double`) AS `min5`
-   * , MAX(`f_timestamp`) AS `max7`, MIN(`f_timestamp`) AS `min7`
-   * , SUM(`f_int2`) AS `sum8`, AVG(`f_int2`) AS `avg8`
-   * , MAX(`f_int2`) AS `max8`, MIN(`f_int2`) AS `min8`
-   * FROM TABLE_NAME
-   * GROUP BY `f_int`
-   * </pre>
-   * @throws ParseException
-   */
-  @Test
-  public void testCountPerElementBasic() throws ParseException {
-    setupEnvironment();
-
-    PCollection<BeamSqlRow> input = p.apply(Create.of(inputRows));
-
-    //1. extract fields in group-by key part
-    PCollection<KV<BeamSqlRow, BeamSqlRow>> exGroupByStream = input.apply("exGroupBy",
-        WithKeys
-            .of(new BeamAggregationTransforms.AggregationGroupByKeyFn(-1, ImmutableBitSet.of(0))))
-        .setCoder(KvCoder.<BeamSqlRow, BeamSqlRow>of(keyCoder, inRecordCoder));
-
-    //2. apply a GroupByKey.
-    PCollection<KV<BeamSqlRow, Iterable<BeamSqlRow>>> groupedStream = exGroupByStream
-        .apply("groupBy", GroupByKey.<BeamSqlRow, BeamSqlRow>create())
-        .setCoder(KvCoder.<BeamSqlRow, Iterable<BeamSqlRow>>of(keyCoder,
-            IterableCoder.<BeamSqlRow>of(inRecordCoder)));
-
-    //3. run aggregation functions
-    PCollection<KV<BeamSqlRow, BeamSqlRow>> aggregatedStream = groupedStream.apply("aggregation",
-        Combine.<BeamSqlRow, BeamSqlRow, BeamSqlRow>groupedValues(
-            new BeamAggregationTransforms.AggregationAdaptor(aggCalls, inputRowType)))
-        .setCoder(KvCoder.<BeamSqlRow, BeamSqlRow>of(keyCoder, aggCoder));
-
-    //4. flat KV to a single record
-    PCollection<BeamSqlRow> mergedStream = aggregatedStream.apply("mergeRecord",
-        ParDo.of(new BeamAggregationTransforms.MergeAggregationRecord(outputType, aggCalls, -1)));
-    mergedStream.setCoder(outRecordCoder);
-
-    //assert function BeamAggregationTransform.AggregationGroupByKeyFn
-    PAssert.that(exGroupByStream).containsInAnyOrder(prepareResultOfAggregationGroupByKeyFn());
-
-    //assert BeamAggregationTransform.AggregationCombineFn
-    PAssert.that(aggregatedStream).containsInAnyOrder(prepareResultOfAggregationCombineFn());
-
-    //assert BeamAggregationTransform.MergeAggregationRecord
-    PAssert.that(mergedStream).containsInAnyOrder(prepareResultOfMergeAggregationRecord());
-
-    p.run();
-  }
-
-  private void setupEnvironment() {
-    prepareAggregationCalls();
-    prepareTypeAndCoder();
-  }
-
-  /**
-   * Create the list of all {@link AggregateCall}s.
-   */
-  @SuppressWarnings("deprecation")
-  private void prepareAggregationCalls() {
-    //aggregations for all data types
-    aggCalls = new ArrayList<>();
-    aggCalls.add(
-        new AggregateCall(new SqlCountAggFunction(), false,
-            Arrays.<Integer>asList(),
-            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.BIGINT),
-            "count")
-        );
-    aggCalls.add(
-        new AggregateCall(new SqlSumAggFunction(
-            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.BIGINT)), false,
-            Arrays.<Integer>asList(1),
-            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.BIGINT),
-            "sum1")
-        );
-    aggCalls.add(
-        new AggregateCall(new SqlAvgAggFunction(SqlKind.AVG), false,
-            Arrays.<Integer>asList(1),
-            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.BIGINT),
-            "avg1")
-        );
-    aggCalls.add(
-        new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MAX), false,
-            Arrays.<Integer>asList(1),
-            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.BIGINT),
-            "max1")
-        );
-    aggCalls.add(
-        new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MIN), false,
-            Arrays.<Integer>asList(1),
-            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.BIGINT),
-            "min1")
-        );
-
-    aggCalls.add(
-        new AggregateCall(new SqlSumAggFunction(
-            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.SMALLINT)), false,
-            Arrays.<Integer>asList(2),
-            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.SMALLINT),
-            "sum2")
-        );
-    aggCalls.add(
-        new AggregateCall(new SqlAvgAggFunction(SqlKind.AVG), false,
-            Arrays.<Integer>asList(2),
-            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.SMALLINT),
-            "avg2")
-        );
-    aggCalls.add(
-        new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MAX), false,
-            Arrays.<Integer>asList(2),
-            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.SMALLINT),
-            "max2")
-        );
-    aggCalls.add(
-        new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MIN), false,
-            Arrays.<Integer>asList(2),
-            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.SMALLINT),
-            "min2")
-        );
-
-    aggCalls.add(
-        new AggregateCall(
-            new SqlSumAggFunction(new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.TINYINT)),
-            false,
-            Arrays.<Integer>asList(3),
-            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.TINYINT),
-            "sum3")
-        );
-    aggCalls.add(
-        new AggregateCall(new SqlAvgAggFunction(SqlKind.AVG), false,
-            Arrays.<Integer>asList(3),
-            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.TINYINT),
-            "avg3")
-        );
-    aggCalls.add(
-        new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MAX), false,
-            Arrays.<Integer>asList(3),
-            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.TINYINT),
-            "max3")
-        );
-    aggCalls.add(
-        new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MIN), false,
-            Arrays.<Integer>asList(3),
-            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.TINYINT),
-            "min3")
-        );
-
-    aggCalls.add(
-        new AggregateCall(
-            new SqlSumAggFunction(new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.FLOAT)),
-            false,
-            Arrays.<Integer>asList(4),
-            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.FLOAT),
-            "sum4")
-        );
-    aggCalls.add(
-        new AggregateCall(new SqlAvgAggFunction(SqlKind.AVG), false,
-            Arrays.<Integer>asList(4),
-            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.FLOAT),
-            "avg4")
-        );
-    aggCalls.add(
-        new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MAX), false,
-            Arrays.<Integer>asList(4),
-            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.FLOAT),
-            "max4")
-        );
-    aggCalls.add(
-        new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MIN), false,
-            Arrays.<Integer>asList(4),
-            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.FLOAT),
-            "min4")
-        );
-
-    aggCalls.add(
-        new AggregateCall(
-            new SqlSumAggFunction(new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.DOUBLE)),
-            false,
-            Arrays.<Integer>asList(5),
-            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.DOUBLE),
-            "sum5")
-        );
-    aggCalls.add(
-        new AggregateCall(new SqlAvgAggFunction(SqlKind.AVG), false,
-            Arrays.<Integer>asList(5),
-            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.DOUBLE),
-            "avg5")
-        );
-    aggCalls.add(
-        new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MAX), false,
-            Arrays.<Integer>asList(5),
-            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.DOUBLE),
-            "max5")
-        );
-    aggCalls.add(
-        new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MIN), false,
-            Arrays.<Integer>asList(5),
-            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.DOUBLE),
-            "min5")
-        );
-
-    aggCalls.add(
-        new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MAX), false,
-            Arrays.<Integer>asList(7),
-            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.TIMESTAMP),
-            "max7")
-        );
-    aggCalls.add(
-        new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MIN), false,
-            Arrays.<Integer>asList(7),
-            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.TIMESTAMP),
-            "min7")
-        );
-
-    aggCalls.add(
-        new AggregateCall(
-            new SqlSumAggFunction(new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.INTEGER)),
-            false,
-            Arrays.<Integer>asList(8),
-            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.INTEGER),
-            "sum8")
-        );
-    aggCalls.add(
-        new AggregateCall(new SqlAvgAggFunction(SqlKind.AVG), false,
-            Arrays.<Integer>asList(8),
-            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.INTEGER),
-            "avg8")
-        );
-    aggCalls.add(
-        new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MAX), false,
-            Arrays.<Integer>asList(8),
-            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.INTEGER),
-            "max8")
-        );
-    aggCalls.add(
-        new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MIN), false,
-            Arrays.<Integer>asList(8),
-            new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.INTEGER),
-            "min8")
-        );
-  }
-
-  /**
-   * Coders used in aggregation steps.
-   */
-  private void prepareTypeAndCoder() {
-    inRecordCoder = new BeamSqlRowCoder(inputRowType);
-
-    keyType = initTypeOfSqlRow(Arrays.asList(KV.of("f_int", SqlTypeName.INTEGER)));
-    keyCoder = new BeamSqlRowCoder(keyType);
-
-    aggPartType = initTypeOfSqlRow(
-        Arrays.asList(KV.of("count", SqlTypeName.BIGINT),
-
-            KV.of("sum1", SqlTypeName.BIGINT), KV.of("avg1", SqlTypeName.BIGINT),
-            KV.of("max1", SqlTypeName.BIGINT), KV.of("min1", SqlTypeName.BIGINT),
-
-            KV.of("sum2", SqlTypeName.SMALLINT), KV.of("avg2", SqlTypeName.SMALLINT),
-            KV.of("max2", SqlTypeName.SMALLINT), KV.of("min2", SqlTypeName.SMALLINT),
-
-            KV.of("sum3", SqlTypeName.TINYINT), KV.of("avg3", SqlTypeName.TINYINT),
-            KV.of("max3", SqlTypeName.TINYINT), KV.of("min3", SqlTypeName.TINYINT),
-
-            KV.of("sum4", SqlTypeName.FLOAT), KV.of("avg4", SqlTypeName.FLOAT),
-            KV.of("max4", SqlTypeName.FLOAT), KV.of("min4", SqlTypeName.FLOAT),
-
-            KV.of("sum5", SqlTypeName.DOUBLE), KV.of("avg5", SqlTypeName.DOUBLE),
-            KV.of("max5", SqlTypeName.DOUBLE), KV.of("min5", SqlTypeName.DOUBLE),
-
-            KV.of("max7", SqlTypeName.TIMESTAMP), KV.of("min7", SqlTypeName.TIMESTAMP),
-
-            KV.of("sum8", SqlTypeName.INTEGER), KV.of("avg8", SqlTypeName.INTEGER),
-            KV.of("max8", SqlTypeName.INTEGER), KV.of("min8", SqlTypeName.INTEGER)
-            ));
-    aggCoder = new BeamSqlRowCoder(aggPartType);
-
-    outputType = prepareFinalRowType();
-    outRecordCoder = new BeamSqlRowCoder(outputType);
-  }
-
-  /**
-   * Expected results after {@link BeamAggregationTransforms.AggregationGroupByKeyFn}.
-   */
-  private List<KV<BeamSqlRow, BeamSqlRow>> prepareResultOfAggregationGroupByKeyFn() {
-    return Arrays.asList(
-        KV.of(new BeamSqlRow(keyType, Arrays.<Object>asList(inputRows.get(0).getInteger(0))),
-            inputRows.get(0)),
-        KV.of(new BeamSqlRow(keyType, Arrays.<Object>asList(inputRows.get(1).getInteger(0))),
-            inputRows.get(1)),
-        KV.of(new BeamSqlRow(keyType, Arrays.<Object>asList(inputRows.get(2).getInteger(0))),
-            inputRows.get(2)),
-        KV.of(new BeamSqlRow(keyType, Arrays.<Object>asList(inputRows.get(3).getInteger(0))),
-            inputRows.get(3)));
-  }
-
-  /**
-   * Expected results after {@link BeamAggregationTransforms.AggregationCombineFn}.
-   */
-  private List<KV<BeamSqlRow, BeamSqlRow>> prepareResultOfAggregationCombineFn()
-      throws ParseException {
-    return Arrays.asList(
-            KV.of(new BeamSqlRow(keyType, Arrays.<Object>asList(inputRows.get(0).getInteger(0))),
-                new BeamSqlRow(aggPartType, Arrays.<Object>asList(
-                    4L,
-                    10000L, 2500L, 4000L, 1000L,
-                    (short) 10, (short) 2, (short) 4, (short) 1,
-                    (byte) 10, (byte) 2, (byte) 4, (byte) 1,
-                    10.0F, 2.5F, 4.0F, 1.0F,
-                    10.0, 2.5, 4.0, 1.0,
-                    format.parse("2017-01-01 02:04:03"), format.parse("2017-01-01 01:01:03"),
-                    10, 2, 4, 1
-                    )))
-            );
-  }
-
-  /**
-   * Row type of final output row.
-   */
-  private BeamSqlRowType prepareFinalRowType() {
-    FieldInfoBuilder builder = BeamQueryPlanner.TYPE_FACTORY.builder();
-    List<KV<String, SqlTypeName>> columnMetadata =
-        Arrays.asList(KV.of("f_int", SqlTypeName.INTEGER), KV.of("count", SqlTypeName.BIGINT),
-
-        KV.of("sum1", SqlTypeName.BIGINT), KV.of("avg1", SqlTypeName.BIGINT),
-        KV.of("max1", SqlTypeName.BIGINT), KV.of("min1", SqlTypeName.BIGINT),
-
-        KV.of("sum2", SqlTypeName.SMALLINT), KV.of("avg2", SqlTypeName.SMALLINT),
-        KV.of("max2", SqlTypeName.SMALLINT), KV.of("min2", SqlTypeName.SMALLINT),
-
-        KV.of("sum3", SqlTypeName.TINYINT), KV.of("avg3", SqlTypeName.TINYINT),
-        KV.of("max3", SqlTypeName.TINYINT), KV.of("min3", SqlTypeName.TINYINT),
-
-        KV.of("sum4", SqlTypeName.FLOAT), KV.of("avg4", SqlTypeName.FLOAT),
-        KV.of("max4", SqlTypeName.FLOAT), KV.of("min4", SqlTypeName.FLOAT),
-
-        KV.of("sum5", SqlTypeName.DOUBLE), KV.of("avg5", SqlTypeName.DOUBLE),
-        KV.of("max5", SqlTypeName.DOUBLE), KV.of("min5", SqlTypeName.DOUBLE),
-
-        KV.of("max7", SqlTypeName.TIMESTAMP), KV.of("min7", SqlTypeName.TIMESTAMP),
-
-        KV.of("sum8", SqlTypeName.INTEGER), KV.of("avg8", SqlTypeName.INTEGER),
-        KV.of("max8", SqlTypeName.INTEGER), KV.of("min8", SqlTypeName.INTEGER)
-        );
-    for (KV<String, SqlTypeName> cm : columnMetadata) {
-      builder.add(cm.getKey(), cm.getValue());
-    }
-    return CalciteUtils.toBeamRowType(builder.build());
-  }
-
-  /**
-   * Expected results after {@link BeamAggregationTransforms.MergeAggregationRecord}.
-   */
-  private BeamSqlRow prepareResultOfMergeAggregationRecord() throws ParseException {
-    return new BeamSqlRow(outputType, Arrays.<Object>asList(
-        1, 4L,
-        10000L, 2500L, 4000L, 1000L,
-        (short) 10, (short) 2, (short) 4, (short) 1,
-        (byte) 10, (byte) 2, (byte) 4, (byte) 1,
-        10.0F, 2.5F, 4.0F, 1.0F,
-        10.0, 2.5, 4.0, 1.0,
-        format.parse("2017-01-01 02:04:03"), format.parse("2017-01-01 01:01:03"),
-        10, 2, 4, 1
-        ));
-  }
-}
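
For context, the deleted test above hand-builds the same four-step pipeline that Beam SQL generates for a GROUP BY: extract the grouping key, group by key, combine the grouped values, then flatten each key/value pair back into a single record. Below is a minimal sketch of that generic pattern with plain Long values (illustrative names only, not code from this commit):

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Combine;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.GroupByKey;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.transforms.SerializableFunction;
    import org.apache.beam.sdk.transforms.Sum;
    import org.apache.beam.sdk.transforms.WithKeys;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;

    public class GroupBySketch {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
        PCollection<Long> values = p.apply(Create.of(1000L, 2000L, 3000L, 4000L));

        // 1. extract the group-by key (a single constant key here)
        PCollection<KV<Integer, Long>> keyed = values.apply("extractKey",
            WithKeys.of(new SerializableFunction<Long, Integer>() {
              @Override public Integer apply(Long v) { return 0; }
            }));

        // 2. apply a GroupByKey
        PCollection<KV<Integer, Iterable<Long>>> grouped =
            keyed.apply("groupBy", GroupByKey.<Integer, Long>create());

        // 3. run the aggregation function per group
        PCollection<KV<Integer, Long>> summed = grouped.apply("aggregate",
            Combine.<Integer, Long, Long>groupedValues(Sum.ofLongs()));

        // 4. flatten KV to a single record
        summed.apply("mergeRecord", ParDo.of(new DoFn<KV<Integer, Long>, String>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            c.output(c.element().getKey() + "," + c.element().getValue());
          }
        }));

        p.run().waitUntilFinish();
      }
    }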

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamTransformBaseTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamTransformBaseTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamTransformBaseTest.java
deleted file mode 100644
index 4045bc8..0000000
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamTransformBaseTest.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.dsls.sql.schema.transform;
-
-import java.text.DateFormat;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.Arrays;
-import java.util.List;
-import org.apache.beam.dsls.sql.planner.BeamQueryPlanner;
-import org.apache.beam.dsls.sql.schema.BeamSqlRow;
-import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
-import org.apache.beam.dsls.sql.utils.CalciteUtils;
-import org.apache.beam.sdk.values.KV;
-import org.apache.calcite.rel.type.RelDataTypeFactory.FieldInfoBuilder;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.junit.BeforeClass;
-
-/**
- * Shared methods to test PTransforms that execute Beam SQL steps.
- *
- */
-public class BeamTransformBaseTest {
-  public static DateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
-
-  public static BeamSqlRowType inputRowType;
-  public static List<BeamSqlRow> inputRows;
-
-  @BeforeClass
-  public static void prepareInput() throws NumberFormatException, ParseException{
-    List<KV<String, SqlTypeName>> columnMetadata = Arrays.asList(
-        KV.of("f_int", SqlTypeName.INTEGER), KV.of("f_long", SqlTypeName.BIGINT),
-        KV.of("f_short", SqlTypeName.SMALLINT), KV.of("f_byte", SqlTypeName.TINYINT),
-        KV.of("f_float", SqlTypeName.FLOAT), KV.of("f_double", SqlTypeName.DOUBLE),
-        KV.of("f_string", SqlTypeName.VARCHAR), KV.of("f_timestamp", SqlTypeName.TIMESTAMP),
-        KV.of("f_int2", SqlTypeName.INTEGER)
-        );
-    inputRowType = initTypeOfSqlRow(columnMetadata);
-    inputRows = Arrays.asList(
-        initBeamSqlRow(columnMetadata,
-            Arrays.<Object>asList(1, 1000L, Short.valueOf("1"), Byte.valueOf("1"), 1.0F, 1.0,
-                "string_row1", format.parse("2017-01-01 01:01:03"), 1)),
-        initBeamSqlRow(columnMetadata,
-            Arrays.<Object>asList(1, 2000L, Short.valueOf("2"), Byte.valueOf("2"), 2.0F, 2.0,
-                "string_row2", format.parse("2017-01-01 01:02:03"), 2)),
-        initBeamSqlRow(columnMetadata,
-            Arrays.<Object>asList(1, 3000L, Short.valueOf("3"), Byte.valueOf("3"), 3.0F, 3.0,
-                "string_row3", format.parse("2017-01-01 01:03:03"), 3)),
-        initBeamSqlRow(columnMetadata, Arrays.<Object>asList(1, 4000L, Short.valueOf("4"),
-            Byte.valueOf("4"), 4.0F, 4.0, "string_row4", format.parse("2017-01-01 02:04:03"), 4)));
-  }
-
-  /**
-   * Create a {@code BeamSqlRowType} for the given column metadata.
-   */
-  public static BeamSqlRowType initTypeOfSqlRow(List<KV<String, SqlTypeName>> columnMetadata){
-    FieldInfoBuilder builder = BeamQueryPlanner.TYPE_FACTORY.builder();
-    for (KV<String, SqlTypeName> cm : columnMetadata) {
-      builder.add(cm.getKey(), cm.getValue());
-    }
-    return CalciteUtils.toBeamRowType(builder.build());
-  }
-
-  /**
-   * Create an empty row with the given column metadata.
-   */
-  public static BeamSqlRow initBeamSqlRow(List<KV<String, SqlTypeName>> columnMetadata) {
-    return initBeamSqlRow(columnMetadata, Arrays.asList());
-  }
-
-  /**
-   * Create a row with the given column metadata and values for each column.
-   *
-   */
-  public static BeamSqlRow initBeamSqlRow(List<KV<String, SqlTypeName>> columnMetadata,
-      List<Object> rowValues){
-    BeamSqlRowType rowType = initTypeOfSqlRow(columnMetadata);
-
-    return new BeamSqlRow(rowType, rowValues);
-  }
-
-}
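
For reference, a short sketch of how a subclass consumes the static helpers above (identifiers taken from the deleted code; this assumes the pre-rename org.apache.beam.dsls.sql types are on the classpath):

    import java.util.Arrays;
    import java.util.List;
    import org.apache.beam.dsls.sql.schema.BeamSqlRow;
    import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
    import org.apache.beam.sdk.values.KV;
    import org.apache.calcite.sql.type.SqlTypeName;

    // build a two-column row type and a matching row via the static helpers
    List<KV<String, SqlTypeName>> cols = Arrays.asList(
        KV.of("f_int", SqlTypeName.INTEGER), KV.of("f_long", SqlTypeName.BIGINT));
    BeamSqlRowType type = BeamTransformBaseTest.initTypeOfSqlRow(cols);
    BeamSqlRow row = BeamTransformBaseTest.initBeamSqlRow(cols,
        Arrays.<Object>asList(1, 1000L));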

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlApiSurfaceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlApiSurfaceTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlApiSurfaceTest.java
new file mode 100644
index 0000000..08678d1
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlApiSurfaceTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql;
+
+import static org.apache.beam.sdk.util.ApiSurface.containsOnlyPackages;
+import static org.junit.Assert.assertThat;
+
+import com.google.common.collect.ImmutableSet;
+import java.util.Set;
+import org.apache.beam.sdk.util.ApiSurface;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Surface test for the BeamSql API.
+ */
+@RunWith(JUnit4.class)
+public class BeamSqlApiSurfaceTest {
+  @Test
+  public void testSdkApiSurface() throws Exception {
+
+    @SuppressWarnings("unchecked")
+    final Set<String> allowed =
+        ImmutableSet.of(
+            "org.apache.beam",
+            "org.joda.time",
+            "org.apache.commons.csv");
+
+    ApiSurface surface = ApiSurface
+        .ofClass(BeamSqlCli.class)
+        .includingClass(BeamSql.class)
+        .includingClass(BeamSqlEnv.class)
+        .includingPackage("org.apache.beam.sdk.extensions.sql.schema",
+            getClass().getClassLoader())
+        .pruningPrefix("java")
+        .pruningPattern("org[.]apache[.]beam[.]sdk[.]extensions[.]sql[.].*Test")
+        .pruningPattern("org[.]apache[.]beam[.]sdk[.]extensions[.]sql[.].*TestBase");
+
+    assertThat(surface, containsOnlyPackages(allowed));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java
new file mode 100644
index 0000000..e6ca18f
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql;
+
+import java.sql.Types;
+import java.util.Arrays;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
+import org.joda.time.Instant;
+import org.junit.Test;
+
+/**
+ * Tests for GROUP-BY/aggregation with global window, fixed-time window, sliding window, and
+ * session window, on both bounded and unbounded PCollections.
+ */
+public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
+  /**
+   * GROUP-BY with single aggregation function with bounded PCollection.
+   */
+  @Test
+  public void testAggregationWithoutWindowWithBounded() throws Exception {
+    runAggregationWithoutWindow(boundedInput1);
+  }
+
+  /**
+   * GROUP-BY with single aggregation function with unbounded PCollection.
+   */
+  @Test
+  public void testAggregationWithoutWindowWithUnbounded() throws Exception {
+    runAggregationWithoutWindow(unboundedInput1);
+  }
+
+  private void runAggregationWithoutWindow(PCollection<BeamSqlRow> input) throws Exception {
+    String sql = "SELECT f_int2, COUNT(*) AS `size` FROM PCOLLECTION GROUP BY f_int2";
+
+    PCollection<BeamSqlRow> result =
+        input.apply("testAggregationWithoutWindow", BeamSql.simpleQuery(sql));
+
+    BeamSqlRowType resultType = BeamSqlRowType.create(Arrays.asList("f_int2", "size"),
+        Arrays.asList(Types.INTEGER, Types.BIGINT));
+
+    BeamSqlRow record = new BeamSqlRow(resultType);
+    record.addField("f_int2", 0);
+    record.addField("size", 4L);
+
+    PAssert.that(result).containsInAnyOrder(record);
+
+    pipeline.run().waitUntilFinish();
+  }
+
+  /**
+   * GROUP-BY with multiple aggregation functions with bounded PCollection.
+   */
+  @Test
+  public void testAggregationFunctionsWithBounded() throws Exception{
+    runAggregationFunctions(boundedInput1);
+  }
+
+  /**
+   * GROUP-BY with multiple aggregation functions with unbounded PCollection.
+   */
+  @Test
+  public void testAggregationFunctionsWithUnbounded() throws Exception{
+    runAggregationFunctions(unboundedInput1);
+  }
+
+  private void runAggregationFunctions(PCollection<BeamSqlRow> input) throws Exception{
+    String sql = "select f_int2, count(*) as size, "
+        + "sum(f_long) as sum1, avg(f_long) as avg1, max(f_long) as max1, min(f_long) as min1,"
+        + "sum(f_short) as sum2, avg(f_short) as avg2, max(f_short) as max2, min(f_short) as min2,"
+        + "sum(f_byte) as sum3, avg(f_byte) as avg3, max(f_byte) as max3, min(f_byte) as min3,"
+        + "sum(f_float) as sum4, avg(f_float) as avg4, max(f_float) as max4, min(f_float) as min4,"
+        + "sum(f_double) as sum5, avg(f_double) as avg5, "
+        + "max(f_double) as max5, min(f_double) as min5,"
+        + "max(f_timestamp) as max6, min(f_timestamp) as min6 "
+        + "FROM TABLE_A group by f_int2";
+
+    PCollection<BeamSqlRow> result =
+        PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input)
+        .apply("testAggregationFunctions", BeamSql.query(sql));
+
+    BeamSqlRowType resultType = BeamSqlRowType.create(
+        Arrays.asList("f_int2", "size", "sum1", "avg1", "max1", "min1", "sum2", "avg2", "max2",
+            "min2", "sum3", "avg3", "max3", "min3", "sum4", "avg4", "max4", "min4", "sum5", "avg5",
+            "max5", "min5", "max6", "min6"),
+        Arrays.asList(Types.INTEGER, Types.BIGINT, Types.BIGINT, Types.BIGINT, Types.BIGINT,
+            Types.BIGINT, Types.SMALLINT, Types.SMALLINT, Types.SMALLINT, Types.SMALLINT,
+            Types.TINYINT, Types.TINYINT, Types.TINYINT, Types.TINYINT, Types.FLOAT, Types.FLOAT,
+            Types.FLOAT, Types.FLOAT, Types.DOUBLE, Types.DOUBLE, Types.DOUBLE, Types.DOUBLE,
+            Types.TIMESTAMP, Types.TIMESTAMP));
+
+    BeamSqlRow record = new BeamSqlRow(resultType);
+    record.addField("f_int2", 0);
+    record.addField("size", 4L);
+
+    record.addField("sum1", 10000L);
+    record.addField("avg1", 2500L);
+    record.addField("max1", 4000L);
+    record.addField("min1", 1000L);
+
+    record.addField("sum2", (short) 10);
+    record.addField("avg2", (short) 2);
+    record.addField("max2", (short) 4);
+    record.addField("min2", (short) 1);
+
+    record.addField("sum3", (byte) 10);
+    record.addField("avg3", (byte) 2);
+    record.addField("max3", (byte) 4);
+    record.addField("min3", (byte) 1);
+
+    record.addField("sum4", 10.0F);
+    record.addField("avg4", 2.5F);
+    record.addField("max4", 4.0F);
+    record.addField("min4", 1.0F);
+
+    record.addField("sum5", 10.0);
+    record.addField("avg5", 2.5);
+    record.addField("max5", 4.0);
+    record.addField("min5", 1.0);
+
+    record.addField("max6", FORMAT.parse("2017-01-01 02:04:03"));
+    record.addField("min6", FORMAT.parse("2017-01-01 01:01:03"));
+
+    PAssert.that(result).containsInAnyOrder(record);
+
+    pipeline.run().waitUntilFinish();
+  }
+
+  /**
+   * Implicit GROUP-BY with DISTINCT with bounded PCollection.
+   */
+  @Test
+  public void testDistinctWithBounded() throws Exception {
+    runDistinct(boundedInput1);
+  }
+
+  /**
+   * Implicit GROUP-BY with DISTINCT with unbounded PCollection.
+   */
+  @Test
+  public void testDistinctWithUnbounded() throws Exception {
+    runDistinct(unboundedInput1);
+  }
+
+  private void runDistinct(PCollection<BeamSqlRow> input) throws Exception {
+    String sql = "SELECT distinct f_int, f_long FROM PCOLLECTION ";
+
+    PCollection<BeamSqlRow> result =
+        input.apply("testDistinct", BeamSql.simpleQuery(sql));
+
+    BeamSqlRowType resultType = BeamSqlRowType.create(Arrays.asList("f_int", "f_long"),
+        Arrays.asList(Types.INTEGER, Types.BIGINT));
+
+    BeamSqlRow record1 = new BeamSqlRow(resultType);
+    record1.addField("f_int", 1);
+    record1.addField("f_long", 1000L);
+
+    BeamSqlRow record2 = new BeamSqlRow(resultType);
+    record2.addField("f_int", 2);
+    record2.addField("f_long", 2000L);
+
+    BeamSqlRow record3 = new BeamSqlRow(resultType);
+    record3.addField("f_int", 3);
+    record3.addField("f_long", 3000L);
+
+    BeamSqlRow record4 = new BeamSqlRow(resultType);
+    record4.addField("f_int", 4);
+    record4.addField("f_long", 4000L);
+
+    PAssert.that(result).containsInAnyOrder(record1, record2, record3, record4);
+
+    pipeline.run().waitUntilFinish();
+  }
+
+  /**
+   * GROUP-BY with TUMBLE window (a.k.a. fixed-time window) with bounded PCollection.
+   */
+  @Test
+  public void testTumbleWindowWithBounded() throws Exception {
+    runTumbleWindow(boundedInput1);
+  }
+
+  /**
+   * GROUP-BY with TUMBLE window (a.k.a. fixed-time window) with unbounded PCollection.
+   */
+  @Test
+  public void testTumbleWindowWithUnbounded() throws Exception {
+    runTumbleWindow(unboundedInput1);
+  }
+
+  private void runTumbleWindow(PCollection<BeamSqlRow> input) throws Exception {
+    String sql = "SELECT f_int2, COUNT(*) AS `size`,"
+        + " TUMBLE_START(f_timestamp, INTERVAL '1' HOUR) AS `window_start`"
+        + " FROM TABLE_A"
+        + " GROUP BY f_int2, TUMBLE(f_timestamp, INTERVAL '1' HOUR)";
+    PCollection<BeamSqlRow> result =
+        PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input)
+        .apply("testTumbleWindow", BeamSql.query(sql));
+
+    BeamSqlRowType resultType = BeamSqlRowType.create(
+        Arrays.asList("f_int2", "size", "window_start"),
+        Arrays.asList(Types.INTEGER, Types.BIGINT, Types.TIMESTAMP));
+
+    BeamSqlRow record1 = new BeamSqlRow(resultType);
+    record1.addField("f_int2", 0);
+    record1.addField("size", 3L);
+    record1.addField("window_start", FORMAT.parse("2017-01-01 01:00:00"));
+    record1.setWindowStart(new Instant(FORMAT.parse("2017-01-01 01:00:00").getTime()));
+    record1.setWindowEnd(new Instant(FORMAT.parse("2017-01-01 02:00:00").getTime()));
+
+    BeamSqlRow record2 = new BeamSqlRow(resultType);
+    record2.addField("f_int2", 0);
+    record2.addField("size", 1L);
+    record2.addField("window_start", FORMAT.parse("2017-01-01 02:00:00"));
+    record2.setWindowStart(new Instant(FORMAT.parse("2017-01-01 02:00:00").getTime()));
+    record2.setWindowEnd(new Instant(FORMAT.parse("2017-01-01 03:00:00").getTime()));
+
+    PAssert.that(result).containsInAnyOrder(record1, record2);
+
+    pipeline.run().waitUntilFinish();
+  }
+
+  /**
+   * GROUP-BY with HOP window (a.k.a. sliding window) with bounded PCollection.
+   */
+  @Test
+  public void testHopWindowWithBounded() throws Exception {
+    runHopWindow(boundedInput1);
+  }
+
+  /**
+   * GROUP-BY with HOP window (a.k.a. sliding window) with unbounded PCollection.
+   */
+  @Test
+  public void testHopWindowWithUnbounded() throws Exception {
+    runHopWindow(unboundedInput1);
+  }
+
+  private void runHopWindow(PCollection<BeamSqlRow> input) throws Exception {
+    String sql = "SELECT f_int2, COUNT(*) AS `size`,"
+        + " HOP_START(f_timestamp, INTERVAL '1' HOUR, INTERVAL '30' MINUTE) AS `window_start`"
+        + " FROM PCOLLECTION"
+        + " GROUP BY f_int2, HOP(f_timestamp, INTERVAL '1' HOUR, INTERVAL '30' MINUTE)";
+    PCollection<BeamSqlRow> result =
+        input.apply("testHopWindow", BeamSql.simpleQuery(sql));
+
+    BeamSqlRowType resultType = BeamSqlRowType.create(
+        Arrays.asList("f_int2", "size", "window_start"),
+        Arrays.asList(Types.INTEGER, Types.BIGINT, Types.TIMESTAMP));
+
+    BeamSqlRow record1 = new BeamSqlRow(resultType);
+    record1.addField("f_int2", 0);
+    record1.addField("size", 3L);
+    record1.addField("window_start", FORMAT.parse("2017-01-01 00:30:00"));
+    record1.setWindowStart(new Instant(FORMAT.parse("2017-01-01 00:30:00").getTime()));
+    record1.setWindowEnd(new Instant(FORMAT.parse("2017-01-01 01:30:00").getTime()));
+
+    BeamSqlRow record2 = new BeamSqlRow(resultType);
+    record2.addField("f_int2", 0);
+    record2.addField("size", 3L);
+    record2.addField("window_start", FORMAT.parse("2017-01-01 01:00:00"));
+    record2.setWindowStart(new Instant(FORMAT.parse("2017-01-01 01:00:00").getTime()));
+    record2.setWindowEnd(new Instant(FORMAT.parse("2017-01-01 02:00:00").getTime()));
+
+    BeamSqlRow record3 = new BeamSqlRow(resultType);
+    record3.addField("f_int2", 0);
+    record3.addField("size", 1L);
+    record3.addField("window_start", FORMAT.parse("2017-01-01 01:30:00"));
+    record3.setWindowStart(new Instant(FORMAT.parse("2017-01-01 01:30:00").getTime()));
+    record3.setWindowEnd(new Instant(FORMAT.parse("2017-01-01 02:30:00").getTime()));
+
+    BeamSqlRow record4 = new BeamSqlRow(resultType);
+    record4.addField("f_int2", 0);
+    record4.addField("size", 1L);
+    record4.addField("window_start", FORMAT.parse("2017-01-01 02:00:00"));
+    record4.setWindowStart(new Instant(FORMAT.parse("2017-01-01 02:00:00").getTime()));
+    record4.setWindowEnd(new Instant(FORMAT.parse("2017-01-01 03:00:00").getTime()));
+
+    PAssert.that(result).containsInAnyOrder(record1, record2, record3, record4);
+
+    pipeline.run().waitUntilFinish();
+  }
+
+  /**
+   * GROUP-BY with SESSION window with bounded PCollection.
+   */
+  @Test
+  public void testSessionWindowWithBounded() throws Exception {
+    runSessionWindow(boundedInput1);
+  }
+
+  /**
+   * GROUP-BY with SESSION window with unbounded PCollection.
+   */
+  @Test
+  public void testSessionWindowWithUnbounded() throws Exception {
+    runSessionWindow(unboundedInput1);
+  }
+
+  private void runSessionWindow(PCollection<BeamSqlRow> input) throws Exception {
+    String sql = "SELECT f_int2, COUNT(*) AS `size`,"
+        + " SESSION_START(f_timestamp, INTERVAL '5' MINUTE) AS `window_start`"
+        + " FROM TABLE_A"
+        + " GROUP BY f_int2, SESSION(f_timestamp, INTERVAL '5' MINUTE)";
+    PCollection<BeamSqlRow> result =
+        PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input)
+        .apply("testSessionWindow", BeamSql.query(sql));
+
+    BeamSqlRowType resultType = BeamSqlRowType.create(
+        Arrays.asList("f_int2", "size", "window_start"),
+        Arrays.asList(Types.INTEGER, Types.BIGINT, Types.TIMESTAMP));
+
+    BeamSqlRow record1 = new BeamSqlRow(resultType);
+    record1.addField("f_int2", 0);
+    record1.addField("size", 3L);
+    record1.addField("window_start", FORMAT.parse("2017-01-01 01:01:03"));
+    record1.setWindowStart(new Instant(FORMAT.parse("2017-01-01 01:01:03").getTime()));
+    record1.setWindowEnd(new Instant(FORMAT.parse("2017-01-01 01:11:03").getTime()));
+
+    BeamSqlRow record2 = new BeamSqlRow(resultType);
+    record2.addField("f_int2", 0);
+    record2.addField("size", 1L);
+    record2.addField("window_start", FORMAT.parse("2017-01-01 02:04:03"));
+    record2.setWindowStart(new Instant(FORMAT.parse("2017-01-01 02:04:03").getTime()));
+    record2.setWindowEnd(new Instant(FORMAT.parse("2017-01-01 02:09:03").getTime()));
+
+    PAssert.that(result).containsInAnyOrder(record1, record2);
+
+    pipeline.run().waitUntilFinish();
+  }
+
+  @Test
+  public void testWindowOnNonTimestampField() throws Exception {
+    exceptions.expect(IllegalStateException.class);
+    exceptions.expectMessage(
+        "Cannot apply 'TUMBLE' to arguments of type 'TUMBLE(<BIGINT>, <INTERVAL HOUR>)'");
+    pipeline.enableAbandonedNodeEnforcement(false);
+
+    String sql = "SELECT f_int2, COUNT(*) AS `size` FROM TABLE_A "
+        + "GROUP BY f_int2, TUMBLE(f_long, INTERVAL '1' HOUR)";
+    PCollection<BeamSqlRow> result =
+        PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), boundedInput1)
+        .apply("testWindowOnNonTimestampField", BeamSql.query(sql));
+
+    pipeline.run().waitUntilFinish();
+  }
+
+  @Test
+  public void testUnsupportedDistinct() throws Exception {
+    exceptions.expect(IllegalStateException.class);
+    exceptions.expectMessage("Encountered \"*\"");
+    pipeline.enableAbandonedNodeEnforcement(false);
+
+    String sql = "SELECT f_int2, COUNT(DISTINCT *) AS `size` FROM PCOLLECTION GROUP BY f_int2";
+
+    PCollection<BeamSqlRow> result =
+        boundedInput1.apply("testUnsupportedDistinct", BeamSql.simpleQuery(sql));
+
+    pipeline.run().waitUntilFinish();
+  }
+}
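
The TUMBLE/HOP/SESSION groupings asserted above line up with Beam's standard WindowFns. As a rough sketch of the equivalence (illustrative, not code from this commit; the HOP mapping is read off the windows the test asserts, i.e. one-hour windows every 30 minutes):

    import org.apache.beam.sdk.transforms.windowing.FixedWindows;
    import org.apache.beam.sdk.transforms.windowing.Sessions;
    import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
    import org.joda.time.Duration;

    // TUMBLE(f_timestamp, INTERVAL '1' HOUR): fixed one-hour windows
    FixedWindows tumble = FixedWindows.of(Duration.standardHours(1));

    // the HOP test asserts one-hour windows sliding every 30 minutes
    SlidingWindows hop = SlidingWindows.of(Duration.standardHours(1))
        .every(Duration.standardMinutes(30));

    // SESSION(f_timestamp, INTERVAL '5' MINUTE): sessions with a 5-minute gap
    Sessions session = Sessions.withGapDuration(Duration.standardMinutes(5));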

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslBase.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslBase.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslBase.java
new file mode 100644
index 0000000..0c1ce1c
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslBase.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql;
+
+import java.math.BigDecimal;
+import java.sql.Types;
+import java.text.DateFormat;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowCoder;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestStream;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.rules.ExpectedException;
+
+/**
+ * Prepares input records to test {@link BeamSql}.
+ *
+ * <p>Note that any change to these records would impact tests in this package.
+ *
+ */
+public class BeamSqlDslBase {
+  public static final DateFormat FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+
+  @Rule
+  public final TestPipeline pipeline = TestPipeline.create();
+  @Rule
+  public ExpectedException exceptions = ExpectedException.none();
+
+  public static BeamSqlRowType rowTypeInTableA;
+  public static List<BeamSqlRow> recordsInTableA;
+
+  //bounded PCollections
+  public PCollection<BeamSqlRow> boundedInput1;
+  public PCollection<BeamSqlRow> boundedInput2;
+
+  //unbounded PCollections
+  public PCollection<BeamSqlRow> unboundedInput1;
+  public PCollection<BeamSqlRow> unboundedInput2;
+
+  @BeforeClass
+  public static void prepareClass() throws ParseException {
+    rowTypeInTableA = BeamSqlRowType.create(
+        Arrays.asList("f_int", "f_long", "f_short", "f_byte", "f_float", "f_double", "f_string",
+            "f_timestamp", "f_int2", "f_decimal"),
+        Arrays.asList(Types.INTEGER, Types.BIGINT, Types.SMALLINT, Types.TINYINT, Types.FLOAT,
+            Types.DOUBLE, Types.VARCHAR, Types.TIMESTAMP, Types.INTEGER, Types.DECIMAL));
+
+    recordsInTableA = prepareInputRowsInTableA();
+  }
+
+  @Before
+  public void preparePCollections(){
+    boundedInput1 = PBegin.in(pipeline).apply("boundedInput1",
+        Create.of(recordsInTableA).withCoder(new BeamSqlRowCoder(rowTypeInTableA)));
+
+    boundedInput2 = PBegin.in(pipeline).apply("boundedInput2",
+        Create.of(recordsInTableA.get(0)).withCoder(new BeamSqlRowCoder(rowTypeInTableA)));
+
+    unboundedInput1 = prepareUnboundedPCollection1();
+    unboundedInput2 = prepareUnboundedPCollection2();
+  }
+
+  private PCollection<BeamSqlRow> prepareUnboundedPCollection1() {
+    TestStream.Builder<BeamSqlRow> values = TestStream
+        .create(new BeamSqlRowCoder(rowTypeInTableA));
+
+    for (BeamSqlRow row : recordsInTableA) {
+      values = values.advanceWatermarkTo(new Instant(row.getDate("f_timestamp")));
+      values = values.addElements(row);
+    }
+
+    return PBegin.in(pipeline).apply("unboundedInput1", values.advanceWatermarkToInfinity());
+  }
+
+  private PCollection<BeamSqlRow> prepareUnboundedPCollection2() {
+    TestStream.Builder<BeamSqlRow> values = TestStream
+        .create(new BeamSqlRowCoder(rowTypeInTableA));
+
+    BeamSqlRow row = recordsInTableA.get(0);
+    values = values.advanceWatermarkTo(new Instant(row.getDate("f_timestamp")));
+    values = values.addElements(row);
+
+    return PBegin.in(pipeline).apply("unboundedInput2", values.advanceWatermarkToInfinity());
+  }
+
+  private static List<BeamSqlRow> prepareInputRowsInTableA() throws ParseException{
+    List<BeamSqlRow> rows = new ArrayList<>();
+
+    BeamSqlRow row1 = new BeamSqlRow(rowTypeInTableA);
+    row1.addField(0, 1);
+    row1.addField(1, 1000L);
+    row1.addField(2, Short.valueOf("1"));
+    row1.addField(3, Byte.valueOf("1"));
+    row1.addField(4, 1.0f);
+    row1.addField(5, 1.0);
+    row1.addField(6, "string_row1");
+    row1.addField(7, FORMAT.parse("2017-01-01 01:01:03"));
+    row1.addField(8, 0);
+    row1.addField(9, new BigDecimal(1));
+    rows.add(row1);
+
+    BeamSqlRow row2 = new BeamSqlRow(rowTypeInTableA);
+    row2.addField(0, 2);
+    row2.addField(1, 2000L);
+    row2.addField(2, Short.valueOf("2"));
+    row2.addField(3, Byte.valueOf("2"));
+    row2.addField(4, 2.0f);
+    row2.addField(5, 2.0);
+    row2.addField(6, "string_row2");
+    row2.addField(7, FORMAT.parse("2017-01-01 01:02:03"));
+    row2.addField(8, 0);
+    row2.addField(9, new BigDecimal(2));
+    rows.add(row2);
+
+    BeamSqlRow row3 = new BeamSqlRow(rowTypeInTableA);
+    row3.addField(0, 3);
+    row3.addField(1, 3000L);
+    row3.addField(2, Short.valueOf("3"));
+    row3.addField(3, Byte.valueOf("3"));
+    row3.addField(4, 3.0f);
+    row3.addField(5, 3.0);
+    row3.addField(6, "string_row3");
+    row3.addField(7, FORMAT.parse("2017-01-01 01:06:03"));
+    row3.addField(8, 0);
+    row3.addField(9, new BigDecimal(3));
+    rows.add(row3);
+
+    BeamSqlRow row4 = new BeamSqlRow(rowTypeInTableA);
+    row4.addField(0, 4);
+    row4.addField(1, 4000L);
+    row4.addField(2, Short.valueOf("4"));
+    row4.addField(3, Byte.valueOf("4"));
+    row4.addField(4, 4.0f);
+    row4.addField(5, 4.0);
+    row4.addField(6, "string_row4");
+    row4.addField(7, FORMAT.parse("2017-01-01 02:04:03"));
+    row4.addField(8, 0);
+    row4.addField(9, new BigDecimal(4));
+    rows.add(row4);
+
+    return rows;
+  }
+}
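
The unbounded inputs above are driven by TestStream, which interleaves watermark advances with element emission so downstream windowed GROUP-BYs see windows close deterministically. A minimal fragment with plain Long elements (illustrative; assumes a TestPipeline named pipeline, as in the base class above):

    import org.apache.beam.sdk.coders.VarLongCoder;
    import org.apache.beam.sdk.testing.TestStream;
    import org.apache.beam.sdk.values.PCollection;
    import org.joda.time.Instant;

    TestStream<Long> stream = TestStream.create(VarLongCoder.of())
        .advanceWatermarkTo(new Instant(1000L)) // watermark reaches the element time
        .addElements(1000L)
        .advanceWatermarkToInfinity();          // lets every window fire
    PCollection<Long> unbounded = pipeline.apply("unboundedSketch", stream);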

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslFilterTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslFilterTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslFilterTest.java
new file mode 100644
index 0000000..16b6426
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslFilterTest.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql;
+
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
+import org.junit.Test;
+
+/**
+ * Tests for WHERE queries on both bounded and unbounded PCollections.
+ */
+public class BeamSqlDslFilterTest extends BeamSqlDslBase {
+  /**
+   * Single filter with bounded PCollection.
+   */
+  @Test
+  public void testSingleFilterWithBounded() throws Exception {
+    runSingleFilter(boundedInput1);
+  }
+
+  /**
+   * Single filter with unbounded PCollection.
+   */
+  @Test
+  public void testSingleFilterWithUnbounded() throws Exception {
+    runSingleFilter(unboundedInput1);
+  }
+
+  private void runSingleFilter(PCollection<BeamSqlRow> input) throws Exception {
+    String sql = "SELECT * FROM PCOLLECTION WHERE f_int = 1";
+
+    PCollection<BeamSqlRow> result =
+        input.apply("testSingleFilter", BeamSql.simpleQuery(sql));
+
+    PAssert.that(result).containsInAnyOrder(recordsInTableA.get(0));
+
+    pipeline.run().waitUntilFinish();
+  }
+
+  /**
+   * Composite filters with bounded PCollection.
+   */
+  @Test
+  public void testCompositeFilterWithBounded() throws Exception {
+    runCompositeFilter(boundedInput1);
+  }
+
+  /**
+   * Composite filters with unbounded PCollection.
+   */
+  @Test
+  public void testCompositeFilterWithUnbounded() throws Exception {
+    runCompositeFilter(unboundedInput1);
+  }
+
+  private void runCompositeFilter(PCollection<BeamSqlRow> input) throws Exception {
+    String sql = "SELECT * FROM TABLE_A"
+        + " WHERE f_int > 1 AND (f_long < 3000 OR f_string = 'string_row3')";
+
+    PCollection<BeamSqlRow> result =
+        PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input)
+        .apply("testCompositeFilter", BeamSql.query(sql));
+
+    PAssert.that(result).containsInAnyOrder(recordsInTableA.get(1), recordsInTableA.get(2));
+
+    pipeline.run().waitUntilFinish();
+  }
+
+  /**
+   * Filter that returns nothing, with bounded PCollection.
+   */
+  @Test
+  public void testNoReturnFilterWithBounded() throws Exception {
+    runNoReturnFilter(boundedInput1);
+  }
+
+  /**
+   * Filter that returns nothing, with unbounded PCollection.
+   */
+  @Test
+  public void testNoReturnFilterWithUnbounded() throws Exception {
+    runNoReturnFilter(unboundedInput1);
+  }
+
+  private void runNoReturnFilter(PCollection<BeamSqlRow> input) throws Exception {
+    String sql = "SELECT * FROM TABLE_A WHERE f_int < 1";
+
+    PCollection<BeamSqlRow> result =
+        PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input)
+        .apply("testNoReturnFilter", BeamSql.query(sql));
+
+    PAssert.that(result).empty();
+
+    pipeline.run().waitUntilFinish();
+  }
+
+  @Test
+  public void testFromInvalidTableName1() throws Exception {
+    exceptions.expect(IllegalStateException.class);
+    exceptions.expectMessage("Object 'TABLE_B' not found");
+    pipeline.enableAbandonedNodeEnforcement(false);
+
+    String sql = "SELECT * FROM TABLE_B WHERE f_int < 1";
+
+    PCollection<BeamSqlRow> result =
+        PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), boundedInput1)
+        .apply("testFromInvalidTableName1", BeamSql.query(sql));
+
+    pipeline.run().waitUntilFinish();
+  }
+
+  @Test
+  public void testFromInvalidTableName2() throws Exception {
+    exceptions.expect(IllegalStateException.class);
+    exceptions.expectMessage("Use fixed table name PCOLLECTION");
+    pipeline.enableAbandonedNodeEnforcement(false);
+
+    String sql = "SELECT * FROM PCOLLECTION_NA";
+
+    PCollection<BeamSqlRow> result = boundedInput1.apply(BeamSql.simpleQuery(sql));
+
+    pipeline.run().waitUntilFinish();
+  }
+
+  @Test
+  public void testInvalidFilter() throws Exception {
+    exceptions.expect(IllegalStateException.class);
+    exceptions.expectMessage("Column 'f_int_na' not found in any table");
+    pipeline.enableAbandonedNodeEnforcement(false);
+
+    String sql = "SELECT * FROM PCOLLECTION WHERE f_int_na = 0";
+
+    PCollection<BeamSqlRow> result = boundedInput1.apply(BeamSql.simpleQuery(sql));
+
+    pipeline.run().waitUntilFinish();
+  }
+}
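
The two entry points these tests exercise differ only in how tables are bound: BeamSql.simpleQuery() registers its single input under the fixed table name PCOLLECTION, while BeamSql.query() takes a PCollectionTuple and uses each TupleTag's name as a table name. Side by side, with identifiers from the tests above:

    // single input, fixed table name PCOLLECTION
    PCollection<BeamSqlRow> viaSimple =
        input.apply(BeamSql.simpleQuery("SELECT * FROM PCOLLECTION WHERE f_int = 1"));

    // named inputs: the tag name becomes the table name
    PCollection<BeamSqlRow> viaTuple =
        PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input)
            .apply(BeamSql.query("SELECT * FROM TABLE_A WHERE f_int > 1"));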

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslJoinTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslJoinTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslJoinTest.java
new file mode 100644
index 0000000..363ab8f
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslJoinTest.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql;
+
+import static org.apache.beam.sdk.extensions.sql.rel.BeamJoinRelBoundedVsBoundedTest.ORDER_DETAILS1;
+import static org.apache.beam.sdk.extensions.sql.rel.BeamJoinRelBoundedVsBoundedTest.ORDER_DETAILS2;
+
+import java.sql.Types;
+import java.util.Arrays;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowCoder;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
+import org.junit.Rule;
+import org.junit.Test;
+
+/**
+ * Tests for joins in queries.
+ */
+public class BeamSqlDslJoinTest {
+  @Rule
+  public final TestPipeline pipeline = TestPipeline.create();
+
+  private static final BeamSqlRowType SOURCE_RECORD_TYPE =
+      BeamSqlRowType.create(
+          Arrays.asList(
+              "order_id", "site_id", "price"
+          ),
+          Arrays.asList(
+              Types.INTEGER, Types.INTEGER, Types.INTEGER
+          )
+      );
+
+  private static final BeamSqlRowCoder SOURCE_CODER =
+      new BeamSqlRowCoder(SOURCE_RECORD_TYPE);
+
+  private static final BeamSqlRowType RESULT_RECORD_TYPE =
+      BeamSqlRowType.create(
+          Arrays.asList(
+          "order_id", "site_id", "price", "order_id0", "site_id0", "price0"
+          ),
+          Arrays.asList(
+              Types.INTEGER, Types.INTEGER, Types.INTEGER, Types.INTEGER
+              , Types.INTEGER, Types.INTEGER
+          )
+      );
+
+  private static final BeamSqlRowCoder RESULT_CODER =
+      new BeamSqlRowCoder(RESULT_RECORD_TYPE);
+
+  @Test
+  public void testInnerJoin() throws Exception {
+    String sql =
+        "SELECT *  "
+            + "FROM ORDER_DETAILS1 o1"
+            + " JOIN ORDER_DETAILS2 o2"
+            + " on "
+            + " o1.order_id=o2.site_id AND o2.price=o1.site_id"
+        ;
+
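+    // The join condition matches exactly one pair: o1=(2, 3, 3) with o2=(1, 2, 3).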
+    PAssert.that(queryFromOrderTables(sql)).containsInAnyOrder(
+        TestUtils.RowsBuilder.of(
+            RESULT_RECORD_TYPE
+        ).addRows(
+            2, 3, 3, 1, 2, 3
+        ).getRows());
+    pipeline.run();
+  }
+
+  @Test
+  public void testLeftOuterJoin() throws Exception {
+    String sql =
+        "SELECT *  "
+            + "FROM ORDER_DETAILS1 o1"
+            + " LEFT OUTER JOIN ORDER_DETAILS2 o2"
+            + " on "
+            + " o1.order_id=o2.site_id AND o2.price=o1.site_id"
+        ;
+
+    PAssert.that(queryFromOrderTables(sql)).containsInAnyOrder(
+        TestUtils.RowsBuilder.of(
+            RESULT_RECORD_TYPE
+        ).addRows(
+            1, 2, 3, null, null, null,
+            2, 3, 3, 1, 2, 3,
+            3, 4, 5, null, null, null
+        ).getRows());
+    pipeline.run();
+  }
+
+  @Test
+  public void testRightOuterJoin() throws Exception {
+    String sql =
+        "SELECT *  "
+            + "FROM ORDER_DETAILS1 o1"
+            + " RIGHT OUTER JOIN ORDER_DETAILS2 o2"
+            + " on "
+            + " o1.order_id=o2.site_id AND o2.price=o1.site_id"
+        ;
+
+    PAssert.that(queryFromOrderTables(sql)).containsInAnyOrder(
+        TestUtils.RowsBuilder.of(
+            RESULT_RECORD_TYPE
+        ).addRows(
+            2, 3, 3, 1, 2, 3,
+            null, null, null, 2, 3, 3,
+            null, null, null, 3, 4, 5
+        ).getRows());
+    pipeline.run();
+  }
+
+  @Test
+  public void testFullOuterJoin() throws Exception {
+    String sql =
+        "SELECT *  "
+            + "FROM ORDER_DETAILS1 o1"
+            + " FULL OUTER JOIN ORDER_DETAILS2 o2"
+            + " on "
+            + " o1.order_id=o2.site_id AND o2.price=o1.site_id"
+        ;
+
+    PAssert.that(queryFromOrderTables(sql)).containsInAnyOrder(
+        TestUtils.RowsBuilder.of(
+            RESULT_RECORD_TYPE
+        ).addRows(
+            2, 3, 3, 1, 2, 3,
+            1, 2, 3, null, null, null,
+            3, 4, 5, null, null, null,
+            null, null, null, 2, 3, 3,
+            null, null, null, 3, 4, 5
+        ).getRows());
+    pipeline.run();
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testException_nonEqualJoin() throws Exception {
+    String sql =
+        "SELECT *  "
+            + "FROM ORDER_DETAILS1 o1"
+            + " JOIN ORDER_DETAILS2 o2"
+            + " on "
+            + " o1.order_id>o2.site_id"
+        ;
+
+    pipeline.enableAbandonedNodeEnforcement(false);
+    queryFromOrderTables(sql);
+    pipeline.run();
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testException_crossJoin() throws Exception {
+    String sql =
+        "SELECT *  "
+            + "FROM ORDER_DETAILS1 o1, ORDER_DETAILS2 o2";
+
+    pipeline.enableAbandonedNodeEnforcement(false);
+    queryFromOrderTables(sql);
+    pipeline.run();
+  }
+
+  private PCollection<BeamSqlRow> queryFromOrderTables(String sql) {
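+    // Bind each input PCollection to its table name via a TupleTag, then apply the query.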
+    return PCollectionTuple
+        .of(
+            new TupleTag<BeamSqlRow>("ORDER_DETAILS1"),
+            ORDER_DETAILS1.buildIOReader(pipeline).setCoder(SOURCE_CODER)
+        )
+        .and(new TupleTag<BeamSqlRow>("ORDER_DETAILS2"),
+            ORDER_DETAILS2.buildIOReader(pipeline).setCoder(SOURCE_CODER)
+        ).apply("join", BeamSql.query(sql)).setCoder(RESULT_CODER);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslProjectTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslProjectTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslProjectTest.java
new file mode 100644
index 0000000..6468011
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslProjectTest.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql;
+
+import java.sql.Types;
+import java.util.Arrays;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
+import org.junit.Test;
+
+/**
+ * Tests for field projection in queries with bounded and unbounded PCollections.
+ */
+public class BeamSqlDslProjectTest extends BeamSqlDslBase {
+  /**
+   * select all fields with bounded PCollection.
+   */
+  @Test
+  public void testSelectAllWithBounded() throws Exception {
+    runSelectAll(boundedInput2);
+  }
+
+  /**
+   * select all fields with unbounded PCollection.
+   */
+  @Test
+  public void testSelectAllWithUnbounded() throws Exception {
+    runSelectAll(unboundedInput2);
+  }
+
+  private void runSelectAll(PCollection<BeamSqlRow> input) throws Exception {
+    String sql = "SELECT * FROM PCOLLECTION";
+
+    PCollection<BeamSqlRow> result =
+        input.apply("testSelectAll", BeamSql.simpleQuery(sql));
+
+    PAssert.that(result).containsInAnyOrder(recordsInTableA.get(0));
+
+    pipeline.run().waitUntilFinish();
+  }
+
+  /**
+   * select partial fields with bounded PCollection.
+   */
+  @Test
+  public void testPartialFieldsWithBounded() throws Exception {
+    runPartialFields(boundedInput2);
+  }
+
+  /**
+   * select partial fields with unbounded PCollection.
+   */
+  @Test
+  public void testPartialFieldsWithUnbounded() throws Exception {
+    runPartialFields(unboundedInput2);
+  }
+
+  private void runPartialFields(PCollection<BeamSqlRow> input) throws Exception {
+    String sql = "SELECT f_int, f_long FROM TABLE_A";
+
+    PCollection<BeamSqlRow> result =
+        PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input)
+        .apply("testPartialFields", BeamSql.query(sql));
+
+    BeamSqlRowType resultType = BeamSqlRowType.create(Arrays.asList("f_int", "f_long"),
+        Arrays.asList(Types.INTEGER, Types.BIGINT));
+
+    BeamSqlRow record = new BeamSqlRow(resultType);
+    record.addField("f_int", recordsInTableA.get(0).getFieldValue(0));
+    record.addField("f_long", recordsInTableA.get(0).getFieldValue(1));
+
+    PAssert.that(result).containsInAnyOrder(record);
+
+    pipeline.run().waitUntilFinish();
+  }
+
+  /**
+   * select partial fields for multiple rows with bounded PCollection.
+   */
+  @Test
+  public void testPartialFieldsInMultipleRowWithBounded() throws Exception {
+    runPartialFieldsInMultipleRow(boundedInput1);
+  }
+
+  /**
+   * select partial fields for multiple rows with unbounded PCollection.
+   */
+  @Test
+  public void testPartialFieldsInMultipleRowWithUnbounded() throws Exception {
+    runPartialFieldsInMultipleRow(unboundedInput1);
+  }
+
+  private void runPartialFieldsInMultipleRow(PCollection<BeamSqlRow> input) throws Exception {
+    String sql = "SELECT f_int, f_long FROM TABLE_A";
+
+    PCollection<BeamSqlRow> result =
+        PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input)
+        .apply("testPartialFieldsInMultipleRow", BeamSql.query(sql));
+
+    BeamSqlRowType resultType = BeamSqlRowType.create(Arrays.asList("f_int", "f_long"),
+        Arrays.asList(Types.INTEGER, Types.BIGINT));
+
+    BeamSqlRow record1 = new BeamSqlRow(resultType);
+    record1.addField("f_int", recordsInTableA.get(0).getFieldValue(0));
+    record1.addField("f_long", recordsInTableA.get(0).getFieldValue(1));
+
+    BeamSqlRow record2 = new BeamSqlRow(resultType);
+    record2.addField("f_int", recordsInTableA.get(1).getFieldValue(0));
+    record2.addField("f_long", recordsInTableA.get(1).getFieldValue(1));
+
+    BeamSqlRow record3 = new BeamSqlRow(resultType);
+    record3.addField("f_int", recordsInTableA.get(2).getFieldValue(0));
+    record3.addField("f_long", recordsInTableA.get(2).getFieldValue(1));
+
+    BeamSqlRow record4 = new BeamSqlRow(resultType);
+    record4.addField("f_int", recordsInTableA.get(3).getFieldValue(0));
+    record4.addField("f_long", recordsInTableA.get(3).getFieldValue(1));
+
+    PAssert.that(result).containsInAnyOrder(record1, record2, record3, record4);
+
+    pipeline.run().waitUntilFinish();
+  }
+
+  /**
+   * select partial fields with bounded PCollection.
+   */
+  @Test
+  public void testPartialFieldsInRowsWithBounded() throws Exception {
+    runPartialFieldsInRows(boundedInput1);
+  }
+
+  /**
+   * select partial fields with unbounded PCollection.
+   */
+  @Test
+  public void testPartialFieldsInRowsWithUnbounded() throws Exception {
+    runPartialFieldsInRows(unboundedInput1);
+  }
+
+  private void runPartialFieldsInRows(PCollection<BeamSqlRow> input) throws Exception {
+    String sql = "SELECT f_int, f_long FROM TABLE_A";
+
+    PCollection<BeamSqlRow> result =
+        PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input)
+        .apply("testPartialFieldsInRows", BeamSql.query(sql));
+
+    BeamSqlRowType resultType = BeamSqlRowType.create(Arrays.asList("f_int", "f_long"),
+        Arrays.asList(Types.INTEGER, Types.BIGINT));
+
+    BeamSqlRow record1 = new BeamSqlRow(resultType);
+    record1.addField("f_int", recordsInTableA.get(0).getFieldValue(0));
+    record1.addField("f_long", recordsInTableA.get(0).getFieldValue(1));
+
+    BeamSqlRow record2 = new BeamSqlRow(resultType);
+    record2.addField("f_int", recordsInTableA.get(1).getFieldValue(0));
+    record2.addField("f_long", recordsInTableA.get(1).getFieldValue(1));
+
+    BeamSqlRow record3 = new BeamSqlRow(resultType);
+    record3.addField("f_int", recordsInTableA.get(2).getFieldValue(0));
+    record3.addField("f_long", recordsInTableA.get(2).getFieldValue(1));
+
+    BeamSqlRow record4 = new BeamSqlRow(resultType);
+    record4.addField("f_int", recordsInTableA.get(3).getFieldValue(0));
+    record4.addField("f_long", recordsInTableA.get(3).getFieldValue(1));
+
+    PAssert.that(result).containsInAnyOrder(record1, record2, record3, record4);
+
+    pipeline.run().waitUntilFinish();
+  }
+
+  /**
+   * select literal field with bounded PCollection.
+   */
+  @Test
+  public void testLiteralFieldWithBounded() throws Exception {
+    runLiteralField(boundedInput2);
+  }
+
+  /**
+   * select literal field with unbounded PCollection.
+   */
+  @Test
+  public void testLiteralFieldWithUnbounded() throws Exception {
+    runLiteralField(unboundedInput2);
+  }
+
+  private void runLiteralField(PCollection<BeamSqlRow> input) throws Exception {
+    String sql = "SELECT 1 as literal_field FROM TABLE_A";
+
+    PCollection<BeamSqlRow> result =
+        PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input)
+        .apply("testLiteralField", BeamSql.query(sql));
+
+    BeamSqlRowType resultType = BeamSqlRowType.create(Arrays.asList("literal_field"),
+        Arrays.asList(Types.INTEGER));
+
+    BeamSqlRow record = new BeamSqlRow(resultType);
+    record.addField("literal_field", 1);
+
+    PAssert.that(result).containsInAnyOrder(record);
+
+    pipeline.run().waitUntilFinish();
+  }
+
+  @Test
+  public void testProjectUnknownField() throws Exception {
+    exceptions.expect(IllegalStateException.class);
+    exceptions.expectMessage("Column 'f_int_na' not found in any table");
+    pipeline.enableAbandonedNodeEnforcement(false);
+
+    String sql = "SELECT f_int_na FROM TABLE_A";
+
+    PCollection<BeamSqlRow> result =
+        PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), boundedInput1)
+        .apply("testProjectUnknownField", BeamSql.query(sql));
+
+    pipeline.run().waitUntilFinish();
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java
new file mode 100644
index 0000000..46cab09
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql;
+
+import java.sql.Types;
+import java.util.Arrays;
+import java.util.Iterator;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlUdaf;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlUdf;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
+import org.junit.Test;
+
+/**
+ * Tests for UDF/UDAF.
+ */
+public class BeamSqlDslUdfUdafTest extends BeamSqlDslBase {
+  /**
+   * GROUP-BY with UDAF.
+   */
+  @Test
+  public void testUdaf() throws Exception {
+    BeamSqlRowType resultType = BeamSqlRowType.create(Arrays.asList("f_int2", "squaresum"),
+        Arrays.asList(Types.INTEGER, Types.INTEGER));
+
+    BeamSqlRow record = new BeamSqlRow(resultType);
+    record.addField("f_int2", 0);
+    record.addField("squaresum", 30);
+
+    String sql1 = "SELECT f_int2, squaresum1(f_int) AS `squaresum`"
+        + " FROM PCOLLECTION GROUP BY f_int2";
+    PCollection<BeamSqlRow> result1 =
+        boundedInput1.apply("testUdaf1",
+            BeamSql.simpleQuery(sql1).withUdaf("squaresum1", SquareSum.class));
+    PAssert.that(result1).containsInAnyOrder(record);
+
+    String sql2 = "SELECT f_int2, squaresum2(f_int) AS `squaresum`"
+        + " FROM PCOLLECTION GROUP BY f_int2";
+    PCollection<BeamSqlRow> result2 =
+        PCollectionTuple.of(new TupleTag<BeamSqlRow>("PCOLLECTION"), boundedInput1)
+        .apply("testUdaf2",
+            BeamSql.query(sql2).withUdaf("squaresum2", SquareSum.class));
+    PAssert.that(result2).containsInAnyOrder(record);
+
+    pipeline.run().waitUntilFinish();
+  }
+
+  /**
+   * Test of UDF.
+   */
+  @Test
+  public void testUdf() throws Exception {
+    BeamSqlRowType resultType = BeamSqlRowType.create(Arrays.asList("f_int", "cubicvalue"),
+        Arrays.asList(Types.INTEGER, Types.INTEGER));
+
+    BeamSqlRow record = new BeamSqlRow(resultType);
+    record.addField("f_int", 2);
+    record.addField("cubicvalue", 8);
+
+    String sql1 = "SELECT f_int, cubic1(f_int) as cubicvalue FROM PCOLLECTION WHERE f_int = 2";
+    PCollection<BeamSqlRow> result1 =
+        boundedInput1.apply("testUdf1",
+            BeamSql.simpleQuery(sql1).withUdf("cubic1", CubicInteger.class));
+    PAssert.that(result1).containsInAnyOrder(record);
+
+    String sql2 = "SELECT f_int, cubic2(f_int) as cubicvalue FROM PCOLLECTION WHERE f_int = 2";
+    PCollection<BeamSqlRow> result2 =
+        PCollectionTuple.of(new TupleTag<BeamSqlRow>("PCOLLECTION"), boundedInput1)
+        .apply("testUdf2",
+            BeamSql.query(sql2).withUdf("cubic2", CubicInteger.class));
+    PAssert.that(result2).containsInAnyOrder(record);
+
+    pipeline.run().waitUntilFinish();
+  }
+
+  /**
+   * UDAF for testing, which returns the sum of squares.
+   */
+  public static class SquareSum extends BeamSqlUdaf<Integer, Integer, Integer> {
+
+    public SquareSum() {
+    }
+
+    @Override
+    public Integer init() {
+      return 0;
+    }
+
+    @Override
+    public Integer add(Integer accumulator, Integer input) {
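+      // Add the square of each input value to the running sum.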
+      return accumulator + input * input;
+    }
+
+    @Override
+    public Integer merge(Iterable<Integer> accumulators) {
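+      // Sum the partial results produced by parallel accumulators.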
+      int v = 0;
+      Iterator<Integer> ite = accumulators.iterator();
+      while (ite.hasNext()) {
+        v += ite.next();
+      }
+      return v;
+    }
+
+    @Override
+    public Integer result(Integer accumulator) {
+      return accumulator;
+    }
+
+  }
+
+  /**
+   * An example UDF for testing.
+   */
+  public static class CubicInteger implements BeamSqlUdf {
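+    // A BeamSqlUdf is invoked through its static eval method.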
+    public static Integer eval(Integer input) {
+      return input * input * input;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java
new file mode 100644
index 0000000..9995b0a
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
+import org.apache.beam.sdk.transforms.DoFn;
+
+/**
+ * Test utilities.
+ */
+public class TestUtils {
+  /**
+   * A {@code DoFn} to convert a {@code BeamSqlRow} to a comparable {@code String}.
+   */
+  public static class BeamSqlRow2StringDoFn extends DoFn<BeamSqlRow, String> {
+    @ProcessElement
+    public void processElement(ProcessContext ctx) {
+      ctx.output(ctx.element().valueInString());
+    }
+  }
+
+  /**
+   * Convert list of {@code BeamSqlRow} to list of {@code String}.
+   */
+  public static List<String> beamSqlRows2Strings(List<BeamSqlRow> rows) {
+    List<String> strs = new ArrayList<>();
+    for (BeamSqlRow row : rows) {
+      strs.add(row.valueInString());
+    }
+
+    return strs;
+  }
+
+  /**
+   * Convenient way to build a list of {@code BeamSqlRow}s.
+   *
+   * <p>You can use it like this:
+   *
+   * <pre>{@code
+   * TestUtils.RowsBuilder.of(
+   *   Types.INTEGER, "order_id",
+   *   Types.INTEGER, "sum_site_id",
+   *   Types.VARCHAR, "buyer"
+   * ).addRows(
+   *   1, 3, "james",
+   *   2, 5, "bond"
+   *   ).getStringRows()
+   * }</pre>
+   */
+  public static class RowsBuilder {
+    private BeamSqlRowType type;
+    private List<BeamSqlRow> rows = new ArrayList<>();
+
+    /**
+     * Create a RowsBuilder with the specified row type info.
+     *
+     * <p>For example:
+     * <pre>{@code
+     * TestUtils.RowsBuilder.of(
+     *   Types.INTEGER, "order_id",
+     *   Types.INTEGER, "sum_site_id",
+     *   Types.VARCHAR, "buyer"
+     * )}</pre>
+     *
+     * @param args pairs of column types and column names.
+     */
+    public static RowsBuilder of(final Object... args) {
+      BeamSqlRowType beamSQLRowType = buildBeamSqlRowType(args);
+      RowsBuilder builder = new RowsBuilder();
+      builder.type = beamSQLRowType;
+
+      return builder;
+    }
+
+    /**
+     * Create a RowsBuilder with the specified row type info.
+     *
+     * <p>For example:
+     * <pre>{@code
+     * TestUtils.RowsBuilder.of(
+     *   beamSqlRowType
+     * )}</pre>
+     * @param beamSQLRowType the record type.
+     */
+    public static RowsBuilder of(final BeamSqlRowType beamSQLRowType) {
+      RowsBuilder builder = new RowsBuilder();
+      builder.type = beamSQLRowType;
+
+      return builder;
+    }
+
+    /**
+     * Add rows to the builder.
+     *
+     * <p>Note: check the class javadoc for a detailed example.
+     */
+    public RowsBuilder addRows(final Object... args) {
+      this.rows.addAll(buildRows(type, Arrays.asList(args)));
+      return this;
+    }
+
+    /**
+     * Add rows to the builder.
+     *
+     * <p>Note: check the class javadoc for a detailed example.
+     */
+    public RowsBuilder addRows(final List args) {
+      this.rows.addAll(buildRows(type, args));
+      return this;
+    }
+
+    public List<BeamSqlRow> getRows() {
+      return rows;
+    }
+
+    public List<String> getStringRows() {
+      return beamSqlRows2Strings(rows);
+    }
+  }
+
+  /**
+   * Convenient way to build a {@code BeamSqlRowType}.
+   *
+   * <p>e.g.
+   *
+   * <pre>{@code
+   *   buildBeamSqlRowType(
+   *       Types.BIGINT, "order_id",
+   *       Types.INTEGER, "site_id",
+   *       Types.DOUBLE, "price",
+   *       Types.TIMESTAMP, "order_time"
+   *   )
+   * }</pre>
+   */
+  public static BeamSqlRowType buildBeamSqlRowType(Object... args) {
+    List<Integer> types = new ArrayList<>();
+    List<String> names = new ArrayList<>();
+
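+    // args alternate: a java.sql.Types constant followed by its column name.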
+    for (int i = 0; i < args.length - 1; i += 2) {
+      types.add((int) args[i]);
+      names.add((String) args[i + 1]);
+    }
+
+    return BeamSqlRowType.create(names, types);
+  }
+
+  /**
+   * Convenient way to build a list of {@code BeamSqlRow}s.
+   *
+   * <p>e.g.
+   *
+   * <pre>{@code
+   *   buildRows(
+   *       rowType,
+   *       1, 1, 1, // the first row
+   *       2, 2, 2, // the second row
+   *       ...
+   *   )
+   * }</pre>
+   */
+  public static List<BeamSqlRow> buildRows(BeamSqlRowType type, List args) {
+    List<BeamSqlRow> rows = new ArrayList<>();
+    int fieldCount = type.size();
+
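+    // Consume args in chunks of fieldCount; each chunk becomes one row.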
+    for (int i = 0; i < args.size(); i += fieldCount) {
+      BeamSqlRow row = new BeamSqlRow(type);
+      for (int j = 0; j < fieldCount; j++) {
+        row.addField(j, args.get(i + j));
+      }
+      rows.add(row);
+    }
+    return rows;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlArithmeticOperatorsIntegrationTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlArithmeticOperatorsIntegrationTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlArithmeticOperatorsIntegrationTest.java
new file mode 100644
index 0000000..5e626a2
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlArithmeticOperatorsIntegrationTest.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.integrationtest;
+
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import org.junit.Test;
+
+/**
+ * Integration test for arithmetic operators.
+ */
+public class BeamSqlArithmeticOperatorsIntegrationTest
+    extends BeamSqlBuiltinFunctionsIntegrationTestBase {
+
+  private static final BigDecimal ZERO = BigDecimal.valueOf(0.0);
+  private static final BigDecimal ONE0 = BigDecimal.valueOf(1);
+  private static final BigDecimal ONE = BigDecimal.valueOf(1.0);
+  private static final BigDecimal ONE2 = BigDecimal.valueOf(1.0).multiply(BigDecimal.valueOf(1.0));
+  private static final BigDecimal ONE10 = BigDecimal.ONE.divide(
+      BigDecimal.ONE, 10, RoundingMode.HALF_EVEN);
+  private static final BigDecimal TWO = BigDecimal.valueOf(2.0);
+
+  @Test
+  public void testPlus() throws Exception {
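+    // Each addExpr pairs a SQL expression with its expected value;
+    // buildRunAndCheck evaluates them in one query and asserts the results.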
+    ExpressionChecker checker = new ExpressionChecker()
+        .addExpr("1 + 1", 2)
+        .addExpr("1.0 + 1", TWO)
+        .addExpr("1 + 1.0", TWO)
+        .addExpr("1.0 + 1.0", TWO)
+        .addExpr("c_tinyint + c_tinyint", (byte) 2)
+        .addExpr("c_smallint + c_smallint", (short) 2)
+        .addExpr("c_bigint + c_bigint", 2L)
+        .addExpr("c_decimal + c_decimal", TWO)
+        .addExpr("c_tinyint + c_decimal", TWO)
+        .addExpr("c_float + c_decimal", 2.0)
+        .addExpr("c_double + c_decimal", 2.0)
+        .addExpr("c_float + c_float", 2.0f)
+        .addExpr("c_double + c_float", 2.0)
+        .addExpr("c_double + c_double", 2.0)
+        .addExpr("c_float + c_bigint", 2.0f)
+        .addExpr("c_double + c_bigint", 2.0)
+        ;
+
+    checker.buildRunAndCheck();
+  }
+
+  @Test
+  public void testPlus_overflow() throws Exception {
+    ExpressionChecker checker = new ExpressionChecker()
+        .addExpr("c_tinyint_max + c_tinyint_max", (byte) -2)
+        .addExpr("c_smallint_max + c_smallint_max", (short) -2)
+        .addExpr("c_integer_max + c_integer_max", -2)
+        // 384L looks strange, but the addition has already overflowed;
+        // the actual result does not matter, it is wrong either way.
+        .addExpr("c_bigint_max + c_bigint_max", 384L)
+        ;
+
+    checker.buildRunAndCheck();
+  }
+
+  @Test
+  public void testMinus() throws Exception {
+    ExpressionChecker checker = new ExpressionChecker()
+        .addExpr("1 - 1", 0)
+        .addExpr("1.0 - 1", ZERO)
+        .addExpr("1 - 0.0", ONE)
+        .addExpr("1.0 - 1.0", ZERO)
+        .addExpr("c_tinyint - c_tinyint", (byte) 0)
+        .addExpr("c_smallint - c_smallint", (short) 0)
+        .addExpr("c_bigint - c_bigint", 0L)
+        .addExpr("c_decimal - c_decimal", ZERO)
+        .addExpr("c_tinyint - c_decimal", ZERO)
+        .addExpr("c_float - c_decimal", 0.0)
+        .addExpr("c_double - c_decimal", 0.0)
+        .addExpr("c_float - c_float", 0.0f)
+        .addExpr("c_double - c_float", 0.0)
+        .addExpr("c_double - c_double", 0.0)
+        .addExpr("c_float - c_bigint", 0.0f)
+        .addExpr("c_double - c_bigint", 0.0)
+        ;
+
+    checker.buildRunAndCheck();
+  }
+
+  @Test
+  public void testMultiply() throws Exception {
+    ExpressionChecker checker = new ExpressionChecker()
+        .addExpr("1 * 1", 1)
+        .addExpr("1.0 * 1", ONE2)
+        .addExpr("1 * 1.0", ONE2)
+        .addExpr("1.0 * 1.0", ONE2)
+        .addExpr("c_tinyint * c_tinyint", (byte) 1)
+        .addExpr("c_smallint * c_smallint", (short) 1)
+        .addExpr("c_bigint * c_bigint", 1L)
+        .addExpr("c_decimal * c_decimal", ONE2)
+        .addExpr("c_tinyint * c_decimal", ONE2)
+        .addExpr("c_float * c_decimal", 1.0)
+        .addExpr("c_double * c_decimal", 1.0)
+        .addExpr("c_float * c_float", 1.0f)
+        .addExpr("c_double * c_float", 1.0)
+        .addExpr("c_double * c_double", 1.0)
+        .addExpr("c_float * c_bigint", 1.0f)
+        .addExpr("c_double * c_bigint", 1.0)
+        ;
+
+    checker.buildRunAndCheck();
+  }
+
+  @Test
+  public void testDivide() throws Exception {
+    ExpressionChecker checker = new ExpressionChecker()
+        .addExpr("1 / 1", 1)
+        .addExpr("1.0 / 1", ONE10)
+        .addExpr("1 / 1.0", ONE10)
+        .addExpr("1.0 / 1.0", ONE10)
+        .addExpr("c_tinyint / c_tinyint", (byte) 1)
+        .addExpr("c_smallint / c_smallint", (short) 1)
+        .addExpr("c_bigint / c_bigint", 1L)
+        .addExpr("c_decimal / c_decimal", ONE10)
+        .addExpr("c_tinyint / c_decimal", ONE10)
+        .addExpr("c_float / c_decimal", 1.0)
+        .addExpr("c_double / c_decimal", 1.0)
+        .addExpr("c_float / c_float", 1.0f)
+        .addExpr("c_double / c_float", 1.0)
+        .addExpr("c_double / c_double", 1.0)
+        .addExpr("c_float / c_bigint", 1.0f)
+        .addExpr("c_double / c_bigint", 1.0)
+        ;
+
+    checker.buildRunAndCheck();
+  }
+
+  @Test
+  public void testMod() throws Exception {
+    ExpressionChecker checker = new ExpressionChecker()
+        .addExpr("mod(1, 1)", 0)
+        .addExpr("mod(1.0, 1)", 0)
+        .addExpr("mod(1, 1.0)", ZERO)
+        .addExpr("mod(1.0, 1.0)", ZERO)
+        .addExpr("mod(c_tinyint, c_tinyint)", (byte) 0)
+        .addExpr("mod(c_smallint, c_smallint)", (short) 0)
+        .addExpr("mod(c_bigint, c_bigint)", 0L)
+        .addExpr("mod(c_decimal, c_decimal)", ZERO)
+        .addExpr("mod(c_tinyint, c_decimal)", ZERO)
+        ;
+
+    checker.buildRunAndCheck();
+  }
+}