Posted to commits@beam.apache.org by ta...@apache.org on 2017/06/17 01:30:02 UTC

[1/2] beam git commit: Update filter/project/aggregation tests to use BeamSql

Repository: beam
Updated Branches:
  refs/heads/DSL_SQL dcd769c8a -> 738eb4dd0


Update filter/project/aggregation tests to use BeamSql


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/86dea078
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/86dea078
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/86dea078

Branch: refs/heads/DSL_SQL
Commit: 86dea078eeb29ba92085dc6cd299aca00a23e7e9
Parents: dcd769c
Author: mingmxu <mi...@ebay.com>
Authored: Thu Jun 15 18:10:06 2017 -0700
Committer: Tyler Akidau <ta...@apache.org>
Committed: Fri Jun 16 18:25:23 2017 -0700

----------------------------------------------------------------------
 .../dsls/sql/BeamSqlDslAggregationTest.java     | 260 +++++++++++++++++++
 .../apache/beam/dsls/sql/BeamSqlDslBase.java    | 125 +++++++++
 .../beam/dsls/sql/BeamSqlDslFilterTest.java     |  78 ++++++
 .../beam/dsls/sql/BeamSqlDslProjectTest.java    | 163 ++++++++++++
 .../beam/dsls/sql/planner/BasePlanner.java      | 108 --------
 .../sql/planner/BeamGroupByExplainTest.java     | 106 --------
 .../sql/planner/BeamGroupByPipelineTest.java    | 111 --------
 .../sql/planner/BeamInvalidGroupByTest.java     |  51 ----
 .../BeamPlannerAggregationSubmitTest.java       | 152 -----------
 .../sql/planner/BeamPlannerExplainTest.java     |  67 -----
 .../dsls/sql/planner/BeamPlannerSubmitTest.java |  56 ----
 .../sql/schema/BeamPCollectionTableTest.java    |  73 ------
 12 files changed, 626 insertions(+), 724 deletions(-)
----------------------------------------------------------------------
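
For readers skimming the diff: the rewritten tests exercise two entry points of the DSL.
BeamSql.simpleQuery() is applied directly to a single PCollection<BeamSqlRow>, while
BeamSql.query() is applied to a PCollectionTuple whose TupleTag names serve as table names
in the SQL statement. The fragment below is a condensed sketch of both patterns, restating
usage already visible in the new test files; inputA1 and TABLE_A come from BeamSqlDslBase.java
below, and the step names ("singleFilter", "groupBy") are arbitrary labels.

    // 1) Single-input query: apply BeamSql.simpleQuery directly to the PCollection.
    PCollection<BeamSqlRow> filtered =
        inputA1.apply("singleFilter",
            BeamSql.simpleQuery("SELECT * FROM TABLE_A WHERE f_int = 1"));

    // 2) Multi-table query: wrap inputs in a PCollectionTuple; each TupleTag name
    //    becomes a table name visible to the query.
    PCollection<BeamSqlRow> aggregated =
        PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), inputA1)
            .apply("groupBy",
                BeamSql.query("SELECT f_int2, COUNT(*) AS `size` FROM TABLE_A GROUP BY f_int2"));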


http://git-wip-us.apache.org/repos/asf/beam/blob/86dea078/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslAggregationTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslAggregationTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslAggregationTest.java
new file mode 100644
index 0000000..f7349c6
--- /dev/null
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslAggregationTest.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql;
+
+import java.sql.Types;
+import java.util.Arrays;
+import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
+import org.joda.time.Instant;
+import org.junit.Test;
+
+/**
+ * Tests for GROUP-BY/aggregation with global_window/fixed_time_window/sliding_window/session_window.
+ */
+public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
+  /**
+   * GROUP-BY with single aggregation function.
+   */
+  @Test
+  public void testAggregationWithoutWindow() throws Exception {
+    String sql = "SELECT f_int2, COUNT(*) AS `size` FROM TABLE_A GROUP BY f_int2";
+
+    PCollection<BeamSqlRow> result =
+        inputA1.apply("testAggregationWithoutWindow", BeamSql.simpleQuery(sql));
+
+    BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int2", "size"),
+        Arrays.asList(Types.INTEGER, Types.BIGINT));
+
+    BeamSqlRow record = new BeamSqlRow(resultType);
+    record.addField("f_int2", 0);
+    record.addField("size", 4L);
+
+    PAssert.that(result).containsInAnyOrder(record);
+
+    pipeline.run().waitUntilFinish();
+  }
+
+  /**
+   * GROUP-BY with multiple aggregation functions.
+   */
+  @Test
+  public void testAggregationFunctions() throws Exception{
+    String sql = "select f_int2, count(*) as size, "
+        + "sum(f_long) as sum1, avg(f_long) as avg1, max(f_long) as max1, min(f_long) as min1,"
+        + "sum(f_short) as sum2, avg(f_short) as avg2, max(f_short) as max2, min(f_short) as min2,"
+        + "sum(f_byte) as sum3, avg(f_byte) as avg3, max(f_byte) as max3, min(f_byte) as min3,"
+        + "sum(f_float) as sum4, avg(f_float) as avg4, max(f_float) as max4, min(f_float) as min4,"
+        + "sum(f_double) as sum5, avg(f_double) as avg5, "
+        + "max(f_double) as max5, min(f_double) as min5,"
+        + "max(f_timestamp) as max6, min(f_timestamp) as min6 "
+        + "FROM TABLE_A group by f_int2";
+
+    PCollection<BeamSqlRow> result =
+        PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), inputA1)
+        .apply("testAggregationFunctions", BeamSql.query(sql));
+
+    BeamSqlRecordType resultType = BeamSqlRecordType.create(
+        Arrays.asList("f_int2", "size", "sum1", "avg1", "max1", "min1", "sum2", "avg2", "max2",
+            "min2", "sum3", "avg3", "max3", "min3", "sum4", "avg4", "max4", "min4", "sum5", "avg5",
+            "max5", "min5", "max6", "min6"),
+        Arrays.asList(Types.INTEGER, Types.BIGINT, Types.BIGINT, Types.BIGINT, Types.BIGINT,
+            Types.BIGINT, Types.SMALLINT, Types.SMALLINT, Types.SMALLINT, Types.SMALLINT,
+            Types.TINYINT, Types.TINYINT, Types.TINYINT, Types.TINYINT, Types.FLOAT, Types.FLOAT,
+            Types.FLOAT, Types.FLOAT, Types.DOUBLE, Types.DOUBLE, Types.DOUBLE, Types.DOUBLE,
+            Types.TIMESTAMP, Types.TIMESTAMP));
+
+    BeamSqlRow record = new BeamSqlRow(resultType);
+    record.addField("f_int2", 0);
+    record.addField("size", 4L);
+
+    record.addField("sum1", 10000L);
+    record.addField("avg1", 2500L);
+    record.addField("max1", 4000L);
+    record.addField("min1", 1000L);
+
+    record.addField("sum2", (short) 10);
+    record.addField("avg2", (short) 2);
+    record.addField("max2", (short) 4);
+    record.addField("min2", (short) 1);
+
+    record.addField("sum3", (byte) 10);
+    record.addField("avg3", (byte) 2);
+    record.addField("max3", (byte) 4);
+    record.addField("min3", (byte) 1);
+
+    record.addField("sum4", 10.0F);
+    record.addField("avg4", 2.5F);
+    record.addField("max4", 4.0F);
+    record.addField("min4", 1.0F);
+
+    record.addField("sum5", 10.0);
+    record.addField("avg5", 2.5);
+    record.addField("max5", 4.0);
+    record.addField("min5", 1.0);
+
+    record.addField("max6", FORMAT.parse("2017-01-01 02:04:03"));
+    record.addField("min6", FORMAT.parse("2017-01-01 01:01:03"));
+
+    PAssert.that(result).containsInAnyOrder(record);
+
+    pipeline.run().waitUntilFinish();
+  }
+
+  /**
+   * Implicit GROUP-BY with DISTINCT.
+   */
+  @Test
+  public void testDistinct() throws Exception {
+    String sql = "SELECT distinct f_int, f_long FROM TABLE_A ";
+
+    PCollection<BeamSqlRow> result =
+        inputA1.apply("testDistinct", BeamSql.simpleQuery(sql));
+
+    BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int", "f_long"),
+        Arrays.asList(Types.INTEGER, Types.BIGINT));
+
+    BeamSqlRow record1 = new BeamSqlRow(resultType);
+    record1.addField("f_int", 1);
+    record1.addField("f_long", 1000L);
+
+    BeamSqlRow record2 = new BeamSqlRow(resultType);
+    record2.addField("f_int", 2);
+    record2.addField("f_long", 2000L);
+
+    BeamSqlRow record3 = new BeamSqlRow(resultType);
+    record3.addField("f_int", 3);
+    record3.addField("f_long", 3000L);
+
+    BeamSqlRow record4 = new BeamSqlRow(resultType);
+    record4.addField("f_int", 4);
+    record4.addField("f_long", 4000L);
+
+    PAssert.that(result).containsInAnyOrder(record1, record2, record3, record4);
+
+    pipeline.run().waitUntilFinish();
+  }
+
+  /**
+   * GROUP-BY with TUMBLE window (a.k.a. fixed_time_window).
+   */
+  @Test
+  public void testTumbleWindow() throws Exception {
+    String sql = "SELECT f_int2, COUNT(*) AS `size` FROM TABLE_A "
+        + "GROUP BY f_int2, TUMBLE(f_timestamp, INTERVAL '1' HOUR)";
+    PCollection<BeamSqlRow> result =
+        PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), inputA1)
+        .apply("testTumbleWindow", BeamSql.query(sql));
+
+    BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int2", "size"),
+        Arrays.asList(Types.INTEGER, Types.BIGINT));
+
+    BeamSqlRow record1 = new BeamSqlRow(resultType);
+    record1.addField("f_int2", 0);
+    record1.addField("size", 3L);
+    record1.setWindowStart(new Instant(FORMAT.parse("2017-01-01 01:00:00").getTime()));
+    record1.setWindowEnd(new Instant(FORMAT.parse("2017-01-01 02:00:00").getTime()));
+
+    BeamSqlRow record2 = new BeamSqlRow(resultType);
+    record2.addField("f_int2", 0);
+    record2.addField("size", 1L);
+    record2.setWindowStart(new Instant(FORMAT.parse("2017-01-01 02:00:00").getTime()));
+    record2.setWindowEnd(new Instant(FORMAT.parse("2017-01-01 03:00:00").getTime()));
+
+    PAssert.that(result).containsInAnyOrder(record1, record2);
+
+    pipeline.run().waitUntilFinish();
+  }
+
+  /**
+   * GROUP-BY with HOP window (a.k.a. sliding_window).
+   */
+  @Test
+  public void testHopWindow() throws Exception {
+    String sql = "SELECT f_int2, COUNT(*) AS `size` FROM TABLE_A "
+        + "GROUP BY f_int2, HOP(f_timestamp, INTERVAL '1' HOUR, INTERVAL '30' MINUTE)";
+    PCollection<BeamSqlRow> result =
+        inputA1.apply("testHopWindow", BeamSql.simpleQuery(sql));
+
+    BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int2", "size"),
+        Arrays.asList(Types.INTEGER, Types.BIGINT));
+
+    BeamSqlRow record1 = new BeamSqlRow(resultType);
+    record1.addField("f_int2", 0);
+    record1.addField("size", 3L);
+    record1.setWindowStart(new Instant(FORMAT.parse("2017-01-01 00:30:00").getTime()));
+    record1.setWindowEnd(new Instant(FORMAT.parse("2017-01-01 01:30:00").getTime()));
+
+    BeamSqlRow record2 = new BeamSqlRow(resultType);
+    record2.addField("f_int2", 0);
+    record2.addField("size", 3L);
+    record2.setWindowStart(new Instant(FORMAT.parse("2017-01-01 01:00:00").getTime()));
+    record2.setWindowEnd(new Instant(FORMAT.parse("2017-01-01 02:00:00").getTime()));
+
+    BeamSqlRow record3 = new BeamSqlRow(resultType);
+    record3.addField("f_int2", 0);
+    record3.addField("size", 1L);
+    record3.setWindowStart(new Instant(FORMAT.parse("2017-01-01 01:30:00").getTime()));
+    record3.setWindowEnd(new Instant(FORMAT.parse("2017-01-01 02:30:00").getTime()));
+
+    BeamSqlRow record4 = new BeamSqlRow(resultType);
+    record4.addField("f_int2", 0);
+    record4.addField("size", 1L);
+    record4.setWindowStart(new Instant(FORMAT.parse("2017-01-01 02:00:00").getTime()));
+    record4.setWindowEnd(new Instant(FORMAT.parse("2017-01-01 03:00:00").getTime()));
+
+    PAssert.that(result).containsInAnyOrder(record1, record2, record3, record4);
+
+    pipeline.run().waitUntilFinish();
+  }
+
+  /**
+   * GROUP-BY with SESSION window.
+   */
+  @Test
+  public void testSessionWindow() throws Exception {
+    String sql = "SELECT f_int2, COUNT(*) AS `size` FROM TABLE_A "
+        + "GROUP BY f_int2, SESSION(f_timestamp, INTERVAL '5' MINUTE)";
+    PCollection<BeamSqlRow> result =
+        PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), inputA1)
+        .apply("testSessionWindow", BeamSql.query(sql));
+
+    BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int2", "size"),
+        Arrays.asList(Types.INTEGER, Types.BIGINT));
+
+    BeamSqlRow record1 = new BeamSqlRow(resultType);
+    record1.addField("f_int2", 0);
+    record1.addField("size", 3L);
+    record1.setWindowStart(new Instant(FORMAT.parse("2017-01-01 01:01:03").getTime()));
+    record1.setWindowEnd(new Instant(FORMAT.parse("2017-01-01 01:11:03").getTime()));
+
+    BeamSqlRow record2 = new BeamSqlRow(resultType);
+    record2.addField("f_int2", 0);
+    record2.addField("size", 1L);
+    record2.setWindowStart(new Instant(FORMAT.parse("2017-01-01 02:04:03").getTime()));
+    record2.setWindowEnd(new Instant(FORMAT.parse("2017-01-01 02:09:03").getTime()));
+
+    PAssert.that(result).containsInAnyOrder(record1, record2);
+
+    pipeline.run().waitUntilFinish();
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/86dea078/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslBase.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslBase.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslBase.java
new file mode 100644
index 0000000..d62bdc4
--- /dev/null
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslBase.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql;
+
+import java.sql.Types;
+import java.text.DateFormat;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+
+/**
+ * Prepares input records to test {@link BeamSql}.
+ *
+ * <p>Note that any change to these records would impact tests in this package.
+ *
+ */
+public class BeamSqlDslBase {
+  public static final DateFormat FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+
+  @ClassRule
+  public static TestPipeline pipeline = TestPipeline.create();
+
+  public static BeamSqlRecordType recordTypeInTableA;
+  public static List<BeamSqlRow> recordsInTableA;
+
+  public static PCollection<BeamSqlRow> inputA1;
+  public static PCollection<BeamSqlRow> inputA2;
+
+  @BeforeClass
+  public static void prepareClass() throws ParseException {
+    recordTypeInTableA = BeamSqlRecordType.create(
+        Arrays.asList("f_int", "f_long", "f_short", "f_byte", "f_float", "f_double", "f_string",
+            "f_timestamp", "f_int2"),
+        Arrays.asList(Types.INTEGER, Types.BIGINT, Types.SMALLINT, Types.TINYINT, Types.FLOAT,
+            Types.DOUBLE, Types.VARCHAR, Types.TIMESTAMP, Types.INTEGER));
+
+    recordsInTableA = prepareInputRecordsInTableA();
+
+    inputA1 = PBegin.in(pipeline).apply("inputA1", Create.of(recordsInTableA)
+        .withCoder(new BeamSqlRowCoder(recordTypeInTableA)));
+
+    inputA2 = PBegin.in(pipeline).apply("inputA2", Create.of(recordsInTableA.get(0))
+        .withCoder(new BeamSqlRowCoder(recordTypeInTableA)));
+  }
+
+  private static List<BeamSqlRow> prepareInputRecordsInTableA() throws ParseException{
+    List<BeamSqlRow> rows = new ArrayList<>();
+
+    BeamSqlRow row1 = new BeamSqlRow(recordTypeInTableA);
+    row1.addField(0, 1);
+    row1.addField(1, 1000L);
+    row1.addField(2, Short.valueOf("1"));
+    row1.addField(3, Byte.valueOf("1"));
+    row1.addField(4, 1.0f);
+    row1.addField(5, 1.0);
+    row1.addField(6, "string_row1");
+    row1.addField(7, FORMAT.parse("2017-01-01 01:01:03"));
+    row1.addField(8, 0);
+    rows.add(row1);
+
+    BeamSqlRow row2 = new BeamSqlRow(recordTypeInTableA);
+    row2.addField(0, 2);
+    row2.addField(1, 2000L);
+    row2.addField(2, Short.valueOf("2"));
+    row2.addField(3, Byte.valueOf("2"));
+    row2.addField(4, 2.0f);
+    row2.addField(5, 2.0);
+    row2.addField(6, "string_row2");
+    row2.addField(7, FORMAT.parse("2017-01-01 01:02:03"));
+    row2.addField(8, 0);
+    rows.add(row2);
+
+    BeamSqlRow row3 = new BeamSqlRow(recordTypeInTableA);
+    row3.addField(0, 3);
+    row3.addField(1, 3000L);
+    row3.addField(2, Short.valueOf("3"));
+    row3.addField(3, Byte.valueOf("3"));
+    row3.addField(4, 3.0f);
+    row3.addField(5, 3.0);
+    row3.addField(6, "string_row3");
+    row3.addField(7, FORMAT.parse("2017-01-01 01:06:03"));
+    row3.addField(8, 0);
+    rows.add(row3);
+
+    BeamSqlRow row4 = new BeamSqlRow(recordTypeInTableA);
+    row4.addField(0, 4);
+    row4.addField(1, 4000L);
+    row4.addField(2, Short.valueOf("4"));
+    row4.addField(3, Byte.valueOf("4"));
+    row4.addField(4, 4.0f);
+    row4.addField(5, 4.0);
+    row4.addField(6, "string_row4");
+    row4.addField(7, FORMAT.parse("2017-01-01 02:04:03"));
+    row4.addField(8, 0);
+    rows.add(row4);
+
+    return rows;
+  }
+}
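
A quick cross-check of the aggregation expectations against these fixture rows: the four
records in TABLE_A carry timestamps 01:01:03, 01:02:03, 01:06:03 and 02:04:03 on 2017-01-01,
so the 1-hour TUMBLE in testTumbleWindow groups them 3 + 1, the 1-hour windows hopping every
30 minutes in testHopWindow count 3, 3, 1, 1, and the 5-minute SESSION gap in
testSessionWindow yields sessions of size 3 and 1, matching the PAssert expectations above.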

http://git-wip-us.apache.org/repos/asf/beam/blob/86dea078/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslFilterTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslFilterTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslFilterTest.java
new file mode 100644
index 0000000..b68e526
--- /dev/null
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslFilterTest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql;
+
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
+import org.junit.Test;
+
+/**
+ * Tests for WHERE queries.
+ */
+public class BeamSqlDslFilterTest extends BeamSqlDslBase {
+  /**
+   * Single filter.
+   */
+  @Test
+  public void testSingleFilter() throws Exception {
+    String sql = "SELECT * FROM TABLE_A WHERE f_int = 1";
+
+    PCollection<BeamSqlRow> result =
+        inputA1.apply("testSingleFilter", BeamSql.simpleQuery(sql));
+
+    PAssert.that(result).containsInAnyOrder(recordsInTableA.get(0));
+
+    pipeline.run().waitUntilFinish();
+  }
+
+  /**
+   * Composite filters.
+   */
+  @Test
+  public void testCompositeFilter() throws Exception {
+    String sql = "SELECT * FROM TABLE_A"
+        + " WHERE f_int > 1 AND (f_long < 3000 OR f_string = 'string_row3')";
+
+    PCollection<BeamSqlRow> result =
+        PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), inputA1)
+        .apply("testCompositeFilter", BeamSql.query(sql));
+
+    PAssert.that(result).containsInAnyOrder(recordsInTableA.get(1), recordsInTableA.get(2));
+
+    pipeline.run().waitUntilFinish();
+  }
+
+  /**
+   * No rows returned after filtering.
+   */
+  @Test
+  public void testNoReturnFilter() throws Exception {
+    String sql = "SELECT * FROM TABLE_A WHERE f_int < 1";
+
+    PCollection<BeamSqlRow> result =
+        PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), inputA1)
+        .apply("testNoReturnFilter", BeamSql.query(sql));
+
+    PAssert.that(result).empty();
+
+    pipeline.run().waitUntilFinish();
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/86dea078/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslProjectTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslProjectTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslProjectTest.java
new file mode 100644
index 0000000..2998682
--- /dev/null
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslProjectTest.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql;
+
+import java.sql.Types;
+import java.util.Arrays;
+import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
+import org.junit.Test;
+
+/**
+ * Tests for field-project in queries.
+ */
+public class BeamSqlDslProjectTest extends BeamSqlDslBase {
+  /**
+   * Select all fields.
+   */
+  @Test
+  public void testSelectAll() throws Exception {
+    String sql = "SELECT * FROM TABLE_A";
+
+    PCollection<BeamSqlRow> result =
+        inputA2.apply("testSelectAll", BeamSql.simpleQuery(sql));
+
+    PAssert.that(result).containsInAnyOrder(recordsInTableA.get(0));
+
+    pipeline.run().waitUntilFinish();
+  }
+
+  /**
+   * Select partial fields.
+   */
+  @Test
+  public void testPartialFields() throws Exception {
+    String sql = "SELECT f_int, f_long FROM TABLE_A";
+
+    PCollection<BeamSqlRow> result =
+        PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), inputA2)
+        .apply("testPartialFields", BeamSql.query(sql));
+
+    BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int", "f_long"),
+        Arrays.asList(Types.INTEGER, Types.BIGINT));
+
+    BeamSqlRow record = new BeamSqlRow(resultType);
+    record.addField("f_int", recordsInTableA.get(0).getFieldValue(0));
+    record.addField("f_long", recordsInTableA.get(0).getFieldValue(1));
+
+    PAssert.that(result).containsInAnyOrder(record);
+
+    pipeline.run().waitUntilFinish();
+  }
+
+  /**
+   * Select partial fields from multiple rows.
+   */
+  @Test
+  public void testPartialFieldsInMultipleRow() throws Exception {
+    String sql = "SELECT f_int, f_long FROM TABLE_A";
+
+    PCollection<BeamSqlRow> result =
+        PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), inputA1)
+        .apply("testPartialFieldsInMultipleRow", BeamSql.query(sql));
+
+    BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int", "f_long"),
+        Arrays.asList(Types.INTEGER, Types.BIGINT));
+
+    BeamSqlRow record1 = new BeamSqlRow(resultType);
+    record1.addField("f_int", recordsInTableA.get(0).getFieldValue(0));
+    record1.addField("f_long", recordsInTableA.get(0).getFieldValue(1));
+
+    BeamSqlRow record2 = new BeamSqlRow(resultType);
+    record2.addField("f_int", recordsInTableA.get(1).getFieldValue(0));
+    record2.addField("f_long", recordsInTableA.get(1).getFieldValue(1));
+
+    BeamSqlRow record3 = new BeamSqlRow(resultType);
+    record3.addField("f_int", recordsInTableA.get(2).getFieldValue(0));
+    record3.addField("f_long", recordsInTableA.get(2).getFieldValue(1));
+
+    BeamSqlRow record4 = new BeamSqlRow(resultType);
+    record4.addField("f_int", recordsInTableA.get(3).getFieldValue(0));
+    record4.addField("f_long", recordsInTableA.get(3).getFieldValue(1));
+
+    PAssert.that(result).containsInAnyOrder(record1, record2, record3, record4);
+
+    pipeline.run().waitUntilFinish();
+  }
+
+  /**
+   * Select partial fields.
+   */
+  @Test
+  public void testPartialFieldsInRows() throws Exception {
+    String sql = "SELECT f_int, f_long FROM TABLE_A";
+
+    PCollection<BeamSqlRow> result =
+        PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), inputA1)
+        .apply("testPartialFieldsInRows", BeamSql.query(sql));
+
+    BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int", "f_long"),
+        Arrays.asList(Types.INTEGER, Types.BIGINT));
+
+    BeamSqlRow record1 = new BeamSqlRow(resultType);
+    record1.addField("f_int", recordsInTableA.get(0).getFieldValue(0));
+    record1.addField("f_long", recordsInTableA.get(0).getFieldValue(1));
+
+    BeamSqlRow record2 = new BeamSqlRow(resultType);
+    record2.addField("f_int", recordsInTableA.get(1).getFieldValue(0));
+    record2.addField("f_long", recordsInTableA.get(1).getFieldValue(1));
+
+    BeamSqlRow record3 = new BeamSqlRow(resultType);
+    record3.addField("f_int", recordsInTableA.get(2).getFieldValue(0));
+    record3.addField("f_long", recordsInTableA.get(2).getFieldValue(1));
+
+    BeamSqlRow record4 = new BeamSqlRow(resultType);
+    record4.addField("f_int", recordsInTableA.get(3).getFieldValue(0));
+    record4.addField("f_long", recordsInTableA.get(3).getFieldValue(1));
+
+    PAssert.that(result).containsInAnyOrder(record1, record2, record3, record4);
+
+    pipeline.run().waitUntilFinish();
+  }
+
+  /**
+   * Select a literal field.
+   */
+  @Test
+  public void testLiteralField() throws Exception {
+    String sql = "SELECT 1 as literal_field FROM TABLE_A";
+
+    PCollection<BeamSqlRow> result =
+        PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), inputA2)
+        .apply("testLiteralField", BeamSql.query(sql));
+
+    BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("literal_field"),
+        Arrays.asList(Types.INTEGER));
+
+    BeamSqlRow record = new BeamSqlRow(resultType);
+    record.addField("literal_field", 1);
+
+    PAssert.that(result).containsInAnyOrder(record);
+
+    pipeline.run().waitUntilFinish();
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/86dea078/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BasePlanner.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BasePlanner.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BasePlanner.java
deleted file mode 100644
index 2c5b555..0000000
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BasePlanner.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.dsls.sql.planner;
-
-import static org.apache.beam.dsls.sql.BeamSqlEnv.registerTable;
-
-import java.util.Arrays;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.beam.dsls.sql.schema.BaseBeamTable;
-import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
-import org.apache.beam.dsls.sql.schema.BeamSqlRow;
-import org.apache.beam.dsls.sql.schema.kafka.BeamKafkaCSVTable;
-import org.apache.beam.dsls.sql.utils.CalciteUtils;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeFactory;
-import org.apache.calcite.rel.type.RelProtoDataType;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.junit.BeforeClass;
-
-/**
- * prepare {@code BeamSqlRunner} for test.
- *
- */
-public class BasePlanner {
-  @BeforeClass
-  public static void prepareClass() {
-    registerTable("ORDER_DETAILS", getTable());
-    registerTable("SUB_ORDER", getTable("127.0.0.1:9092", "sub_orders"));
-    registerTable("SUB_ORDER_RAM", getTable());
-  }
-
-  private static BaseBeamTable getTable() {
-    final RelProtoDataType protoRowType = new RelProtoDataType() {
-      @Override
-      public RelDataType apply(RelDataTypeFactory a0) {
-        return a0.builder().add("order_id", SqlTypeName.BIGINT).add("site_id", SqlTypeName.INTEGER)
-            .add("price", SqlTypeName.DOUBLE).add("order_time", SqlTypeName.TIMESTAMP).build();
-      }
-    };
-
-    BeamSqlRecordType dataType = CalciteUtils
-        .toBeamRecordType(protoRowType.apply(BeamQueryPlanner.TYPE_FACTORY));
-    BeamSqlRow row1 = new BeamSqlRow(dataType);
-    row1.addField(0, 12345L);
-    row1.addField(1, 0);
-    row1.addField(2, 10.5);
-    row1.addField(3, new Date());
-
-    BeamSqlRow row2 = new BeamSqlRow(dataType);
-    row2.addField(0, 12345L);
-    row2.addField(1, 1);
-    row2.addField(2, 20.5);
-    row2.addField(3, new Date());
-
-    BeamSqlRow row3 = new BeamSqlRow(dataType);
-    row3.addField(0, 12345L);
-    row3.addField(1, 0);
-    row3.addField(2, 20.5);
-    row3.addField(3, new Date());
-
-    BeamSqlRow row4 = new BeamSqlRow(dataType);
-    row4.addField(0, null);
-    row4.addField(1, null);
-    row4.addField(2, 20.5);
-    row4.addField(3, new Date());
-
-    return new MockedBeamSqlTable(dataType).withInputRecords(
-        Arrays.asList(row1, row2, row3, row4));
-  }
-
-  public static BaseBeamTable getTable(String bootstrapServer, String topic) {
-    final RelProtoDataType protoRowType = new RelProtoDataType() {
-      @Override
-      public RelDataType apply(RelDataTypeFactory a0) {
-        return a0.builder().add("order_id", SqlTypeName.BIGINT).add("site_id", SqlTypeName.INTEGER)
-            .add("price", SqlTypeName.DOUBLE).add("order_time", SqlTypeName.TIMESTAMP).build();
-      }
-    };
-
-    BeamSqlRecordType dataType = CalciteUtils
-        .toBeamRecordType(protoRowType.apply(BeamQueryPlanner.TYPE_FACTORY));
-
-    Map<String, Object> consumerPara = new HashMap<String, Object>();
-    consumerPara.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
-
-    return new BeamKafkaCSVTable(dataType, bootstrapServer, Arrays.asList(topic))
-        .updateConsumerProperties(consumerPara);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/86dea078/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByExplainTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByExplainTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByExplainTest.java
deleted file mode 100644
index 4ea0662..0000000
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByExplainTest.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.dsls.sql.planner;
-
-import org.apache.beam.dsls.sql.BeamSqlCli;
-import org.apache.beam.dsls.sql.BeamSqlEnv;
-import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlUdfExpressionTest;
-import org.junit.Test;
-
-/**
- * Test group-by methods.
- *
- */
-public class BeamGroupByExplainTest extends BasePlanner {
-
-  /**
-   * GROUP-BY without window operation, and grouped fields.
-   */
-  @Test
-  public void testSimpleGroupExplain() throws Exception {
-    String sql = "SELECT COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS "
-        + "WHERE SITE_ID = 0 ";
-    String plan = BeamSqlCli.explainQuery(sql);
-  }
-
-  /**
-   * GROUP-BY without window operation, and grouped fields.
-   */
-  @Test
-  public void testSimpleGroup2Explain() throws Exception {
-    String sql = "SELECT site_id" + ", COUNT(*) " + "FROM ORDER_DETAILS "
-        + "WHERE SITE_ID = 0 " + "GROUP BY site_id";
-    String plan = BeamSqlCli.explainQuery(sql);
-  }
-
-  /**
-   * GROUP-BY with TUMBLE window.
-   */
-  @Test
-  public void testTumbleExplain() throws Exception {
-    String sql = "SELECT order_id, site_id" + ", COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS "
-        + "WHERE SITE_ID = 0 " + "GROUP BY order_id, site_id"
-        + ", TUMBLE(order_time, INTERVAL '1' HOUR)";
-    String plan = BeamSqlCli.explainQuery(sql);
-  }
-
-  /**
-   * GROUP-BY with TUMBLE window.
-   */
-  @Test
-  public void testTumbleWithDelayExplain() throws Exception {
-    String sql = "SELECT order_id, site_id, "
-        + "TUMBLE_START(order_time, INTERVAL '1' HOUR, TIME '00:00:01')"
-        + ", COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS " + "WHERE SITE_ID = 0 "
-        + "GROUP BY order_id, site_id" + ", TUMBLE(order_time, INTERVAL '1' HOUR, TIME '00:00:01')";
-    String plan = BeamSqlCli.explainQuery(sql);
-  }
-
-  /**
-   * GROUP-BY with HOP window.
-   */
-  @Test
-  public void testHopExplain() throws Exception {
-    String sql = "SELECT order_id, site_id" + ", COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS "
-        + "WHERE SITE_ID = 0 " + "GROUP BY order_id, site_id"
-        + ", HOP(order_time, INTERVAL '5' MINUTE, INTERVAL '1' HOUR)";
-    String plan = BeamSqlCli.explainQuery(sql);
-  }
-
-  /**
-   * GROUP-BY with SESSION window.
-   */
-  @Test
-  public void testSessionExplain() throws Exception {
-    String sql = "SELECT order_id, site_id" + ", COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS "
-        + "WHERE SITE_ID = 0 " + "GROUP BY order_id, site_id"
-        + ", SESSION(order_time, INTERVAL '5' MINUTE)";
-    String plan = BeamSqlCli.explainQuery(sql);
-  }
-
-  /**
-   * Query with UDF.
-   */
-  @Test
-  public void testUdf() throws Exception {
-    BeamSqlEnv.registerUdf("negative", BeamSqlUdfExpressionTest.UdfFn.class, "negative");
-    String sql = "select site_id, negative(site_id) as nsite_id from ORDER_DETAILS";
-
-    String plan = BeamSqlCli.explainQuery(sql);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/86dea078/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByPipelineTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByPipelineTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByPipelineTest.java
deleted file mode 100644
index 8db65d1..0000000
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByPipelineTest.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.dsls.sql.planner;
-
-import org.apache.beam.dsls.sql.BeamSqlCli;
-import org.apache.beam.dsls.sql.BeamSqlEnv;
-import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlUdfExpressionTest;
-import org.apache.beam.dsls.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.values.PCollection;
-import org.junit.Test;
-
-/**
- * Test group-by methods.
- *
- */
-public class BeamGroupByPipelineTest extends BasePlanner {
-  public final TestPipeline pipeline = TestPipeline.create();
-
-  /**
-   * GROUP-BY without window operation, and grouped fields.
-   */
-  @Test
-  public void testSimpleGroupExplain() throws Exception {
-    String sql = "SELECT COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS "
-        + "WHERE SITE_ID = 0 ";
-    PCollection<BeamSqlRow> outputStream = BeamSqlCli.compilePipeline(sql, pipeline);
-  }
-
-  /**
-   * GROUP-BY without window operation, and grouped fields.
-   */
-  @Test
-  public void testSimpleGroup2Explain() throws Exception {
-    String sql = "SELECT site_id" + ", COUNT(*) " + "FROM ORDER_DETAILS "
-        + "WHERE SITE_ID = 0 " + "GROUP BY site_id";
-    PCollection<BeamSqlRow> outputStream = BeamSqlCli.compilePipeline(sql, pipeline);
-  }
-
-  /**
-   * GROUP-BY with TUMBLE window.
-   */
-  @Test
-  public void testTumbleExplain() throws Exception {
-    String sql = "SELECT order_id, site_id" + ", COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS "
-        + "WHERE SITE_ID = 0 " + "GROUP BY order_id, site_id"
-        + ", TUMBLE(order_time, INTERVAL '1' HOUR)";
-    PCollection<BeamSqlRow> outputStream = BeamSqlCli.compilePipeline(sql, pipeline);
-  }
-
-  /**
-   * GROUP-BY with TUMBLE window.
-   */
-  @Test
-  public void testTumbleWithDelayExplain() throws Exception {
-    String sql = "SELECT order_id, site_id, "
-        + "TUMBLE_START(order_time, INTERVAL '1' HOUR, TIME '00:00:01')"
-        + ", COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS " + "WHERE SITE_ID = 0 "
-        + "GROUP BY order_id, site_id" + ", TUMBLE(order_time, INTERVAL '1' HOUR, TIME '00:00:01')";
-    PCollection<BeamSqlRow> outputStream = BeamSqlCli.compilePipeline(sql, pipeline);
-  }
-
-  /**
-   * GROUP-BY with HOP window.
-   */
-  @Test
-  public void testHopExplain() throws Exception {
-    String sql = "SELECT order_id, site_id" + ", COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS "
-        + "WHERE SITE_ID = 0 " + "GROUP BY order_id, site_id"
-        + ", HOP(order_time, INTERVAL '5' MINUTE, INTERVAL '1' HOUR)";
-    PCollection<BeamSqlRow> outputStream = BeamSqlCli.compilePipeline(sql, pipeline);
-  }
-
-  /**
-   * GROUP-BY with SESSION window.
-   */
-  @Test
-  public void testSessionExplain() throws Exception {
-    String sql = "SELECT order_id, site_id" + ", COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS "
-        + "WHERE SITE_ID = 0 " + "GROUP BY order_id, site_id"
-        + ", SESSION(order_time, INTERVAL '5' MINUTE)";
-    PCollection<BeamSqlRow> outputStream = BeamSqlCli.compilePipeline(sql, pipeline);
-  }
-
-  /**
-   * Query with UDF.
-   */
-  @Test
-  public void testUdf() throws Exception {
-    BeamSqlEnv.registerUdf("negative", BeamSqlUdfExpressionTest.UdfFn.class, "negative");
-    String sql = "select site_id, negative(site_id) as nsite_id from ORDER_DETAILS";
-
-    PCollection<BeamSqlRow> outputStream = BeamSqlCli.compilePipeline(sql, pipeline);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/86dea078/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamInvalidGroupByTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamInvalidGroupByTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamInvalidGroupByTest.java
deleted file mode 100644
index adb454c..0000000
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamInvalidGroupByTest.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.dsls.sql.planner;
-
-import org.apache.beam.dsls.sql.BeamSqlCli;
-import org.apache.beam.dsls.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.calcite.tools.ValidationException;
-import org.junit.Rule;
-import org.junit.Test;
-
-/**
- * Test group-by methods.
- *
- */
-public class BeamInvalidGroupByTest extends BasePlanner {
-  @Rule
-  public final TestPipeline pipeline = TestPipeline.create();
-
-  @Test(expected = ValidationException.class)
-  public void testTumble2Explain() throws Exception {
-    String sql = "SELECT order_id, site_id" + ", COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS "
-        + "WHERE SITE_ID = 0 " + "GROUP BY order_id" + ", TUMBLE(order_time, INTERVAL '1' HOUR)";
-    PCollection<BeamSqlRow> outputStream = BeamSqlCli.compilePipeline(sql, pipeline);
-  }
-
-  @Test(expected = ValidationException.class)
-  public void testTumble3Explain() throws Exception {
-    String sql = "SELECT order_id, site_id, TUMBLE(order_time, INTERVAL '1' HOUR)"
-        + ", COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS " + "WHERE SITE_ID = 0 "
-        + "GROUP BY order_id, site_id" + ", TUMBLE(order_time, INTERVAL '1' HOUR)";
-    PCollection<BeamSqlRow> outputStream = BeamSqlCli.compilePipeline(sql, pipeline);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/86dea078/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerAggregationSubmitTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerAggregationSubmitTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerAggregationSubmitTest.java
deleted file mode 100644
index f98517b..0000000
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerAggregationSubmitTest.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.dsls.sql.planner;
-
-import java.text.DateFormat;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.Arrays;
-
-import org.apache.beam.dsls.sql.BeamSqlCli;
-import org.apache.beam.dsls.sql.BeamSqlEnv;
-import org.apache.beam.dsls.sql.schema.BaseBeamTable;
-import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
-import org.apache.beam.dsls.sql.schema.BeamSqlRow;
-import org.apache.beam.dsls.sql.utils.CalciteUtils;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeFactory;
-import org.apache.calcite.rel.type.RelProtoDataType;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-
-/**
- * Tests to execute a query.
- *
- */
-public class BeamPlannerAggregationSubmitTest {
-  public static DateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
-
-  @Rule
-  public final TestPipeline pipeline = TestPipeline.create();
-
-  @BeforeClass
-  public static void prepareClass() throws ParseException {
-    BeamSqlEnv.registerTable("ORDER_DETAILS", getOrderTable());
-    BeamSqlEnv.registerTable("ORDER_SUMMARY", getSummaryTable());
-  }
-
-  @Before
-  public void prepare() throws ParseException {
-    MockedBeamSqlTable.CONTENT.clear();
-  }
-
-  private static BaseBeamTable getOrderTable() throws ParseException {
-    final RelProtoDataType protoRowType = new RelProtoDataType() {
-      @Override
-      public RelDataType apply(RelDataTypeFactory a0) {
-        return a0.builder().add("order_id", SqlTypeName.BIGINT)
-            .add("site_id", SqlTypeName.INTEGER)
-            .add("order_time", SqlTypeName.TIMESTAMP).build();
-      }
-    };
-
-    BeamSqlRecordType dataType = CalciteUtils
-        .toBeamRecordType(protoRowType.apply(BeamQueryPlanner.TYPE_FACTORY));
-    BeamSqlRow row1 = new BeamSqlRow(dataType);
-    row1.addField(0, 12345L);
-    row1.addField(1, 1);
-    row1.addField(2, format.parse("2017-01-01 01:02:03"));
-
-    BeamSqlRow row2 = new BeamSqlRow(dataType);
-    row2.addField(0, 12345L);
-    row2.addField(1, 0);
-    row2.addField(2, format.parse("2017-01-01 01:03:04"));
-
-    BeamSqlRow row3 = new BeamSqlRow(dataType);
-    row3.addField(0, 12345L);
-    row3.addField(1, 0);
-    row3.addField(2, format.parse("2017-01-01 02:03:04"));
-
-    BeamSqlRow row4 = new BeamSqlRow(dataType);
-    row4.addField(0, 2132L);
-    row4.addField(1, 0);
-    row4.addField(2, format.parse("2017-01-01 03:04:05"));
-
-    return new MockedBeamSqlTable(dataType).withInputRecords(
-        Arrays.asList(row1
-            , row2, row3, row4
-            ));
-
-  }
-
-  private static BaseBeamTable getSummaryTable() {
-    final RelProtoDataType protoRowType = new RelProtoDataType() {
-      @Override
-      public RelDataType apply(RelDataTypeFactory a0) {
-        return a0.builder()
-            .add("site_id", SqlTypeName.INTEGER)
-            .add("agg_hour", SqlTypeName.TIMESTAMP)
-            .add("size", SqlTypeName.BIGINT).build();
-      }
-    };
-    BeamSqlRecordType dataType = CalciteUtils
-        .toBeamRecordType(protoRowType.apply(BeamQueryPlanner.TYPE_FACTORY));
-
-    return new MockedBeamSqlTable(dataType);
-  }
-
-
-  @Test
-  public void selectWithWindowAggregation() throws Exception{
-    String sql = "INSERT INTO ORDER_SUMMARY(SITE_ID, agg_hour, SIZE) "
-        + "SELECT site_id, TUMBLE_START(order_time, INTERVAL '1' HOUR, TIME '00:00:01')"
-        + ", COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS "
-        + "WHERE SITE_ID = 1 " + "GROUP BY site_id"
-        + ", TUMBLE(order_time, INTERVAL '1' HOUR, TIME '00:00:01')";
-
-    BeamSqlCli.compilePipeline(sql, pipeline);
-
-    pipeline.run().waitUntilFinish();
-
-    Assert.assertTrue(MockedBeamSqlTable.CONTENT.size() == 1);
-    BeamSqlRow result = MockedBeamSqlTable.CONTENT.peek();
-    Assert.assertEquals(1, result.getInteger(0));
-    Assert.assertEquals(format.parse("2017-01-01 01:00:00"), result.getDate(1));
-    Assert.assertEquals(1L, result.getLong(2));
-  }
-
-  @Test
-  public void selectWithoutWindowAggregation() throws Exception{
-    String sql = "INSERT INTO ORDER_SUMMARY(SITE_ID, SIZE) "
-        + "SELECT site_id, COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS "
-        + "WHERE SITE_ID = 0 " + "GROUP BY site_id";
-
-    BeamSqlCli.compilePipeline(sql, pipeline);
-
-    pipeline.run().waitUntilFinish();
-
-    Assert.assertTrue(MockedBeamSqlTable.CONTENT.size() == 1);
-    Assert.assertEquals("site_id=0,agg_hour=null,size=3",
-        MockedBeamSqlTable.CONTENT.peek().valueInString());
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/86dea078/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerExplainTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerExplainTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerExplainTest.java
deleted file mode 100644
index e617ff2..0000000
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerExplainTest.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.dsls.sql.planner;
-
-import org.apache.beam.dsls.sql.BeamSqlCli;
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * Tests to explain queries.
- *
- */
-public class BeamPlannerExplainTest extends BasePlanner {
-  @Test
-  public void selectAll() throws Exception {
-    String sql = "SELECT * FROM ORDER_DETAILS";
-    String plan = BeamSqlCli.explainQuery(sql);
-
-    String expectedPlan =
-        "BeamProjectRel(order_id=[$0], site_id=[$1], price=[$2], order_time=[$3])\n"
-        + "  BeamIOSourceRel(table=[[ORDER_DETAILS]])\n";
-    Assert.assertEquals("explain doesn't match", expectedPlan, plan);
-  }
-
-  @Test
-  public void selectWithFilter() throws Exception {
-    String sql = "SELECT " + " order_id, site_id, price " + "FROM ORDER_DETAILS "
-        + "WHERE SITE_ID = 0 and price > 20";
-    String plan = BeamSqlCli.explainQuery(sql);
-
-    String expectedPlan = "BeamProjectRel(order_id=[$0], site_id=[$1], price=[$2])\n"
-        + "  BeamFilterRel(condition=[AND(=($1, 0), >($2, 20))])\n"
-        + "    BeamIOSourceRel(table=[[ORDER_DETAILS]])\n";
-    Assert.assertEquals("explain doesn't match", expectedPlan, plan);
-  }
-
-  @Test
-  public void insertSelectFilter() throws Exception {
-    String sql = "INSERT INTO SUB_ORDER(order_id, site_id, price) " + "SELECT "
-        + " order_id, site_id, price " + "FROM ORDER_DETAILS "
-        + "WHERE SITE_ID = 0 and price > 20";
-    String plan = BeamSqlCli.explainQuery(sql);
-
-    String expectedPlan =
-        "BeamIOSinkRel(table=[[SUB_ORDER]], operation=[INSERT], flattened=[true])\n"
-        + "  BeamProjectRel(order_id=[$0], site_id=[$1], price=[$2], order_time=[null])\n"
-        + "    BeamProjectRel(order_id=[$0], site_id=[$1], price=[$2])\n"
-        + "      BeamFilterRel(condition=[AND(=($1, 0), >($2, 20))])\n"
-        + "        BeamIOSourceRel(table=[[ORDER_DETAILS]])\n";
-    Assert.assertEquals("explain doesn't match", expectedPlan, plan);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/86dea078/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerSubmitTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerSubmitTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerSubmitTest.java
deleted file mode 100644
index 4df7f8a..0000000
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerSubmitTest.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.dsls.sql.planner;
-
-import org.apache.beam.dsls.sql.BeamSqlCli;
-import org.apache.beam.dsls.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.values.PCollection;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-
-/**
- * Tests to execute a query.
- *
- */
-public class BeamPlannerSubmitTest extends BasePlanner {
-  @Rule
-  public final TestPipeline pipeline = TestPipeline.create();
-
-  @Before
-  public void prepare() {
-    MockedBeamSqlTable.CONTENT.clear();
-  }
-
-  @Test
-  public void insertSelectFilter() throws Exception {
-    String sql = "INSERT INTO SUB_ORDER_RAM(order_id, site_id, price) SELECT "
-        + " order_id, site_id, price "
-        + "FROM ORDER_DETAILS " + "WHERE SITE_ID = 0 and price > 20";
-
-    PCollection<BeamSqlRow> outputStream = BeamSqlCli.compilePipeline(sql, pipeline);
-
-    pipeline.run().waitUntilFinish();
-
-    Assert.assertTrue(MockedBeamSqlTable.CONTENT.size() == 1);
-    Assert.assertTrue(MockedBeamSqlTable.CONTENT.peek().valueInString()
-        .contains("order_id=12345,site_id=0,price=20.5,order_time="));
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/86dea078/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTableTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTableTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTableTest.java
deleted file mode 100644
index 8dc8439..0000000
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTableTest.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.dsls.sql.schema;
-
-import org.apache.beam.dsls.sql.BeamSqlCli;
-import org.apache.beam.dsls.sql.BeamSqlEnv;
-import org.apache.beam.dsls.sql.planner.BasePlanner;
-import org.apache.beam.dsls.sql.planner.BeamQueryPlanner;
-import org.apache.beam.dsls.sql.utils.CalciteUtils;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.values.PBegin;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeFactory;
-import org.apache.calcite.rel.type.RelProtoDataType;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-
-/**
- * Test case for BeamPCollectionTable.
- */
-public class BeamPCollectionTableTest extends BasePlanner{
-  @Rule
-  public final TestPipeline pipeline = TestPipeline.create();
-
-  @Before
-  public void prepareTable(){
-    RelProtoDataType protoRowType = new RelProtoDataType() {
-      @Override
-      public RelDataType apply(RelDataTypeFactory a0) {
-        return a0.builder().add("c1", SqlTypeName.INTEGER)
-            .add("c2", SqlTypeName.VARCHAR).build();
-      }
-    };
-    BeamSqlRecordType dataType = CalciteUtils
-        .toBeamRecordType(protoRowType.apply(BeamQueryPlanner.TYPE_FACTORY));
-
-    BeamSqlRow row = new BeamSqlRow(CalciteUtils.toBeamRecordType(
-        protoRowType.apply(BeamQueryPlanner.TYPE_FACTORY)));
-    row.addField(0, 1);
-    row.addField(1, "hello world.");
-    PCollection<BeamSqlRow> inputStream = PBegin.in(pipeline).apply(Create.of(row));
-    BeamSqlEnv.registerTable("COLLECTION_TABLE",
-        new BeamPCollectionTable(inputStream, dataType));
-  }
-
-  @Test
-  public void testSelectFromPCollectionTable() throws Exception{
-    String sql = "select c1, c2 from COLLECTION_TABLE";
-    PCollection<BeamSqlRow> outputStream = BeamSqlCli.compilePipeline(sql, pipeline);
-
-    pipeline.run().waitUntilFinish();
-  }
-
-}


[2/2] beam git commit: [BEAM-2452] This closes #3371

Posted by ta...@apache.org.
[BEAM-2452] This closes #3371


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/738eb4dd
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/738eb4dd
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/738eb4dd

Branch: refs/heads/DSL_SQL
Commit: 738eb4dd0a231883cfb64f17869e08dfdb00ac51
Parents: dcd769c 86dea07
Author: Tyler Akidau <ta...@apache.org>
Authored: Fri Jun 16 18:27:50 2017 -0700
Committer: Tyler Akidau <ta...@apache.org>
Committed: Fri Jun 16 18:27:50 2017 -0700

----------------------------------------------------------------------
 .../dsls/sql/BeamSqlDslAggregationTest.java     | 260 +++++++++++++++++++
 .../apache/beam/dsls/sql/BeamSqlDslBase.java    | 125 +++++++++
 .../beam/dsls/sql/BeamSqlDslFilterTest.java     |  78 ++++++
 .../beam/dsls/sql/BeamSqlDslProjectTest.java    | 163 ++++++++++++
 .../beam/dsls/sql/planner/BasePlanner.java      | 108 --------
 .../sql/planner/BeamGroupByExplainTest.java     | 106 --------
 .../sql/planner/BeamGroupByPipelineTest.java    | 111 --------
 .../sql/planner/BeamInvalidGroupByTest.java     |  51 ----
 .../BeamPlannerAggregationSubmitTest.java       | 152 -----------
 .../sql/planner/BeamPlannerExplainTest.java     |  67 -----
 .../dsls/sql/planner/BeamPlannerSubmitTest.java |  56 ----
 .../sql/schema/BeamPCollectionTableTest.java    |  73 ------
 12 files changed, 626 insertions(+), 724 deletions(-)
----------------------------------------------------------------------