You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ta...@apache.org on 2017/08/02 05:08:28 UTC
[05/59] beam git commit: move dsls/sql to sdks/java/extensions/sql
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlApiSurfaceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlApiSurfaceTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlApiSurfaceTest.java
new file mode 100644
index 0000000..922931c
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlApiSurfaceTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql;
+
+import static org.apache.beam.sdk.util.ApiSurface.containsOnlyPackages;
+import static org.junit.Assert.assertThat;
+
+import com.google.common.collect.ImmutableSet;
+import java.util.Set;
+import org.apache.beam.sdk.util.ApiSurface;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Surface test for BeamSql api.
+ */
+@RunWith(JUnit4.class)
+public class BeamSqlApiSurfaceTest {
+ @Test
+ public void testSdkApiSurface() throws Exception {
+
+ @SuppressWarnings("unchecked")
+ final Set<String> allowed =
+ ImmutableSet.of(
+ "org.apache.beam",
+ "org.joda.time",
+ "org.apache.commons.csv");
+
+ ApiSurface surface = ApiSurface
+ .ofClass(BeamSqlCli.class)
+ .includingClass(BeamSql.class)
+ .includingClass(BeamSqlEnv.class)
+ .includingPackage("org.apache.beam.dsls.sql.schema",
+ getClass().getClassLoader())
+ .pruningPrefix("java")
+ .pruningPattern("org[.]apache[.]beam[.]dsls[.]sql[.].*Test")
+ .pruningPattern("org[.]apache[.]beam[.]dsls[.]sql[.].*TestBase");
+
+ assertThat(surface, containsOnlyPackages(allowed));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslAggregationTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslAggregationTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslAggregationTest.java
new file mode 100644
index 0000000..a142514
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslAggregationTest.java
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql;
+
+import java.sql.Types;
+import java.util.Arrays;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
+import org.joda.time.Instant;
+import org.junit.Test;
+
+/**
+ * Tests for GROUP-BY/aggregation, with global_window/fix_time_window/sliding_window/session_window
+ * with BOUNDED PCollection.
+ */
+public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
+ /**
+ * GROUP-BY with single aggregation function with bounded PCollection.
+ */
+ @Test
+ public void testAggregationWithoutWindowWithBounded() throws Exception {
+ runAggregationWithoutWindow(boundedInput1);
+ }
+
+ /**
+ * GROUP-BY with single aggregation function with unbounded PCollection.
+ */
+ @Test
+ public void testAggregationWithoutWindowWithUnbounded() throws Exception {
+ runAggregationWithoutWindow(unboundedInput1);
+ }
+
+ private void runAggregationWithoutWindow(PCollection<BeamSqlRow> input) throws Exception {
+ String sql = "SELECT f_int2, COUNT(*) AS `size` FROM PCOLLECTION GROUP BY f_int2";
+
+ PCollection<BeamSqlRow> result =
+ input.apply("testAggregationWithoutWindow", BeamSql.simpleQuery(sql));
+
+ BeamSqlRowType resultType = BeamSqlRowType.create(Arrays.asList("f_int2", "size"),
+ Arrays.asList(Types.INTEGER, Types.BIGINT));
+
+ BeamSqlRow record = new BeamSqlRow(resultType);
+ record.addField("f_int2", 0);
+ record.addField("size", 4L);
+
+ PAssert.that(result).containsInAnyOrder(record);
+
+ pipeline.run().waitUntilFinish();
+ }
+
+ /**
+ * GROUP-BY with multiple aggregation functions with bounded PCollection.
+ */
+ @Test
+ public void testAggregationFunctionsWithBounded() throws Exception{
+ runAggregationFunctions(boundedInput1);
+ }
+
+ /**
+ * GROUP-BY with multiple aggregation functions with unbounded PCollection.
+ */
+ @Test
+ public void testAggregationFunctionsWithUnbounded() throws Exception{
+ runAggregationFunctions(unboundedInput1);
+ }
+
+ private void runAggregationFunctions(PCollection<BeamSqlRow> input) throws Exception{
+ String sql = "select f_int2, count(*) as size, "
+ + "sum(f_long) as sum1, avg(f_long) as avg1, max(f_long) as max1, min(f_long) as min1,"
+ + "sum(f_short) as sum2, avg(f_short) as avg2, max(f_short) as max2, min(f_short) as min2,"
+ + "sum(f_byte) as sum3, avg(f_byte) as avg3, max(f_byte) as max3, min(f_byte) as min3,"
+ + "sum(f_float) as sum4, avg(f_float) as avg4, max(f_float) as max4, min(f_float) as min4,"
+ + "sum(f_double) as sum5, avg(f_double) as avg5, "
+ + "max(f_double) as max5, min(f_double) as min5,"
+ + "max(f_timestamp) as max6, min(f_timestamp) as min6 "
+ + "FROM TABLE_A group by f_int2";
+
+ PCollection<BeamSqlRow> result =
+ PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input)
+ .apply("testAggregationFunctions", BeamSql.query(sql));
+
+ BeamSqlRowType resultType = BeamSqlRowType.create(
+ Arrays.asList("f_int2", "size", "sum1", "avg1", "max1", "min1", "sum2", "avg2", "max2",
+ "min2", "sum3", "avg3", "max3", "min3", "sum4", "avg4", "max4", "min4", "sum5", "avg5",
+ "max5", "min5", "max6", "min6"),
+ Arrays.asList(Types.INTEGER, Types.BIGINT, Types.BIGINT, Types.BIGINT, Types.BIGINT,
+ Types.BIGINT, Types.SMALLINT, Types.SMALLINT, Types.SMALLINT, Types.SMALLINT,
+ Types.TINYINT, Types.TINYINT, Types.TINYINT, Types.TINYINT, Types.FLOAT, Types.FLOAT,
+ Types.FLOAT, Types.FLOAT, Types.DOUBLE, Types.DOUBLE, Types.DOUBLE, Types.DOUBLE,
+ Types.TIMESTAMP, Types.TIMESTAMP));
+
+ BeamSqlRow record = new BeamSqlRow(resultType);
+ record.addField("f_int2", 0);
+ record.addField("size", 4L);
+
+ record.addField("sum1", 10000L);
+ record.addField("avg1", 2500L);
+ record.addField("max1", 4000L);
+ record.addField("min1", 1000L);
+
+ record.addField("sum2", (short) 10);
+ record.addField("avg2", (short) 2);
+ record.addField("max2", (short) 4);
+ record.addField("min2", (short) 1);
+
+ record.addField("sum3", (byte) 10);
+ record.addField("avg3", (byte) 2);
+ record.addField("max3", (byte) 4);
+ record.addField("min3", (byte) 1);
+
+ record.addField("sum4", 10.0F);
+ record.addField("avg4", 2.5F);
+ record.addField("max4", 4.0F);
+ record.addField("min4", 1.0F);
+
+ record.addField("sum5", 10.0);
+ record.addField("avg5", 2.5);
+ record.addField("max5", 4.0);
+ record.addField("min5", 1.0);
+
+ record.addField("max6", FORMAT.parse("2017-01-01 02:04:03"));
+ record.addField("min6", FORMAT.parse("2017-01-01 01:01:03"));
+
+ PAssert.that(result).containsInAnyOrder(record);
+
+ pipeline.run().waitUntilFinish();
+ }
+
+ /**
+ * Implicit GROUP-BY with DISTINCT with bounded PCollection.
+ */
+ @Test
+ public void testDistinctWithBounded() throws Exception {
+ runDistinct(boundedInput1);
+ }
+
+ /**
+ * Implicit GROUP-BY with DISTINCT with unbounded PCollection.
+ */
+ @Test
+ public void testDistinctWithUnbounded() throws Exception {
+ runDistinct(unboundedInput1);
+ }
+
+ private void runDistinct(PCollection<BeamSqlRow> input) throws Exception {
+ String sql = "SELECT distinct f_int, f_long FROM PCOLLECTION ";
+
+ PCollection<BeamSqlRow> result =
+ input.apply("testDistinct", BeamSql.simpleQuery(sql));
+
+ BeamSqlRowType resultType = BeamSqlRowType.create(Arrays.asList("f_int", "f_long"),
+ Arrays.asList(Types.INTEGER, Types.BIGINT));
+
+ BeamSqlRow record1 = new BeamSqlRow(resultType);
+ record1.addField("f_int", 1);
+ record1.addField("f_long", 1000L);
+
+ BeamSqlRow record2 = new BeamSqlRow(resultType);
+ record2.addField("f_int", 2);
+ record2.addField("f_long", 2000L);
+
+ BeamSqlRow record3 = new BeamSqlRow(resultType);
+ record3.addField("f_int", 3);
+ record3.addField("f_long", 3000L);
+
+ BeamSqlRow record4 = new BeamSqlRow(resultType);
+ record4.addField("f_int", 4);
+ record4.addField("f_long", 4000L);
+
+ PAssert.that(result).containsInAnyOrder(record1, record2, record3, record4);
+
+ pipeline.run().waitUntilFinish();
+ }
+
+ /**
+ * GROUP-BY with TUMBLE window(aka fix_time_window) with bounded PCollection.
+ */
+ @Test
+ public void testTumbleWindowWithBounded() throws Exception {
+ runTumbleWindow(boundedInput1);
+ }
+
+ /**
+ * GROUP-BY with TUMBLE window(aka fix_time_window) with unbounded PCollection.
+ */
+ @Test
+ public void testTumbleWindowWithUnbounded() throws Exception {
+ runTumbleWindow(unboundedInput1);
+ }
+
+ private void runTumbleWindow(PCollection<BeamSqlRow> input) throws Exception {
+ String sql = "SELECT f_int2, COUNT(*) AS `size`,"
+ + " TUMBLE_START(f_timestamp, INTERVAL '1' HOUR) AS `window_start`"
+ + " FROM TABLE_A"
+ + " GROUP BY f_int2, TUMBLE(f_timestamp, INTERVAL '1' HOUR)";
+ PCollection<BeamSqlRow> result =
+ PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input)
+ .apply("testTumbleWindow", BeamSql.query(sql));
+
+ BeamSqlRowType resultType = BeamSqlRowType.create(
+ Arrays.asList("f_int2", "size", "window_start"),
+ Arrays.asList(Types.INTEGER, Types.BIGINT, Types.TIMESTAMP));
+
+ BeamSqlRow record1 = new BeamSqlRow(resultType);
+ record1.addField("f_int2", 0);
+ record1.addField("size", 3L);
+ record1.addField("window_start", FORMAT.parse("2017-01-01 01:00:00"));
+ record1.setWindowStart(new Instant(FORMAT.parse("2017-01-01 01:00:00").getTime()));
+ record1.setWindowEnd(new Instant(FORMAT.parse("2017-01-01 02:00:00").getTime()));
+
+ BeamSqlRow record2 = new BeamSqlRow(resultType);
+ record2.addField("f_int2", 0);
+ record2.addField("size", 1L);
+ record2.addField("window_start", FORMAT.parse("2017-01-01 02:00:00"));
+ record2.setWindowStart(new Instant(FORMAT.parse("2017-01-01 02:00:00").getTime()));
+ record2.setWindowEnd(new Instant(FORMAT.parse("2017-01-01 03:00:00").getTime()));
+
+ PAssert.that(result).containsInAnyOrder(record1, record2);
+
+ pipeline.run().waitUntilFinish();
+ }
+
+ /**
+ * GROUP-BY with HOP window(aka sliding_window) with bounded PCollection.
+ */
+ @Test
+ public void testHopWindowWithBounded() throws Exception {
+ runHopWindow(boundedInput1);
+ }
+
+ /**
+ * GROUP-BY with HOP window(aka sliding_window) with unbounded PCollection.
+ */
+ @Test
+ public void testHopWindowWithUnbounded() throws Exception {
+ runHopWindow(unboundedInput1);
+ }
+
+ private void runHopWindow(PCollection<BeamSqlRow> input) throws Exception {
+ String sql = "SELECT f_int2, COUNT(*) AS `size`,"
+ + " HOP_START(f_timestamp, INTERVAL '1' HOUR, INTERVAL '30' MINUTE) AS `window_start`"
+ + " FROM PCOLLECTION"
+ + " GROUP BY f_int2, HOP(f_timestamp, INTERVAL '1' HOUR, INTERVAL '30' MINUTE)";
+ PCollection<BeamSqlRow> result =
+ input.apply("testHopWindow", BeamSql.simpleQuery(sql));
+
+ BeamSqlRowType resultType = BeamSqlRowType.create(
+ Arrays.asList("f_int2", "size", "window_start"),
+ Arrays.asList(Types.INTEGER, Types.BIGINT, Types.TIMESTAMP));
+
+ BeamSqlRow record1 = new BeamSqlRow(resultType);
+ record1.addField("f_int2", 0);
+ record1.addField("size", 3L);
+ record1.addField("window_start", FORMAT.parse("2017-01-01 00:30:00"));
+ record1.setWindowStart(new Instant(FORMAT.parse("2017-01-01 00:30:00").getTime()));
+ record1.setWindowEnd(new Instant(FORMAT.parse("2017-01-01 01:30:00").getTime()));
+
+ BeamSqlRow record2 = new BeamSqlRow(resultType);
+ record2.addField("f_int2", 0);
+ record2.addField("size", 3L);
+ record2.addField("window_start", FORMAT.parse("2017-01-01 01:00:00"));
+ record2.setWindowStart(new Instant(FORMAT.parse("2017-01-01 01:00:00").getTime()));
+ record2.setWindowEnd(new Instant(FORMAT.parse("2017-01-01 02:00:00").getTime()));
+
+ BeamSqlRow record3 = new BeamSqlRow(resultType);
+ record3.addField("f_int2", 0);
+ record3.addField("size", 1L);
+ record3.addField("window_start", FORMAT.parse("2017-01-01 01:30:00"));
+ record3.setWindowStart(new Instant(FORMAT.parse("2017-01-01 01:30:00").getTime()));
+ record3.setWindowEnd(new Instant(FORMAT.parse("2017-01-01 02:30:00").getTime()));
+
+ BeamSqlRow record4 = new BeamSqlRow(resultType);
+ record4.addField("f_int2", 0);
+ record4.addField("size", 1L);
+ record4.addField("window_start", FORMAT.parse("2017-01-01 02:00:00"));
+ record4.setWindowStart(new Instant(FORMAT.parse("2017-01-01 02:00:00").getTime()));
+ record4.setWindowEnd(new Instant(FORMAT.parse("2017-01-01 03:00:00").getTime()));
+
+ PAssert.that(result).containsInAnyOrder(record1, record2, record3, record4);
+
+ pipeline.run().waitUntilFinish();
+ }
+
+ /**
+ * GROUP-BY with SESSION window with bounded PCollection.
+ */
+ @Test
+ public void testSessionWindowWithBounded() throws Exception {
+ runSessionWindow(boundedInput1);
+ }
+
+ /**
+ * GROUP-BY with SESSION window with unbounded PCollection.
+ */
+ @Test
+ public void testSessionWindowWithUnbounded() throws Exception {
+ runSessionWindow(unboundedInput1);
+ }
+
+ private void runSessionWindow(PCollection<BeamSqlRow> input) throws Exception {
+ String sql = "SELECT f_int2, COUNT(*) AS `size`,"
+ + " SESSION_START(f_timestamp, INTERVAL '5' MINUTE) AS `window_start`"
+ + " FROM TABLE_A"
+ + " GROUP BY f_int2, SESSION(f_timestamp, INTERVAL '5' MINUTE)";
+ PCollection<BeamSqlRow> result =
+ PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input)
+ .apply("testSessionWindow", BeamSql.query(sql));
+
+ BeamSqlRowType resultType = BeamSqlRowType.create(
+ Arrays.asList("f_int2", "size", "window_start"),
+ Arrays.asList(Types.INTEGER, Types.BIGINT, Types.TIMESTAMP));
+
+ BeamSqlRow record1 = new BeamSqlRow(resultType);
+ record1.addField("f_int2", 0);
+ record1.addField("size", 3L);
+ record1.addField("window_start", FORMAT.parse("2017-01-01 01:01:03"));
+ record1.setWindowStart(new Instant(FORMAT.parse("2017-01-01 01:01:03").getTime()));
+ record1.setWindowEnd(new Instant(FORMAT.parse("2017-01-01 01:11:03").getTime()));
+
+ BeamSqlRow record2 = new BeamSqlRow(resultType);
+ record2.addField("f_int2", 0);
+ record2.addField("size", 1L);
+ record2.addField("window_start", FORMAT.parse("2017-01-01 02:04:03"));
+ record2.setWindowStart(new Instant(FORMAT.parse("2017-01-01 02:04:03").getTime()));
+ record2.setWindowEnd(new Instant(FORMAT.parse("2017-01-01 02:09:03").getTime()));
+
+ PAssert.that(result).containsInAnyOrder(record1, record2);
+
+ pipeline.run().waitUntilFinish();
+ }
+
+ @Test
+ public void testWindowOnNonTimestampField() throws Exception {
+ exceptions.expect(IllegalStateException.class);
+ exceptions.expectMessage(
+ "Cannot apply 'TUMBLE' to arguments of type 'TUMBLE(<BIGINT>, <INTERVAL HOUR>)'");
+ pipeline.enableAbandonedNodeEnforcement(false);
+
+ String sql = "SELECT f_int2, COUNT(*) AS `size` FROM TABLE_A "
+ + "GROUP BY f_int2, TUMBLE(f_long, INTERVAL '1' HOUR)";
+ PCollection<BeamSqlRow> result =
+ PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), boundedInput1)
+ .apply("testWindowOnNonTimestampField", BeamSql.query(sql));
+
+ pipeline.run().waitUntilFinish();
+ }
+
+ @Test
+ public void testUnsupportedDistinct() throws Exception {
+ exceptions.expect(IllegalStateException.class);
+ exceptions.expectMessage("Encountered \"*\"");
+ pipeline.enableAbandonedNodeEnforcement(false);
+
+ String sql = "SELECT f_int2, COUNT(DISTINCT *) AS `size` FROM PCOLLECTION GROUP BY f_int2";
+
+ PCollection<BeamSqlRow> result =
+ boundedInput1.apply("testUnsupportedDistinct", BeamSql.simpleQuery(sql));
+
+ pipeline.run().waitUntilFinish();
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslBase.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslBase.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslBase.java
new file mode 100644
index 0000000..a5d92e7
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslBase.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql;
+
+import java.math.BigDecimal;
+import java.sql.Types;
+import java.text.DateFormat;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestStream;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.rules.ExpectedException;
+
+/**
+ * prepare input records to test {@link BeamSql}.
+ *
+ * <p>Note that, any change in these records would impact tests in this package.
+ *
+ */
+public class BeamSqlDslBase {
+ public static final DateFormat FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+
+ @Rule
+ public final TestPipeline pipeline = TestPipeline.create();
+ @Rule
+ public ExpectedException exceptions = ExpectedException.none();
+
+ public static BeamSqlRowType rowTypeInTableA;
+ public static List<BeamSqlRow> recordsInTableA;
+
+ //bounded PCollections
+ public PCollection<BeamSqlRow> boundedInput1;
+ public PCollection<BeamSqlRow> boundedInput2;
+
+ //unbounded PCollections
+ public PCollection<BeamSqlRow> unboundedInput1;
+ public PCollection<BeamSqlRow> unboundedInput2;
+
+ @BeforeClass
+ public static void prepareClass() throws ParseException {
+ rowTypeInTableA = BeamSqlRowType.create(
+ Arrays.asList("f_int", "f_long", "f_short", "f_byte", "f_float", "f_double", "f_string",
+ "f_timestamp", "f_int2", "f_decimal"),
+ Arrays.asList(Types.INTEGER, Types.BIGINT, Types.SMALLINT, Types.TINYINT, Types.FLOAT,
+ Types.DOUBLE, Types.VARCHAR, Types.TIMESTAMP, Types.INTEGER, Types.DECIMAL));
+
+ recordsInTableA = prepareInputRowsInTableA();
+ }
+
+ @Before
+ public void preparePCollections(){
+ boundedInput1 = PBegin.in(pipeline).apply("boundedInput1",
+ Create.of(recordsInTableA).withCoder(new BeamSqlRowCoder(rowTypeInTableA)));
+
+ boundedInput2 = PBegin.in(pipeline).apply("boundedInput2",
+ Create.of(recordsInTableA.get(0)).withCoder(new BeamSqlRowCoder(rowTypeInTableA)));
+
+ unboundedInput1 = prepareUnboundedPCollection1();
+ unboundedInput2 = prepareUnboundedPCollection2();
+ }
+
+ private PCollection<BeamSqlRow> prepareUnboundedPCollection1() {
+ TestStream.Builder<BeamSqlRow> values = TestStream
+ .create(new BeamSqlRowCoder(rowTypeInTableA));
+
+ for (BeamSqlRow row : recordsInTableA) {
+ values = values.advanceWatermarkTo(new Instant(row.getDate("f_timestamp")));
+ values = values.addElements(row);
+ }
+
+ return PBegin.in(pipeline).apply("unboundedInput1", values.advanceWatermarkToInfinity());
+ }
+
+ private PCollection<BeamSqlRow> prepareUnboundedPCollection2() {
+ TestStream.Builder<BeamSqlRow> values = TestStream
+ .create(new BeamSqlRowCoder(rowTypeInTableA));
+
+ BeamSqlRow row = recordsInTableA.get(0);
+ values = values.advanceWatermarkTo(new Instant(row.getDate("f_timestamp")));
+ values = values.addElements(row);
+
+ return PBegin.in(pipeline).apply("unboundedInput2", values.advanceWatermarkToInfinity());
+ }
+
+ private static List<BeamSqlRow> prepareInputRowsInTableA() throws ParseException{
+ List<BeamSqlRow> rows = new ArrayList<>();
+
+ BeamSqlRow row1 = new BeamSqlRow(rowTypeInTableA);
+ row1.addField(0, 1);
+ row1.addField(1, 1000L);
+ row1.addField(2, Short.valueOf("1"));
+ row1.addField(3, Byte.valueOf("1"));
+ row1.addField(4, 1.0f);
+ row1.addField(5, 1.0);
+ row1.addField(6, "string_row1");
+ row1.addField(7, FORMAT.parse("2017-01-01 01:01:03"));
+ row1.addField(8, 0);
+ row1.addField(9, new BigDecimal(1));
+ rows.add(row1);
+
+ BeamSqlRow row2 = new BeamSqlRow(rowTypeInTableA);
+ row2.addField(0, 2);
+ row2.addField(1, 2000L);
+ row2.addField(2, Short.valueOf("2"));
+ row2.addField(3, Byte.valueOf("2"));
+ row2.addField(4, 2.0f);
+ row2.addField(5, 2.0);
+ row2.addField(6, "string_row2");
+ row2.addField(7, FORMAT.parse("2017-01-01 01:02:03"));
+ row2.addField(8, 0);
+ row2.addField(9, new BigDecimal(2));
+ rows.add(row2);
+
+ BeamSqlRow row3 = new BeamSqlRow(rowTypeInTableA);
+ row3.addField(0, 3);
+ row3.addField(1, 3000L);
+ row3.addField(2, Short.valueOf("3"));
+ row3.addField(3, Byte.valueOf("3"));
+ row3.addField(4, 3.0f);
+ row3.addField(5, 3.0);
+ row3.addField(6, "string_row3");
+ row3.addField(7, FORMAT.parse("2017-01-01 01:06:03"));
+ row3.addField(8, 0);
+ row3.addField(9, new BigDecimal(3));
+ rows.add(row3);
+
+ BeamSqlRow row4 = new BeamSqlRow(rowTypeInTableA);
+ row4.addField(0, 4);
+ row4.addField(1, 4000L);
+ row4.addField(2, Short.valueOf("4"));
+ row4.addField(3, Byte.valueOf("4"));
+ row4.addField(4, 4.0f);
+ row4.addField(5, 4.0);
+ row4.addField(6, "string_row4");
+ row4.addField(7, FORMAT.parse("2017-01-01 02:04:03"));
+ row4.addField(8, 0);
+ row4.addField(9, new BigDecimal(4));
+ rows.add(row4);
+
+ return rows;
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslFilterTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslFilterTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslFilterTest.java
new file mode 100644
index 0000000..b4b50c1
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslFilterTest.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql;
+
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
+import org.junit.Test;
+
+/**
+ * Tests for WHERE queries with BOUNDED PCollection.
+ */
+public class BeamSqlDslFilterTest extends BeamSqlDslBase {
+ /**
+ * single filter with bounded PCollection.
+ */
+ @Test
+ public void testSingleFilterWithBounded() throws Exception {
+ runSingleFilter(boundedInput1);
+ }
+
+ /**
+ * single filter with unbounded PCollection.
+ */
+ @Test
+ public void testSingleFilterWithUnbounded() throws Exception {
+ runSingleFilter(unboundedInput1);
+ }
+
+ private void runSingleFilter(PCollection<BeamSqlRow> input) throws Exception {
+ String sql = "SELECT * FROM PCOLLECTION WHERE f_int = 1";
+
+ PCollection<BeamSqlRow> result =
+ input.apply("testSingleFilter", BeamSql.simpleQuery(sql));
+
+ PAssert.that(result).containsInAnyOrder(recordsInTableA.get(0));
+
+ pipeline.run().waitUntilFinish();
+ }
+
+ /**
+ * composite filters with bounded PCollection.
+ */
+ @Test
+ public void testCompositeFilterWithBounded() throws Exception {
+ runCompositeFilter(boundedInput1);
+ }
+
+ /**
+ * composite filters with unbounded PCollection.
+ */
+ @Test
+ public void testCompositeFilterWithUnbounded() throws Exception {
+ runCompositeFilter(unboundedInput1);
+ }
+
+ private void runCompositeFilter(PCollection<BeamSqlRow> input) throws Exception {
+ String sql = "SELECT * FROM TABLE_A"
+ + " WHERE f_int > 1 AND (f_long < 3000 OR f_string = 'string_row3')";
+
+ PCollection<BeamSqlRow> result =
+ PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input)
+ .apply("testCompositeFilter", BeamSql.query(sql));
+
+ PAssert.that(result).containsInAnyOrder(recordsInTableA.get(1), recordsInTableA.get(2));
+
+ pipeline.run().waitUntilFinish();
+ }
+
+ /**
+ * nothing return with filters in bounded PCollection.
+ */
+ @Test
+ public void testNoReturnFilterWithBounded() throws Exception {
+ runNoReturnFilter(boundedInput1);
+ }
+
+ /**
+ * nothing return with filters in unbounded PCollection.
+ */
+ @Test
+ public void testNoReturnFilterWithUnbounded() throws Exception {
+ runNoReturnFilter(unboundedInput1);
+ }
+
+ private void runNoReturnFilter(PCollection<BeamSqlRow> input) throws Exception {
+ String sql = "SELECT * FROM TABLE_A WHERE f_int < 1";
+
+ PCollection<BeamSqlRow> result =
+ PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input)
+ .apply("testNoReturnFilter", BeamSql.query(sql));
+
+ PAssert.that(result).empty();
+
+ pipeline.run().waitUntilFinish();
+ }
+
+ @Test
+ public void testFromInvalidTableName1() throws Exception {
+ exceptions.expect(IllegalStateException.class);
+ exceptions.expectMessage("Object 'TABLE_B' not found");
+ pipeline.enableAbandonedNodeEnforcement(false);
+
+ String sql = "SELECT * FROM TABLE_B WHERE f_int < 1";
+
+ PCollection<BeamSqlRow> result =
+ PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), boundedInput1)
+ .apply("testFromInvalidTableName1", BeamSql.query(sql));
+
+ pipeline.run().waitUntilFinish();
+ }
+
+ @Test
+ public void testFromInvalidTableName2() throws Exception {
+ exceptions.expect(IllegalStateException.class);
+ exceptions.expectMessage("Use fixed table name PCOLLECTION");
+ pipeline.enableAbandonedNodeEnforcement(false);
+
+ String sql = "SELECT * FROM PCOLLECTION_NA";
+
+ PCollection<BeamSqlRow> result = boundedInput1.apply(BeamSql.simpleQuery(sql));
+
+ pipeline.run().waitUntilFinish();
+ }
+
+ @Test
+ public void testInvalidFilter() throws Exception {
+ exceptions.expect(IllegalStateException.class);
+ exceptions.expectMessage("Column 'f_int_na' not found in any table");
+ pipeline.enableAbandonedNodeEnforcement(false);
+
+ String sql = "SELECT * FROM PCOLLECTION WHERE f_int_na = 0";
+
+ PCollection<BeamSqlRow> result = boundedInput1.apply(BeamSql.simpleQuery(sql));
+
+ pipeline.run().waitUntilFinish();
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslJoinTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslJoinTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslJoinTest.java
new file mode 100644
index 0000000..e010915
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslJoinTest.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql;
+
+import static org.apache.beam.dsls.sql.rel.BeamJoinRelBoundedVsBoundedTest.ORDER_DETAILS1;
+import static org.apache.beam.dsls.sql.rel.BeamJoinRelBoundedVsBoundedTest.ORDER_DETAILS2;
+
+import java.sql.Types;
+import java.util.Arrays;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
+import org.junit.Rule;
+import org.junit.Test;
+
+/**
+ * Tests for joins in queries.
+ */
+public class BeamSqlDslJoinTest {
+ @Rule
+ public final TestPipeline pipeline = TestPipeline.create();
+
+ private static final BeamSqlRowType SOURCE_RECORD_TYPE =
+ BeamSqlRowType.create(
+ Arrays.asList(
+ "order_id", "site_id", "price"
+ ),
+ Arrays.asList(
+ Types.INTEGER, Types.INTEGER, Types.INTEGER
+ )
+ );
+
+ private static final BeamSqlRowCoder SOURCE_CODER =
+ new BeamSqlRowCoder(SOURCE_RECORD_TYPE);
+
+ private static final BeamSqlRowType RESULT_RECORD_TYPE =
+ BeamSqlRowType.create(
+ Arrays.asList(
+ "order_id", "site_id", "price", "order_id0", "site_id0", "price0"
+ ),
+ Arrays.asList(
+ Types.INTEGER, Types.INTEGER, Types.INTEGER, Types.INTEGER
+ , Types.INTEGER, Types.INTEGER
+ )
+ );
+
+ private static final BeamSqlRowCoder RESULT_CODER =
+ new BeamSqlRowCoder(RESULT_RECORD_TYPE);
+
+ @Test
+ public void testInnerJoin() throws Exception {
+ String sql =
+ "SELECT * "
+ + "FROM ORDER_DETAILS1 o1"
+ + " JOIN ORDER_DETAILS2 o2"
+ + " on "
+ + " o1.order_id=o2.site_id AND o2.price=o1.site_id"
+ ;
+
+ PAssert.that(queryFromOrderTables(sql)).containsInAnyOrder(
+ TestUtils.RowsBuilder.of(
+ RESULT_RECORD_TYPE
+ ).addRows(
+ 2, 3, 3, 1, 2, 3
+ ).getRows());
+ pipeline.run();
+ }
+
+ @Test
+ public void testLeftOuterJoin() throws Exception {
+ String sql =
+ "SELECT * "
+ + "FROM ORDER_DETAILS1 o1"
+ + " LEFT OUTER JOIN ORDER_DETAILS2 o2"
+ + " on "
+ + " o1.order_id=o2.site_id AND o2.price=o1.site_id"
+ ;
+
+ PAssert.that(queryFromOrderTables(sql)).containsInAnyOrder(
+ TestUtils.RowsBuilder.of(
+ RESULT_RECORD_TYPE
+ ).addRows(
+ 1, 2, 3, null, null, null,
+ 2, 3, 3, 1, 2, 3,
+ 3, 4, 5, null, null, null
+ ).getRows());
+ pipeline.run();
+ }
+
+ @Test
+ public void testRightOuterJoin() throws Exception {
+ String sql =
+ "SELECT * "
+ + "FROM ORDER_DETAILS1 o1"
+ + " RIGHT OUTER JOIN ORDER_DETAILS2 o2"
+ + " on "
+ + " o1.order_id=o2.site_id AND o2.price=o1.site_id"
+ ;
+
+ PAssert.that(queryFromOrderTables(sql)).containsInAnyOrder(
+ TestUtils.RowsBuilder.of(
+ RESULT_RECORD_TYPE
+ ).addRows(
+ 2, 3, 3, 1, 2, 3,
+ null, null, null, 2, 3, 3,
+ null, null, null, 3, 4, 5
+ ).getRows());
+ pipeline.run();
+ }
+
+ @Test
+ public void testFullOuterJoin() throws Exception {
+ String sql =
+ "SELECT * "
+ + "FROM ORDER_DETAILS1 o1"
+ + " FULL OUTER JOIN ORDER_DETAILS2 o2"
+ + " on "
+ + " o1.order_id=o2.site_id AND o2.price=o1.site_id"
+ ;
+
+ PAssert.that(queryFromOrderTables(sql)).containsInAnyOrder(
+ TestUtils.RowsBuilder.of(
+ RESULT_RECORD_TYPE
+ ).addRows(
+ 2, 3, 3, 1, 2, 3,
+ 1, 2, 3, null, null, null,
+ 3, 4, 5, null, null, null,
+ null, null, null, 2, 3, 3,
+ null, null, null, 3, 4, 5
+ ).getRows());
+ pipeline.run();
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testException_nonEqualJoin() throws Exception {
+ String sql =
+ "SELECT * "
+ + "FROM ORDER_DETAILS1 o1"
+ + " JOIN ORDER_DETAILS2 o2"
+ + " on "
+ + " o1.order_id>o2.site_id"
+ ;
+
+ pipeline.enableAbandonedNodeEnforcement(false);
+ queryFromOrderTables(sql);
+ pipeline.run();
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testException_crossJoin() throws Exception {
+ String sql =
+ "SELECT * "
+ + "FROM ORDER_DETAILS1 o1, ORDER_DETAILS2 o2";
+
+ pipeline.enableAbandonedNodeEnforcement(false);
+ queryFromOrderTables(sql);
+ pipeline.run();
+ }
+
+ private PCollection<BeamSqlRow> queryFromOrderTables(String sql) {
+ return PCollectionTuple
+ .of(
+ new TupleTag<BeamSqlRow>("ORDER_DETAILS1"),
+ ORDER_DETAILS1.buildIOReader(pipeline).setCoder(SOURCE_CODER)
+ )
+ .and(new TupleTag<BeamSqlRow>("ORDER_DETAILS2"),
+ ORDER_DETAILS2.buildIOReader(pipeline).setCoder(SOURCE_CODER)
+ ).apply("join", BeamSql.query(sql)).setCoder(RESULT_CODER);
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslProjectTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslProjectTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslProjectTest.java
new file mode 100644
index 0000000..ab5a639
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslProjectTest.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql;
+
+import java.sql.Types;
+import java.util.Arrays;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
+import org.junit.Test;
+
+/**
+ * Tests for field-project in queries with BOUNDED PCollection.
+ */
+public class BeamSqlDslProjectTest extends BeamSqlDslBase {
+ /**
+ * select all fields with bounded PCollection.
+ */
+ @Test
+ public void testSelectAllWithBounded() throws Exception {
+ runSelectAll(boundedInput2);
+ }
+
+ /**
+ * select all fields with unbounded PCollection.
+ */
+ @Test
+ public void testSelectAllWithUnbounded() throws Exception {
+ runSelectAll(unboundedInput2);
+ }
+
+ private void runSelectAll(PCollection<BeamSqlRow> input) throws Exception {
+ String sql = "SELECT * FROM PCOLLECTION";
+
+ PCollection<BeamSqlRow> result =
+ input.apply("testSelectAll", BeamSql.simpleQuery(sql));
+
+ PAssert.that(result).containsInAnyOrder(recordsInTableA.get(0));
+
+ pipeline.run().waitUntilFinish();
+ }
+
+ /**
+ * select partial fields with bounded PCollection.
+ */
+ @Test
+ public void testPartialFieldsWithBounded() throws Exception {
+ runPartialFields(boundedInput2);
+ }
+
+ /**
+ * select partial fields with unbounded PCollection.
+ */
+ @Test
+ public void testPartialFieldsWithUnbounded() throws Exception {
+ runPartialFields(unboundedInput2);
+ }
+
+ private void runPartialFields(PCollection<BeamSqlRow> input) throws Exception {
+ String sql = "SELECT f_int, f_long FROM TABLE_A";
+
+ PCollection<BeamSqlRow> result =
+ PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input)
+ .apply("testPartialFields", BeamSql.query(sql));
+
+ BeamSqlRowType resultType = BeamSqlRowType.create(Arrays.asList("f_int", "f_long"),
+ Arrays.asList(Types.INTEGER, Types.BIGINT));
+
+ BeamSqlRow record = new BeamSqlRow(resultType);
+ record.addField("f_int", recordsInTableA.get(0).getFieldValue(0));
+ record.addField("f_long", recordsInTableA.get(0).getFieldValue(1));
+
+ PAssert.that(result).containsInAnyOrder(record);
+
+ pipeline.run().waitUntilFinish();
+ }
+
+ /**
+ * select partial fields for multiple rows with bounded PCollection.
+ */
+ @Test
+ public void testPartialFieldsInMultipleRowWithBounded() throws Exception {
+ runPartialFieldsInMultipleRow(boundedInput1);
+ }
+
+ /**
+ * select partial fields for multiple rows with unbounded PCollection.
+ */
+ @Test
+ public void testPartialFieldsInMultipleRowWithUnbounded() throws Exception {
+ runPartialFieldsInMultipleRow(unboundedInput1);
+ }
+
+ private void runPartialFieldsInMultipleRow(PCollection<BeamSqlRow> input) throws Exception {
+ String sql = "SELECT f_int, f_long FROM TABLE_A";
+
+ PCollection<BeamSqlRow> result =
+ PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input)
+ .apply("testPartialFieldsInMultipleRow", BeamSql.query(sql));
+
+ BeamSqlRowType resultType = BeamSqlRowType.create(Arrays.asList("f_int", "f_long"),
+ Arrays.asList(Types.INTEGER, Types.BIGINT));
+
+ BeamSqlRow record1 = new BeamSqlRow(resultType);
+ record1.addField("f_int", recordsInTableA.get(0).getFieldValue(0));
+ record1.addField("f_long", recordsInTableA.get(0).getFieldValue(1));
+
+ BeamSqlRow record2 = new BeamSqlRow(resultType);
+ record2.addField("f_int", recordsInTableA.get(1).getFieldValue(0));
+ record2.addField("f_long", recordsInTableA.get(1).getFieldValue(1));
+
+ BeamSqlRow record3 = new BeamSqlRow(resultType);
+ record3.addField("f_int", recordsInTableA.get(2).getFieldValue(0));
+ record3.addField("f_long", recordsInTableA.get(2).getFieldValue(1));
+
+ BeamSqlRow record4 = new BeamSqlRow(resultType);
+ record4.addField("f_int", recordsInTableA.get(3).getFieldValue(0));
+ record4.addField("f_long", recordsInTableA.get(3).getFieldValue(1));
+
+ PAssert.that(result).containsInAnyOrder(record1, record2, record3, record4);
+
+ pipeline.run().waitUntilFinish();
+ }
+
+ /**
+ * select partial fields with bounded PCollection.
+ */
+ @Test
+ public void testPartialFieldsInRowsWithBounded() throws Exception {
+ runPartialFieldsInRows(boundedInput1);
+ }
+
+ /**
+ * select partial fields with unbounded PCollection.
+ */
+ @Test
+ public void testPartialFieldsInRowsWithUnbounded() throws Exception {
+ runPartialFieldsInRows(unboundedInput1);
+ }
+
+ private void runPartialFieldsInRows(PCollection<BeamSqlRow> input) throws Exception {
+ String sql = "SELECT f_int, f_long FROM TABLE_A";
+
+ PCollection<BeamSqlRow> result =
+ PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input)
+ .apply("testPartialFieldsInRows", BeamSql.query(sql));
+
+ BeamSqlRowType resultType = BeamSqlRowType.create(Arrays.asList("f_int", "f_long"),
+ Arrays.asList(Types.INTEGER, Types.BIGINT));
+
+ BeamSqlRow record1 = new BeamSqlRow(resultType);
+ record1.addField("f_int", recordsInTableA.get(0).getFieldValue(0));
+ record1.addField("f_long", recordsInTableA.get(0).getFieldValue(1));
+
+ BeamSqlRow record2 = new BeamSqlRow(resultType);
+ record2.addField("f_int", recordsInTableA.get(1).getFieldValue(0));
+ record2.addField("f_long", recordsInTableA.get(1).getFieldValue(1));
+
+ BeamSqlRow record3 = new BeamSqlRow(resultType);
+ record3.addField("f_int", recordsInTableA.get(2).getFieldValue(0));
+ record3.addField("f_long", recordsInTableA.get(2).getFieldValue(1));
+
+ BeamSqlRow record4 = new BeamSqlRow(resultType);
+ record4.addField("f_int", recordsInTableA.get(3).getFieldValue(0));
+ record4.addField("f_long", recordsInTableA.get(3).getFieldValue(1));
+
+ PAssert.that(result).containsInAnyOrder(record1, record2, record3, record4);
+
+ pipeline.run().waitUntilFinish();
+ }
+
+ /**
+ * select literal field with bounded PCollection.
+ */
+ @Test
+ public void testLiteralFieldWithBounded() throws Exception {
+ runLiteralField(boundedInput2);
+ }
+
+ /**
+ * select literal field with unbounded PCollection.
+ */
+ @Test
+ public void testLiteralFieldWithUnbounded() throws Exception {
+ runLiteralField(unboundedInput2);
+ }
+
+ public void runLiteralField(PCollection<BeamSqlRow> input) throws Exception {
+ String sql = "SELECT 1 as literal_field FROM TABLE_A";
+
+ PCollection<BeamSqlRow> result =
+ PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input)
+ .apply("testLiteralField", BeamSql.query(sql));
+
+ BeamSqlRowType resultType = BeamSqlRowType.create(Arrays.asList("literal_field"),
+ Arrays.asList(Types.INTEGER));
+
+ BeamSqlRow record = new BeamSqlRow(resultType);
+ record.addField("literal_field", 1);
+
+ PAssert.that(result).containsInAnyOrder(record);
+
+ pipeline.run().waitUntilFinish();
+ }
+
+ @Test
+ public void testProjectUnknownField() throws Exception {
+ exceptions.expect(IllegalStateException.class);
+ exceptions.expectMessage("Column 'f_int_na' not found in any table");
+ pipeline.enableAbandonedNodeEnforcement(false);
+
+ String sql = "SELECT f_int_na FROM TABLE_A";
+
+ PCollection<BeamSqlRow> result =
+ PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), boundedInput1)
+ .apply("testProjectUnknownField", BeamSql.query(sql));
+
+ pipeline.run().waitUntilFinish();
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslUdfUdafTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslUdfUdafTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslUdfUdafTest.java
new file mode 100644
index 0000000..726f658
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslUdfUdafTest.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql;
+
+import java.sql.Types;
+import java.util.Arrays;
+import java.util.Iterator;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
+import org.apache.beam.dsls.sql.schema.BeamSqlUdaf;
+import org.apache.beam.dsls.sql.schema.BeamSqlUdf;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
+import org.junit.Test;
+
+/**
+ * Tests for UDF/UDAF.
+ */
+public class BeamSqlDslUdfUdafTest extends BeamSqlDslBase {
+ /**
+ * GROUP-BY with UDAF.
+ */
+ @Test
+ public void testUdaf() throws Exception {
+ BeamSqlRowType resultType = BeamSqlRowType.create(Arrays.asList("f_int2", "squaresum"),
+ Arrays.asList(Types.INTEGER, Types.INTEGER));
+
+ BeamSqlRow record = new BeamSqlRow(resultType);
+ record.addField("f_int2", 0);
+ record.addField("squaresum", 30);
+
+ String sql1 = "SELECT f_int2, squaresum1(f_int) AS `squaresum`"
+ + " FROM PCOLLECTION GROUP BY f_int2";
+ PCollection<BeamSqlRow> result1 =
+ boundedInput1.apply("testUdaf1",
+ BeamSql.simpleQuery(sql1).withUdaf("squaresum1", SquareSum.class));
+ PAssert.that(result1).containsInAnyOrder(record);
+
+ String sql2 = "SELECT f_int2, squaresum2(f_int) AS `squaresum`"
+ + " FROM PCOLLECTION GROUP BY f_int2";
+ PCollection<BeamSqlRow> result2 =
+ PCollectionTuple.of(new TupleTag<BeamSqlRow>("PCOLLECTION"), boundedInput1)
+ .apply("testUdaf2",
+ BeamSql.query(sql2).withUdaf("squaresum2", SquareSum.class));
+ PAssert.that(result2).containsInAnyOrder(record);
+
+ pipeline.run().waitUntilFinish();
+ }
+
+ /**
+ * test UDF.
+ */
+ @Test
+ public void testUdf() throws Exception{
+ BeamSqlRowType resultType = BeamSqlRowType.create(Arrays.asList("f_int", "cubicvalue"),
+ Arrays.asList(Types.INTEGER, Types.INTEGER));
+
+ BeamSqlRow record = new BeamSqlRow(resultType);
+ record.addField("f_int", 2);
+ record.addField("cubicvalue", 8);
+
+ String sql1 = "SELECT f_int, cubic1(f_int) as cubicvalue FROM PCOLLECTION WHERE f_int = 2";
+ PCollection<BeamSqlRow> result1 =
+ boundedInput1.apply("testUdf1",
+ BeamSql.simpleQuery(sql1).withUdf("cubic1", CubicInteger.class));
+ PAssert.that(result1).containsInAnyOrder(record);
+
+ String sql2 = "SELECT f_int, cubic2(f_int) as cubicvalue FROM PCOLLECTION WHERE f_int = 2";
+ PCollection<BeamSqlRow> result2 =
+ PCollectionTuple.of(new TupleTag<BeamSqlRow>("PCOLLECTION"), boundedInput1)
+ .apply("testUdf2",
+ BeamSql.query(sql2).withUdf("cubic2", CubicInteger.class));
+ PAssert.that(result2).containsInAnyOrder(record);
+
+ pipeline.run().waitUntilFinish();
+ }
+
+ /**
+ * UDAF for test, which returns the sum of square.
+ */
+ public static class SquareSum extends BeamSqlUdaf<Integer, Integer, Integer> {
+
+ public SquareSum() {
+ }
+
+ @Override
+ public Integer init() {
+ return 0;
+ }
+
+ @Override
+ public Integer add(Integer accumulator, Integer input) {
+ return accumulator + input * input;
+ }
+
+ @Override
+ public Integer merge(Iterable<Integer> accumulators) {
+ int v = 0;
+ Iterator<Integer> ite = accumulators.iterator();
+ while (ite.hasNext()) {
+ v += ite.next();
+ }
+ return v;
+ }
+
+ @Override
+ public Integer result(Integer accumulator) {
+ return accumulator;
+ }
+
+ }
+
+ /**
+ * A example UDF for test.
+ */
+ public static class CubicInteger implements BeamSqlUdf{
+ public static Integer eval(Integer input){
+ return input * input * input;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/TestUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/TestUtils.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/TestUtils.java
new file mode 100644
index 0000000..a669635
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/TestUtils.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
+import org.apache.beam.sdk.transforms.DoFn;
+
+/**
+ * Test utilities.
+ */
+public class TestUtils {
+ /**
+ * A {@code DoFn} to convert a {@code BeamSqlRow} to a comparable {@code String}.
+ */
+ public static class BeamSqlRow2StringDoFn extends DoFn<BeamSqlRow, String> {
+ @ProcessElement
+ public void processElement(ProcessContext ctx) {
+ ctx.output(ctx.element().valueInString());
+ }
+ }
+
+ /**
+ * Convert list of {@code BeamSqlRow} to list of {@code String}.
+ */
+ public static List<String> beamSqlRows2Strings(List<BeamSqlRow> rows) {
+ List<String> strs = new ArrayList<>();
+ for (BeamSqlRow row : rows) {
+ strs.add(row.valueInString());
+ }
+
+ return strs;
+ }
+
+ /**
+ * Convenient way to build a list of {@code BeamSqlRow}s.
+ *
+ * <p>You can use it like this:
+ *
+ * <pre>{@code
+ * TestUtils.RowsBuilder.of(
+ * Types.INTEGER, "order_id",
+ * Types.INTEGER, "sum_site_id",
+ * Types.VARCHAR, "buyer"
+ * ).addRows(
+ * 1, 3, "james",
+ * 2, 5, "bond"
+ * ).getStringRows()
+ * }</pre>
+ * {@code}
+ */
+ public static class RowsBuilder {
+ private BeamSqlRowType type;
+ private List<BeamSqlRow> rows = new ArrayList<>();
+
+ /**
+ * Create a RowsBuilder with the specified row type info.
+ *
+ * <p>For example:
+ * <pre>{@code
+ * TestUtils.RowsBuilder.of(
+ * Types.INTEGER, "order_id",
+ * Types.INTEGER, "sum_site_id",
+ * Types.VARCHAR, "buyer"
+ * )}</pre>
+ *
+ * @args pairs of column type and column names.
+ */
+ public static RowsBuilder of(final Object... args) {
+ BeamSqlRowType beamSQLRowType = buildBeamSqlRowType(args);
+ RowsBuilder builder = new RowsBuilder();
+ builder.type = beamSQLRowType;
+
+ return builder;
+ }
+
+ /**
+ * Create a RowsBuilder with the specified row type info.
+ *
+ * <p>For example:
+ * <pre>{@code
+ * TestUtils.RowsBuilder.of(
+ * beamSqlRowType
+ * )}</pre>
+ * @beamSQLRowType the record type.
+ */
+ public static RowsBuilder of(final BeamSqlRowType beamSQLRowType) {
+ RowsBuilder builder = new RowsBuilder();
+ builder.type = beamSQLRowType;
+
+ return builder;
+ }
+
+ /**
+ * Add rows to the builder.
+ *
+ * <p>Note: check the class javadoc for for detailed example.
+ */
+ public RowsBuilder addRows(final Object... args) {
+ this.rows.addAll(buildRows(type, Arrays.asList(args)));
+ return this;
+ }
+
+ /**
+ * Add rows to the builder.
+ *
+ * <p>Note: check the class javadoc for for detailed example.
+ */
+ public RowsBuilder addRows(final List args) {
+ this.rows.addAll(buildRows(type, args));
+ return this;
+ }
+
+ public List<BeamSqlRow> getRows() {
+ return rows;
+ }
+
+ public List<String> getStringRows() {
+ return beamSqlRows2Strings(rows);
+ }
+ }
+
+ /**
+ * Convenient way to build a {@code BeamSqlRowType}.
+ *
+ * <p>e.g.
+ *
+ * <pre>{@code
+ * buildBeamSqlRowType(
+ * Types.BIGINT, "order_id",
+ * Types.INTEGER, "site_id",
+ * Types.DOUBLE, "price",
+ * Types.TIMESTAMP, "order_time"
+ * )
+ * }</pre>
+ */
+ public static BeamSqlRowType buildBeamSqlRowType(Object... args) {
+ List<Integer> types = new ArrayList<>();
+ List<String> names = new ArrayList<>();
+
+ for (int i = 0; i < args.length - 1; i += 2) {
+ types.add((int) args[i]);
+ names.add((String) args[i + 1]);
+ }
+
+ return BeamSqlRowType.create(names, types);
+ }
+
+ /**
+ * Convenient way to build a {@code BeamSqlRow}s.
+ *
+ * <p>e.g.
+ *
+ * <pre>{@code
+ * buildRows(
+ * rowType,
+ * 1, 1, 1, // the first row
+ * 2, 2, 2, // the second row
+ * ...
+ * )
+ * }</pre>
+ */
+ public static List<BeamSqlRow> buildRows(BeamSqlRowType type, List args) {
+ List<BeamSqlRow> rows = new ArrayList<>();
+ int fieldCount = type.size();
+
+ for (int i = 0; i < args.size(); i += fieldCount) {
+ BeamSqlRow row = new BeamSqlRow(type);
+ for (int j = 0; j < fieldCount; j++) {
+ row.addField(j, args.get(i + j));
+ }
+ rows.add(row);
+ }
+ return rows;
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlArithmeticOperatorsIntegrationTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlArithmeticOperatorsIntegrationTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlArithmeticOperatorsIntegrationTest.java
new file mode 100644
index 0000000..947660a
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlArithmeticOperatorsIntegrationTest.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.integrationtest;
+
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import org.junit.Test;
+
+/**
+ * Integration test for arithmetic operators.
+ */
+public class BeamSqlArithmeticOperatorsIntegrationTest
+ extends BeamSqlBuiltinFunctionsIntegrationTestBase {
+
+ private static final BigDecimal ZERO = BigDecimal.valueOf(0.0);
+ private static final BigDecimal ONE0 = BigDecimal.valueOf(1);
+ private static final BigDecimal ONE = BigDecimal.valueOf(1.0);
+ private static final BigDecimal ONE2 = BigDecimal.valueOf(1.0).multiply(BigDecimal.valueOf(1.0));
+ private static final BigDecimal ONE10 = BigDecimal.ONE.divide(
+ BigDecimal.ONE, 10, RoundingMode.HALF_EVEN);
+ private static final BigDecimal TWO = BigDecimal.valueOf(2.0);
+
+ @Test
+ public void testPlus() throws Exception {
+ ExpressionChecker checker = new ExpressionChecker()
+ .addExpr("1 + 1", 2)
+ .addExpr("1.0 + 1", TWO)
+ .addExpr("1 + 1.0", TWO)
+ .addExpr("1.0 + 1.0", TWO)
+ .addExpr("c_tinyint + c_tinyint", (byte) 2)
+ .addExpr("c_smallint + c_smallint", (short) 2)
+ .addExpr("c_bigint + c_bigint", 2L)
+ .addExpr("c_decimal + c_decimal", TWO)
+ .addExpr("c_tinyint + c_decimal", TWO)
+ .addExpr("c_float + c_decimal", 2.0)
+ .addExpr("c_double + c_decimal", 2.0)
+ .addExpr("c_float + c_float", 2.0f)
+ .addExpr("c_double + c_float", 2.0)
+ .addExpr("c_double + c_double", 2.0)
+ .addExpr("c_float + c_bigint", 2.0f)
+ .addExpr("c_double + c_bigint", 2.0)
+ ;
+
+ checker.buildRunAndCheck();
+ }
+
+ @Test
+ public void testPlus_overflow() throws Exception {
+ ExpressionChecker checker = new ExpressionChecker()
+ .addExpr("c_tinyint_max + c_tinyint_max", (byte) -2)
+ .addExpr("c_smallint_max + c_smallint_max", (short) -2)
+ .addExpr("c_integer_max + c_integer_max", -2)
+ // yeah, I know 384L is strange, but since it is already overflowed
+ // what the actualy result is not so important, it is wrong any way.
+ .addExpr("c_bigint_max + c_bigint_max", 384L)
+ ;
+
+ checker.buildRunAndCheck();
+ }
+
+ @Test
+ public void testMinus() throws Exception {
+ ExpressionChecker checker = new ExpressionChecker()
+ .addExpr("1 - 1", 0)
+ .addExpr("1.0 - 1", ZERO)
+ .addExpr("1 - 0.0", ONE)
+ .addExpr("1.0 - 1.0", ZERO)
+ .addExpr("c_tinyint - c_tinyint", (byte) 0)
+ .addExpr("c_smallint - c_smallint", (short) 0)
+ .addExpr("c_bigint - c_bigint", 0L)
+ .addExpr("c_decimal - c_decimal", ZERO)
+ .addExpr("c_tinyint - c_decimal", ZERO)
+ .addExpr("c_float - c_decimal", 0.0)
+ .addExpr("c_double - c_decimal", 0.0)
+ .addExpr("c_float - c_float", 0.0f)
+ .addExpr("c_double - c_float", 0.0)
+ .addExpr("c_double - c_double", 0.0)
+ .addExpr("c_float - c_bigint", 0.0f)
+ .addExpr("c_double - c_bigint", 0.0)
+ ;
+
+ checker.buildRunAndCheck();
+ }
+
+ @Test
+ public void testMultiply() throws Exception {
+ ExpressionChecker checker = new ExpressionChecker()
+ .addExpr("1 * 1", 1)
+ .addExpr("1.0 * 1", ONE2)
+ .addExpr("1 * 1.0", ONE2)
+ .addExpr("1.0 * 1.0", ONE2)
+ .addExpr("c_tinyint * c_tinyint", (byte) 1)
+ .addExpr("c_smallint * c_smallint", (short) 1)
+ .addExpr("c_bigint * c_bigint", 1L)
+ .addExpr("c_decimal * c_decimal", ONE2)
+ .addExpr("c_tinyint * c_decimal", ONE2)
+ .addExpr("c_float * c_decimal", 1.0)
+ .addExpr("c_double * c_decimal", 1.0)
+ .addExpr("c_float * c_float", 1.0f)
+ .addExpr("c_double * c_float", 1.0)
+ .addExpr("c_double * c_double", 1.0)
+ .addExpr("c_float * c_bigint", 1.0f)
+ .addExpr("c_double * c_bigint", 1.0)
+ ;
+
+ checker.buildRunAndCheck();
+ }
+
+ @Test
+ public void testDivide() throws Exception {
+ ExpressionChecker checker = new ExpressionChecker()
+ .addExpr("1 / 1", 1)
+ .addExpr("1.0 / 1", ONE10)
+ .addExpr("1 / 1.0", ONE10)
+ .addExpr("1.0 / 1.0", ONE10)
+ .addExpr("c_tinyint / c_tinyint", (byte) 1)
+ .addExpr("c_smallint / c_smallint", (short) 1)
+ .addExpr("c_bigint / c_bigint", 1L)
+ .addExpr("c_decimal / c_decimal", ONE10)
+ .addExpr("c_tinyint / c_decimal", ONE10)
+ .addExpr("c_float / c_decimal", 1.0)
+ .addExpr("c_double / c_decimal", 1.0)
+ .addExpr("c_float / c_float", 1.0f)
+ .addExpr("c_double / c_float", 1.0)
+ .addExpr("c_double / c_double", 1.0)
+ .addExpr("c_float / c_bigint", 1.0f)
+ .addExpr("c_double / c_bigint", 1.0)
+ ;
+
+ checker.buildRunAndCheck();
+ }
+
+ @Test
+ public void testMod() throws Exception {
+ ExpressionChecker checker = new ExpressionChecker()
+ .addExpr("mod(1, 1)", 0)
+ .addExpr("mod(1.0, 1)", 0)
+ .addExpr("mod(1, 1.0)", ZERO)
+ .addExpr("mod(1.0, 1.0)", ZERO)
+ .addExpr("mod(c_tinyint, c_tinyint)", (byte) 0)
+ .addExpr("mod(c_smallint, c_smallint)", (short) 0)
+ .addExpr("mod(c_bigint, c_bigint)", 0L)
+ .addExpr("mod(c_decimal, c_decimal)", ZERO)
+ .addExpr("mod(c_tinyint, c_decimal)", ZERO)
+ ;
+
+ checker.buildRunAndCheck();
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java
new file mode 100644
index 0000000..b9ce9b4
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.integrationtest;
+
+import com.google.common.base.Joiner;
+import java.math.BigDecimal;
+import java.sql.Types;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
+import org.apache.beam.dsls.sql.BeamSql;
+import org.apache.beam.dsls.sql.TestUtils;
+import org.apache.beam.dsls.sql.mock.MockedBoundedTable;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.calcite.util.Pair;
+import org.junit.Rule;
+
+/**
+ * Base class for all built-in functions integration tests.
+ */
+public class BeamSqlBuiltinFunctionsIntegrationTestBase {
+ private static final Map<Class, Integer> JAVA_CLASS_TO_SQL_TYPE = new HashMap<>();
+ static {
+ JAVA_CLASS_TO_SQL_TYPE.put(Byte.class, Types.TINYINT);
+ JAVA_CLASS_TO_SQL_TYPE.put(Short.class, Types.SMALLINT);
+ JAVA_CLASS_TO_SQL_TYPE.put(Integer.class, Types.INTEGER);
+ JAVA_CLASS_TO_SQL_TYPE.put(Long.class, Types.BIGINT);
+ JAVA_CLASS_TO_SQL_TYPE.put(Float.class, Types.FLOAT);
+ JAVA_CLASS_TO_SQL_TYPE.put(Double.class, Types.DOUBLE);
+ JAVA_CLASS_TO_SQL_TYPE.put(BigDecimal.class, Types.DECIMAL);
+ JAVA_CLASS_TO_SQL_TYPE.put(String.class, Types.VARCHAR);
+ JAVA_CLASS_TO_SQL_TYPE.put(Date.class, Types.DATE);
+ JAVA_CLASS_TO_SQL_TYPE.put(Boolean.class, Types.BOOLEAN);
+ }
+
+ @Rule
+ public final TestPipeline pipeline = TestPipeline.create();
+
+ protected PCollection<BeamSqlRow> getTestPCollection() {
+ BeamSqlRowType type = BeamSqlRowType.create(
+ Arrays.asList("ts", "c_tinyint", "c_smallint",
+ "c_integer", "c_bigint", "c_float", "c_double", "c_decimal",
+ "c_tinyint_max", "c_smallint_max", "c_integer_max", "c_bigint_max"),
+ Arrays.asList(Types.DATE, Types.TINYINT, Types.SMALLINT,
+ Types.INTEGER, Types.BIGINT, Types.FLOAT, Types.DOUBLE, Types.DECIMAL,
+ Types.TINYINT, Types.SMALLINT, Types.INTEGER, Types.BIGINT)
+ );
+ try {
+ return MockedBoundedTable
+ .of(type)
+ .addRows(
+ parseDate("1986-02-15 11:35:26"),
+ (byte) 1,
+ (short) 1,
+ 1,
+ 1L,
+ 1.0f,
+ 1.0,
+ BigDecimal.ONE,
+ (byte) 127,
+ (short) 32767,
+ 2147483647,
+ 9223372036854775807L
+ )
+ .buildIOReader(pipeline)
+ .setCoder(new BeamSqlRowCoder(type));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ protected static Date parseDate(String str) {
+ try {
+ SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ sdf.setTimeZone(TimeZone.getTimeZone("GMT"));
+ return sdf.parse(str);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+
+ /**
+ * Helper class to make write integration test for built-in functions easier.
+ *
+ * <p>example usage:
+ * <pre>{@code
+ * ExpressionChecker checker = new ExpressionChecker()
+ * .addExpr("1 + 1", 2)
+ * .addExpr("1.0 + 1", 2.0)
+ * .addExpr("1 + 1.0", 2.0)
+ * .addExpr("1.0 + 1.0", 2.0)
+ * .addExpr("c_tinyint + c_tinyint", (byte) 2);
+ * checker.buildRunAndCheck(inputCollections);
+ * }</pre>
+ */
+ public class ExpressionChecker {
+ private transient List<Pair<String, Object>> exps = new ArrayList<>();
+
+ public ExpressionChecker addExpr(String expression, Object expectedValue) {
+ exps.add(Pair.of(expression, expectedValue));
+ return this;
+ }
+
+ private String getSql() {
+ List<String> expStrs = new ArrayList<>();
+ for (Pair<String, Object> pair : exps) {
+ expStrs.add(pair.getKey());
+ }
+ return "SELECT " + Joiner.on(",\n ").join(expStrs) + " FROM PCOLLECTION";
+ }
+
+ /**
+ * Build the corresponding SQL, compile to Beam Pipeline, run it, and check the result.
+ */
+ public void buildRunAndCheck() {
+ PCollection<BeamSqlRow> inputCollection = getTestPCollection();
+ System.out.println("SQL:>\n" + getSql());
+ try {
+ List<String> names = new ArrayList<>();
+ List<Integer> types = new ArrayList<>();
+ List<Object> values = new ArrayList<>();
+
+ for (Pair<String, Object> pair : exps) {
+ names.add(pair.getKey());
+ types.add(JAVA_CLASS_TO_SQL_TYPE.get(pair.getValue().getClass()));
+ values.add(pair.getValue());
+ }
+
+ PCollection<BeamSqlRow> rows = inputCollection.apply(BeamSql.simpleQuery(getSql()));
+ PAssert.that(rows).containsInAnyOrder(
+ TestUtils.RowsBuilder
+ .of(BeamSqlRowType.create(names, types))
+ .addRows(values)
+ .getRows()
+ );
+ inputCollection.getPipeline().run();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlComparisonOperatorsIntegrationTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlComparisonOperatorsIntegrationTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlComparisonOperatorsIntegrationTest.java
new file mode 100644
index 0000000..5502ad4
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlComparisonOperatorsIntegrationTest.java
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.integrationtest;
+
+import java.math.BigDecimal;
+import java.sql.Types;
+import java.util.Arrays;
+import org.apache.beam.dsls.sql.mock.MockedBoundedTable;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.Test;
+
+/**
+ * Integration test for comparison operators.
+ */
+public class BeamSqlComparisonOperatorsIntegrationTest
+ extends BeamSqlBuiltinFunctionsIntegrationTestBase {
+
+ @Test
+ public void testEquals() {
+ ExpressionChecker checker = new ExpressionChecker()
+ .addExpr("c_tinyint_1 = c_tinyint_1", true)
+ .addExpr("c_tinyint_1 = c_tinyint_2", false)
+ .addExpr("c_smallint_1 = c_smallint_1", true)
+ .addExpr("c_smallint_1 = c_smallint_2", false)
+ .addExpr("c_integer_1 = c_integer_1", true)
+ .addExpr("c_integer_1 = c_integer_2", false)
+ .addExpr("c_bigint_1 = c_bigint_1", true)
+ .addExpr("c_bigint_1 = c_bigint_2", false)
+ .addExpr("c_float_1 = c_float_1", true)
+ .addExpr("c_float_1 = c_float_2", false)
+ .addExpr("c_double_1 = c_double_1", true)
+ .addExpr("c_double_1 = c_double_2", false)
+ .addExpr("c_decimal_1 = c_decimal_1", true)
+ .addExpr("c_decimal_1 = c_decimal_2", false)
+ .addExpr("c_varchar_1 = c_varchar_1", true)
+ .addExpr("c_varchar_1 = c_varchar_2", false)
+ .addExpr("c_boolean_true = c_boolean_true", true)
+ .addExpr("c_boolean_true = c_boolean_false", false)
+
+ ;
+ checker.buildRunAndCheck();
+ }
+
+ @Test
+ public void testNotEquals() {
+ ExpressionChecker checker = new ExpressionChecker()
+ .addExpr("c_tinyint_1 <> c_tinyint_1", false)
+ .addExpr("c_tinyint_1 <> c_tinyint_2", true)
+ .addExpr("c_smallint_1 <> c_smallint_1", false)
+ .addExpr("c_smallint_1 <> c_smallint_2", true)
+ .addExpr("c_integer_1 <> c_integer_1", false)
+ .addExpr("c_integer_1 <> c_integer_2", true)
+ .addExpr("c_bigint_1 <> c_bigint_1", false)
+ .addExpr("c_bigint_1 <> c_bigint_2", true)
+ .addExpr("c_float_1 <> c_float_1", false)
+ .addExpr("c_float_1 <> c_float_2", true)
+ .addExpr("c_double_1 <> c_double_1", false)
+ .addExpr("c_double_1 <> c_double_2", true)
+ .addExpr("c_decimal_1 <> c_decimal_1", false)
+ .addExpr("c_decimal_1 <> c_decimal_2", true)
+ .addExpr("c_varchar_1 <> c_varchar_1", false)
+ .addExpr("c_varchar_1 <> c_varchar_2", true)
+ .addExpr("c_boolean_true <> c_boolean_true", false)
+ .addExpr("c_boolean_true <> c_boolean_false", true)
+ ;
+ checker.buildRunAndCheck();
+ }
+
+ @Test
+ public void testGreaterThan() {
+ ExpressionChecker checker = new ExpressionChecker()
+ .addExpr("c_tinyint_2 > c_tinyint_1", true)
+ .addExpr("c_tinyint_1 > c_tinyint_1", false)
+ .addExpr("c_tinyint_1 > c_tinyint_2", false)
+
+ .addExpr("c_smallint_2 > c_smallint_1", true)
+ .addExpr("c_smallint_1 > c_smallint_1", false)
+ .addExpr("c_smallint_1 > c_smallint_2", false)
+
+ .addExpr("c_integer_2 > c_integer_1", true)
+ .addExpr("c_integer_1 > c_integer_1", false)
+ .addExpr("c_integer_1 > c_integer_2", false)
+
+ .addExpr("c_bigint_2 > c_bigint_1", true)
+ .addExpr("c_bigint_1 > c_bigint_1", false)
+ .addExpr("c_bigint_1 > c_bigint_2", false)
+
+ .addExpr("c_float_2 > c_float_1", true)
+ .addExpr("c_float_1 > c_float_1", false)
+ .addExpr("c_float_1 > c_float_2", false)
+
+ .addExpr("c_double_2 > c_double_1", true)
+ .addExpr("c_double_1 > c_double_1", false)
+ .addExpr("c_double_1 > c_double_2", false)
+
+ .addExpr("c_decimal_2 > c_decimal_1", true)
+ .addExpr("c_decimal_1 > c_decimal_1", false)
+ .addExpr("c_decimal_1 > c_decimal_2", false)
+
+ .addExpr("c_varchar_2 > c_varchar_1", true)
+ .addExpr("c_varchar_1 > c_varchar_1", false)
+ .addExpr("c_varchar_1 > c_varchar_2", false)
+ ;
+
+ checker.buildRunAndCheck();
+ }
+
+ @Test(expected = RuntimeException.class)
+ public void testGreaterThanException() {
+ ExpressionChecker checker = new ExpressionChecker()
+ .addExpr("c_boolean_false > c_boolean_true", false);
+ checker.buildRunAndCheck();
+ }
+
+ @Test
+ public void testGreaterThanOrEquals() {
+ ExpressionChecker checker = new ExpressionChecker()
+ .addExpr("c_tinyint_2 >= c_tinyint_1", true)
+ .addExpr("c_tinyint_1 >= c_tinyint_1", true)
+ .addExpr("c_tinyint_1 >= c_tinyint_2", false)
+
+ .addExpr("c_smallint_2 >= c_smallint_1", true)
+ .addExpr("c_smallint_1 >= c_smallint_1", true)
+ .addExpr("c_smallint_1 >= c_smallint_2", false)
+
+ .addExpr("c_integer_2 >= c_integer_1", true)
+ .addExpr("c_integer_1 >= c_integer_1", true)
+ .addExpr("c_integer_1 >= c_integer_2", false)
+
+ .addExpr("c_bigint_2 >= c_bigint_1", true)
+ .addExpr("c_bigint_1 >= c_bigint_1", true)
+ .addExpr("c_bigint_1 >= c_bigint_2", false)
+
+ .addExpr("c_float_2 >= c_float_1", true)
+ .addExpr("c_float_1 >= c_float_1", true)
+ .addExpr("c_float_1 >= c_float_2", false)
+
+ .addExpr("c_double_2 >= c_double_1", true)
+ .addExpr("c_double_1 >= c_double_1", true)
+ .addExpr("c_double_1 >= c_double_2", false)
+
+ .addExpr("c_decimal_2 >= c_decimal_1", true)
+ .addExpr("c_decimal_1 >= c_decimal_1", true)
+ .addExpr("c_decimal_1 >= c_decimal_2", false)
+
+ .addExpr("c_varchar_2 >= c_varchar_1", true)
+ .addExpr("c_varchar_1 >= c_varchar_1", true)
+ .addExpr("c_varchar_1 >= c_varchar_2", false)
+ ;
+
+ checker.buildRunAndCheck();
+ }
+
+ @Test(expected = RuntimeException.class)
+ public void testGreaterThanOrEqualsException() {
+ ExpressionChecker checker = new ExpressionChecker()
+ .addExpr("c_boolean_false >= c_boolean_true", false);
+ checker.buildRunAndCheck();
+ }
+
+ @Test
+ public void testLessThan() {
+ ExpressionChecker checker = new ExpressionChecker()
+ .addExpr("c_tinyint_2 < c_tinyint_1", false)
+ .addExpr("c_tinyint_1 < c_tinyint_1", false)
+ .addExpr("c_tinyint_1 < c_tinyint_2", true)
+
+ .addExpr("c_smallint_2 < c_smallint_1", false)
+ .addExpr("c_smallint_1 < c_smallint_1", false)
+ .addExpr("c_smallint_1 < c_smallint_2", true)
+
+ .addExpr("c_integer_2 < c_integer_1", false)
+ .addExpr("c_integer_1 < c_integer_1", false)
+ .addExpr("c_integer_1 < c_integer_2", true)
+
+ .addExpr("c_bigint_2 < c_bigint_1", false)
+ .addExpr("c_bigint_1 < c_bigint_1", false)
+ .addExpr("c_bigint_1 < c_bigint_2", true)
+
+ .addExpr("c_float_2 < c_float_1", false)
+ .addExpr("c_float_1 < c_float_1", false)
+ .addExpr("c_float_1 < c_float_2", true)
+
+ .addExpr("c_double_2 < c_double_1", false)
+ .addExpr("c_double_1 < c_double_1", false)
+ .addExpr("c_double_1 < c_double_2", true)
+
+ .addExpr("c_decimal_2 < c_decimal_1", false)
+ .addExpr("c_decimal_1 < c_decimal_1", false)
+ .addExpr("c_decimal_1 < c_decimal_2", true)
+
+ .addExpr("c_varchar_2 < c_varchar_1", false)
+ .addExpr("c_varchar_1 < c_varchar_1", false)
+ .addExpr("c_varchar_1 < c_varchar_2", true)
+ ;
+
+ checker.buildRunAndCheck();
+ }
+
+ @Test(expected = RuntimeException.class)
+ public void testLessThanException() {
+ ExpressionChecker checker = new ExpressionChecker()
+ .addExpr("c_boolean_false < c_boolean_true", false);
+ checker.buildRunAndCheck();
+ }
+
+ @Test
+ public void testLessThanOrEquals() {
+ ExpressionChecker checker = new ExpressionChecker()
+ .addExpr("c_tinyint_2 <= c_tinyint_1", false)
+ .addExpr("c_tinyint_1 <= c_tinyint_1", true)
+ .addExpr("c_tinyint_1 <= c_tinyint_2", true)
+
+ .addExpr("c_smallint_2 <= c_smallint_1", false)
+ .addExpr("c_smallint_1 <= c_smallint_1", true)
+ .addExpr("c_smallint_1 <= c_smallint_2", true)
+
+ .addExpr("c_integer_2 <= c_integer_1", false)
+ .addExpr("c_integer_1 <= c_integer_1", true)
+ .addExpr("c_integer_1 <= c_integer_2", true)
+
+ .addExpr("c_bigint_2 <= c_bigint_1", false)
+ .addExpr("c_bigint_1 <= c_bigint_1", true)
+ .addExpr("c_bigint_1 <= c_bigint_2", true)
+
+ .addExpr("c_float_2 <= c_float_1", false)
+ .addExpr("c_float_1 <= c_float_1", true)
+ .addExpr("c_float_1 <= c_float_2", true)
+
+ .addExpr("c_double_2 <= c_double_1", false)
+ .addExpr("c_double_1 <= c_double_1", true)
+ .addExpr("c_double_1 <= c_double_2", true)
+
+ .addExpr("c_decimal_2 <= c_decimal_1", false)
+ .addExpr("c_decimal_1 <= c_decimal_1", true)
+ .addExpr("c_decimal_1 <= c_decimal_2", true)
+
+ .addExpr("c_varchar_2 <= c_varchar_1", false)
+ .addExpr("c_varchar_1 <= c_varchar_1", true)
+ .addExpr("c_varchar_1 <= c_varchar_2", true)
+ ;
+
+ checker.buildRunAndCheck();
+ }
+
+ @Test(expected = RuntimeException.class)
+ public void testLessThanOrEqualsException() {
+ ExpressionChecker checker = new ExpressionChecker()
+ .addExpr("c_boolean_false <= c_boolean_true", false);
+ checker.buildRunAndCheck();
+ }
+
+ @Test
+ public void testIsNullAndIsNotNull() throws Exception {
+ ExpressionChecker checker = new ExpressionChecker()
+ .addExpr("1 IS NOT NULL", true)
+ .addExpr("NULL IS NOT NULL", false)
+
+ .addExpr("1 IS NULL", false)
+ .addExpr("NULL IS NULL", true)
+ ;
+
+ checker.buildRunAndCheck();
+ }
+
+ @Override protected PCollection<BeamSqlRow> getTestPCollection() {
+ BeamSqlRowType type = BeamSqlRowType.create(
+ Arrays.asList(
+ "c_tinyint_0", "c_tinyint_1", "c_tinyint_2",
+ "c_smallint_0", "c_smallint_1", "c_smallint_2",
+ "c_integer_0", "c_integer_1", "c_integer_2",
+ "c_bigint_0", "c_bigint_1", "c_bigint_2",
+ "c_float_0", "c_float_1", "c_float_2",
+ "c_double_0", "c_double_1", "c_double_2",
+ "c_decimal_0", "c_decimal_1", "c_decimal_2",
+ "c_varchar_0", "c_varchar_1", "c_varchar_2",
+ "c_boolean_false", "c_boolean_true"
+ ),
+ Arrays.asList(
+ Types.TINYINT, Types.TINYINT, Types.TINYINT,
+ Types.SMALLINT, Types.SMALLINT, Types.SMALLINT,
+ Types.INTEGER, Types.INTEGER, Types.INTEGER,
+ Types.BIGINT, Types.BIGINT, Types.BIGINT,
+ Types.FLOAT, Types.FLOAT, Types.FLOAT,
+ Types.DOUBLE, Types.DOUBLE, Types.DOUBLE,
+ Types.DECIMAL, Types.DECIMAL, Types.DECIMAL,
+ Types.VARCHAR, Types.VARCHAR, Types.VARCHAR,
+ Types.BOOLEAN, Types.BOOLEAN
+ )
+ );
+ try {
+ return MockedBoundedTable
+ .of(type)
+ .addRows(
+ (byte) 0, (byte) 1, (byte) 2,
+ (short) 0, (short) 1, (short) 2,
+ 0, 1, 2,
+ 0L, 1L, 2L,
+ 0.0f, 1.0f, 2.0f,
+ 0.0, 1.0, 2.0,
+ BigDecimal.ZERO, BigDecimal.ONE, BigDecimal.ONE.add(BigDecimal.ONE),
+ "a", "b", "c",
+ false, true
+ )
+ .buildIOReader(pipeline)
+ .setCoder(new BeamSqlRowCoder(type));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlConditionalFunctionsIntegrationTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlConditionalFunctionsIntegrationTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlConditionalFunctionsIntegrationTest.java
new file mode 100644
index 0000000..6233aeb
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlConditionalFunctionsIntegrationTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.integrationtest;
+
+import org.junit.Test;
+
+/**
+ * Integration test for conditional functions.
+ */
+public class BeamSqlConditionalFunctionsIntegrationTest
+ extends BeamSqlBuiltinFunctionsIntegrationTestBase {
+ @Test
+ public void testConditionalFunctions() throws Exception {
+ ExpressionChecker checker = new ExpressionChecker()
+ .addExpr(
+ "CASE 1 WHEN 1 THEN 'hello' ELSE 'world' END",
+ "hello"
+ )
+ .addExpr(
+ "CASE 2 "
+ + "WHEN 1 THEN 'hello' "
+ + "WHEN 3 THEN 'bond' "
+ + "ELSE 'world' END",
+ "world"
+ )
+ .addExpr(
+ "CASE "
+ + "WHEN 1 = 1 THEN 'hello' "
+ + "ELSE 'world' END",
+ "hello"
+ )
+ .addExpr(
+ "CASE "
+ + "WHEN 1 > 1 THEN 'hello' "
+ + "ELSE 'world' END",
+ "world"
+ )
+ .addExpr("NULLIF(5, 4) ", 5)
+ .addExpr("COALESCE(1, 5) ", 1)
+ .addExpr("COALESCE(NULL, 5) ", 5)
+ ;
+
+ checker.buildRunAndCheck();
+ }
+}