You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ta...@apache.org on 2017/08/02 05:08:53 UTC
[30/59] beam git commit: rename package org.apache.beam.dsls.sql to
org.apache.beam.sdk.extensions.sql
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslBase.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslBase.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslBase.java
deleted file mode 100644
index a5d92e7..0000000
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslBase.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.dsls.sql;
-
-import java.math.BigDecimal;
-import java.sql.Types;
-import java.text.DateFormat;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import org.apache.beam.dsls.sql.schema.BeamSqlRow;
-import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
-import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.testing.TestStream;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.values.PBegin;
-import org.apache.beam.sdk.values.PCollection;
-import org.joda.time.Instant;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.rules.ExpectedException;
-
-/**
- * prepare input records to test {@link BeamSql}.
- *
- * <p>Note that, any change in these records would impact tests in this package.
- *
- */
-public class BeamSqlDslBase {
- public static final DateFormat FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
-
- @Rule
- public final TestPipeline pipeline = TestPipeline.create();
- @Rule
- public ExpectedException exceptions = ExpectedException.none();
-
- public static BeamSqlRowType rowTypeInTableA;
- public static List<BeamSqlRow> recordsInTableA;
-
- //bounded PCollections
- public PCollection<BeamSqlRow> boundedInput1;
- public PCollection<BeamSqlRow> boundedInput2;
-
- //unbounded PCollections
- public PCollection<BeamSqlRow> unboundedInput1;
- public PCollection<BeamSqlRow> unboundedInput2;
-
- @BeforeClass
- public static void prepareClass() throws ParseException {
- rowTypeInTableA = BeamSqlRowType.create(
- Arrays.asList("f_int", "f_long", "f_short", "f_byte", "f_float", "f_double", "f_string",
- "f_timestamp", "f_int2", "f_decimal"),
- Arrays.asList(Types.INTEGER, Types.BIGINT, Types.SMALLINT, Types.TINYINT, Types.FLOAT,
- Types.DOUBLE, Types.VARCHAR, Types.TIMESTAMP, Types.INTEGER, Types.DECIMAL));
-
- recordsInTableA = prepareInputRowsInTableA();
- }
-
- @Before
- public void preparePCollections(){
- boundedInput1 = PBegin.in(pipeline).apply("boundedInput1",
- Create.of(recordsInTableA).withCoder(new BeamSqlRowCoder(rowTypeInTableA)));
-
- boundedInput2 = PBegin.in(pipeline).apply("boundedInput2",
- Create.of(recordsInTableA.get(0)).withCoder(new BeamSqlRowCoder(rowTypeInTableA)));
-
- unboundedInput1 = prepareUnboundedPCollection1();
- unboundedInput2 = prepareUnboundedPCollection2();
- }
-
- private PCollection<BeamSqlRow> prepareUnboundedPCollection1() {
- TestStream.Builder<BeamSqlRow> values = TestStream
- .create(new BeamSqlRowCoder(rowTypeInTableA));
-
- for (BeamSqlRow row : recordsInTableA) {
- values = values.advanceWatermarkTo(new Instant(row.getDate("f_timestamp")));
- values = values.addElements(row);
- }
-
- return PBegin.in(pipeline).apply("unboundedInput1", values.advanceWatermarkToInfinity());
- }
-
- private PCollection<BeamSqlRow> prepareUnboundedPCollection2() {
- TestStream.Builder<BeamSqlRow> values = TestStream
- .create(new BeamSqlRowCoder(rowTypeInTableA));
-
- BeamSqlRow row = recordsInTableA.get(0);
- values = values.advanceWatermarkTo(new Instant(row.getDate("f_timestamp")));
- values = values.addElements(row);
-
- return PBegin.in(pipeline).apply("unboundedInput2", values.advanceWatermarkToInfinity());
- }
-
- private static List<BeamSqlRow> prepareInputRowsInTableA() throws ParseException{
- List<BeamSqlRow> rows = new ArrayList<>();
-
- BeamSqlRow row1 = new BeamSqlRow(rowTypeInTableA);
- row1.addField(0, 1);
- row1.addField(1, 1000L);
- row1.addField(2, Short.valueOf("1"));
- row1.addField(3, Byte.valueOf("1"));
- row1.addField(4, 1.0f);
- row1.addField(5, 1.0);
- row1.addField(6, "string_row1");
- row1.addField(7, FORMAT.parse("2017-01-01 01:01:03"));
- row1.addField(8, 0);
- row1.addField(9, new BigDecimal(1));
- rows.add(row1);
-
- BeamSqlRow row2 = new BeamSqlRow(rowTypeInTableA);
- row2.addField(0, 2);
- row2.addField(1, 2000L);
- row2.addField(2, Short.valueOf("2"));
- row2.addField(3, Byte.valueOf("2"));
- row2.addField(4, 2.0f);
- row2.addField(5, 2.0);
- row2.addField(6, "string_row2");
- row2.addField(7, FORMAT.parse("2017-01-01 01:02:03"));
- row2.addField(8, 0);
- row2.addField(9, new BigDecimal(2));
- rows.add(row2);
-
- BeamSqlRow row3 = new BeamSqlRow(rowTypeInTableA);
- row3.addField(0, 3);
- row3.addField(1, 3000L);
- row3.addField(2, Short.valueOf("3"));
- row3.addField(3, Byte.valueOf("3"));
- row3.addField(4, 3.0f);
- row3.addField(5, 3.0);
- row3.addField(6, "string_row3");
- row3.addField(7, FORMAT.parse("2017-01-01 01:06:03"));
- row3.addField(8, 0);
- row3.addField(9, new BigDecimal(3));
- rows.add(row3);
-
- BeamSqlRow row4 = new BeamSqlRow(rowTypeInTableA);
- row4.addField(0, 4);
- row4.addField(1, 4000L);
- row4.addField(2, Short.valueOf("4"));
- row4.addField(3, Byte.valueOf("4"));
- row4.addField(4, 4.0f);
- row4.addField(5, 4.0);
- row4.addField(6, "string_row4");
- row4.addField(7, FORMAT.parse("2017-01-01 02:04:03"));
- row4.addField(8, 0);
- row4.addField(9, new BigDecimal(4));
- rows.add(row4);
-
- return rows;
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslFilterTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslFilterTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslFilterTest.java
deleted file mode 100644
index b4b50c1..0000000
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslFilterTest.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.dsls.sql;
-
-import org.apache.beam.dsls.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.beam.sdk.values.TupleTag;
-import org.junit.Test;
-
-/**
- * Tests for WHERE queries with BOUNDED PCollection.
- */
-public class BeamSqlDslFilterTest extends BeamSqlDslBase {
- /**
- * single filter with bounded PCollection.
- */
- @Test
- public void testSingleFilterWithBounded() throws Exception {
- runSingleFilter(boundedInput1);
- }
-
- /**
- * single filter with unbounded PCollection.
- */
- @Test
- public void testSingleFilterWithUnbounded() throws Exception {
- runSingleFilter(unboundedInput1);
- }
-
- private void runSingleFilter(PCollection<BeamSqlRow> input) throws Exception {
- String sql = "SELECT * FROM PCOLLECTION WHERE f_int = 1";
-
- PCollection<BeamSqlRow> result =
- input.apply("testSingleFilter", BeamSql.simpleQuery(sql));
-
- PAssert.that(result).containsInAnyOrder(recordsInTableA.get(0));
-
- pipeline.run().waitUntilFinish();
- }
-
- /**
- * composite filters with bounded PCollection.
- */
- @Test
- public void testCompositeFilterWithBounded() throws Exception {
- runCompositeFilter(boundedInput1);
- }
-
- /**
- * composite filters with unbounded PCollection.
- */
- @Test
- public void testCompositeFilterWithUnbounded() throws Exception {
- runCompositeFilter(unboundedInput1);
- }
-
- private void runCompositeFilter(PCollection<BeamSqlRow> input) throws Exception {
- String sql = "SELECT * FROM TABLE_A"
- + " WHERE f_int > 1 AND (f_long < 3000 OR f_string = 'string_row3')";
-
- PCollection<BeamSqlRow> result =
- PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input)
- .apply("testCompositeFilter", BeamSql.query(sql));
-
- PAssert.that(result).containsInAnyOrder(recordsInTableA.get(1), recordsInTableA.get(2));
-
- pipeline.run().waitUntilFinish();
- }
-
- /**
- * nothing return with filters in bounded PCollection.
- */
- @Test
- public void testNoReturnFilterWithBounded() throws Exception {
- runNoReturnFilter(boundedInput1);
- }
-
- /**
- * nothing return with filters in unbounded PCollection.
- */
- @Test
- public void testNoReturnFilterWithUnbounded() throws Exception {
- runNoReturnFilter(unboundedInput1);
- }
-
- private void runNoReturnFilter(PCollection<BeamSqlRow> input) throws Exception {
- String sql = "SELECT * FROM TABLE_A WHERE f_int < 1";
-
- PCollection<BeamSqlRow> result =
- PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input)
- .apply("testNoReturnFilter", BeamSql.query(sql));
-
- PAssert.that(result).empty();
-
- pipeline.run().waitUntilFinish();
- }
-
- @Test
- public void testFromInvalidTableName1() throws Exception {
- exceptions.expect(IllegalStateException.class);
- exceptions.expectMessage("Object 'TABLE_B' not found");
- pipeline.enableAbandonedNodeEnforcement(false);
-
- String sql = "SELECT * FROM TABLE_B WHERE f_int < 1";
-
- PCollection<BeamSqlRow> result =
- PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), boundedInput1)
- .apply("testFromInvalidTableName1", BeamSql.query(sql));
-
- pipeline.run().waitUntilFinish();
- }
-
- @Test
- public void testFromInvalidTableName2() throws Exception {
- exceptions.expect(IllegalStateException.class);
- exceptions.expectMessage("Use fixed table name PCOLLECTION");
- pipeline.enableAbandonedNodeEnforcement(false);
-
- String sql = "SELECT * FROM PCOLLECTION_NA";
-
- PCollection<BeamSqlRow> result = boundedInput1.apply(BeamSql.simpleQuery(sql));
-
- pipeline.run().waitUntilFinish();
- }
-
- @Test
- public void testInvalidFilter() throws Exception {
- exceptions.expect(IllegalStateException.class);
- exceptions.expectMessage("Column 'f_int_na' not found in any table");
- pipeline.enableAbandonedNodeEnforcement(false);
-
- String sql = "SELECT * FROM PCOLLECTION WHERE f_int_na = 0";
-
- PCollection<BeamSqlRow> result = boundedInput1.apply(BeamSql.simpleQuery(sql));
-
- pipeline.run().waitUntilFinish();
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslJoinTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslJoinTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslJoinTest.java
deleted file mode 100644
index e010915..0000000
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslJoinTest.java
+++ /dev/null
@@ -1,191 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.dsls.sql;
-
-import static org.apache.beam.dsls.sql.rel.BeamJoinRelBoundedVsBoundedTest.ORDER_DETAILS1;
-import static org.apache.beam.dsls.sql.rel.BeamJoinRelBoundedVsBoundedTest.ORDER_DETAILS2;
-
-import java.sql.Types;
-import java.util.Arrays;
-import org.apache.beam.dsls.sql.schema.BeamSqlRow;
-import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
-import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.beam.sdk.values.TupleTag;
-import org.junit.Rule;
-import org.junit.Test;
-
-/**
- * Tests for joins in queries.
- */
-public class BeamSqlDslJoinTest {
- @Rule
- public final TestPipeline pipeline = TestPipeline.create();
-
- private static final BeamSqlRowType SOURCE_RECORD_TYPE =
- BeamSqlRowType.create(
- Arrays.asList(
- "order_id", "site_id", "price"
- ),
- Arrays.asList(
- Types.INTEGER, Types.INTEGER, Types.INTEGER
- )
- );
-
- private static final BeamSqlRowCoder SOURCE_CODER =
- new BeamSqlRowCoder(SOURCE_RECORD_TYPE);
-
- private static final BeamSqlRowType RESULT_RECORD_TYPE =
- BeamSqlRowType.create(
- Arrays.asList(
- "order_id", "site_id", "price", "order_id0", "site_id0", "price0"
- ),
- Arrays.asList(
- Types.INTEGER, Types.INTEGER, Types.INTEGER, Types.INTEGER
- , Types.INTEGER, Types.INTEGER
- )
- );
-
- private static final BeamSqlRowCoder RESULT_CODER =
- new BeamSqlRowCoder(RESULT_RECORD_TYPE);
-
- @Test
- public void testInnerJoin() throws Exception {
- String sql =
- "SELECT * "
- + "FROM ORDER_DETAILS1 o1"
- + " JOIN ORDER_DETAILS2 o2"
- + " on "
- + " o1.order_id=o2.site_id AND o2.price=o1.site_id"
- ;
-
- PAssert.that(queryFromOrderTables(sql)).containsInAnyOrder(
- TestUtils.RowsBuilder.of(
- RESULT_RECORD_TYPE
- ).addRows(
- 2, 3, 3, 1, 2, 3
- ).getRows());
- pipeline.run();
- }
-
- @Test
- public void testLeftOuterJoin() throws Exception {
- String sql =
- "SELECT * "
- + "FROM ORDER_DETAILS1 o1"
- + " LEFT OUTER JOIN ORDER_DETAILS2 o2"
- + " on "
- + " o1.order_id=o2.site_id AND o2.price=o1.site_id"
- ;
-
- PAssert.that(queryFromOrderTables(sql)).containsInAnyOrder(
- TestUtils.RowsBuilder.of(
- RESULT_RECORD_TYPE
- ).addRows(
- 1, 2, 3, null, null, null,
- 2, 3, 3, 1, 2, 3,
- 3, 4, 5, null, null, null
- ).getRows());
- pipeline.run();
- }
-
- @Test
- public void testRightOuterJoin() throws Exception {
- String sql =
- "SELECT * "
- + "FROM ORDER_DETAILS1 o1"
- + " RIGHT OUTER JOIN ORDER_DETAILS2 o2"
- + " on "
- + " o1.order_id=o2.site_id AND o2.price=o1.site_id"
- ;
-
- PAssert.that(queryFromOrderTables(sql)).containsInAnyOrder(
- TestUtils.RowsBuilder.of(
- RESULT_RECORD_TYPE
- ).addRows(
- 2, 3, 3, 1, 2, 3,
- null, null, null, 2, 3, 3,
- null, null, null, 3, 4, 5
- ).getRows());
- pipeline.run();
- }
-
- @Test
- public void testFullOuterJoin() throws Exception {
- String sql =
- "SELECT * "
- + "FROM ORDER_DETAILS1 o1"
- + " FULL OUTER JOIN ORDER_DETAILS2 o2"
- + " on "
- + " o1.order_id=o2.site_id AND o2.price=o1.site_id"
- ;
-
- PAssert.that(queryFromOrderTables(sql)).containsInAnyOrder(
- TestUtils.RowsBuilder.of(
- RESULT_RECORD_TYPE
- ).addRows(
- 2, 3, 3, 1, 2, 3,
- 1, 2, 3, null, null, null,
- 3, 4, 5, null, null, null,
- null, null, null, 2, 3, 3,
- null, null, null, 3, 4, 5
- ).getRows());
- pipeline.run();
- }
-
- @Test(expected = IllegalStateException.class)
- public void testException_nonEqualJoin() throws Exception {
- String sql =
- "SELECT * "
- + "FROM ORDER_DETAILS1 o1"
- + " JOIN ORDER_DETAILS2 o2"
- + " on "
- + " o1.order_id>o2.site_id"
- ;
-
- pipeline.enableAbandonedNodeEnforcement(false);
- queryFromOrderTables(sql);
- pipeline.run();
- }
-
- @Test(expected = IllegalStateException.class)
- public void testException_crossJoin() throws Exception {
- String sql =
- "SELECT * "
- + "FROM ORDER_DETAILS1 o1, ORDER_DETAILS2 o2";
-
- pipeline.enableAbandonedNodeEnforcement(false);
- queryFromOrderTables(sql);
- pipeline.run();
- }
-
- private PCollection<BeamSqlRow> queryFromOrderTables(String sql) {
- return PCollectionTuple
- .of(
- new TupleTag<BeamSqlRow>("ORDER_DETAILS1"),
- ORDER_DETAILS1.buildIOReader(pipeline).setCoder(SOURCE_CODER)
- )
- .and(new TupleTag<BeamSqlRow>("ORDER_DETAILS2"),
- ORDER_DETAILS2.buildIOReader(pipeline).setCoder(SOURCE_CODER)
- ).apply("join", BeamSql.query(sql)).setCoder(RESULT_CODER);
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslProjectTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslProjectTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslProjectTest.java
deleted file mode 100644
index ab5a639..0000000
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslProjectTest.java
+++ /dev/null
@@ -1,238 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.dsls.sql;
-
-import java.sql.Types;
-import java.util.Arrays;
-import org.apache.beam.dsls.sql.schema.BeamSqlRow;
-import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.beam.sdk.values.TupleTag;
-import org.junit.Test;
-
-/**
- * Tests for field-project in queries with BOUNDED PCollection.
- */
-public class BeamSqlDslProjectTest extends BeamSqlDslBase {
- /**
- * select all fields with bounded PCollection.
- */
- @Test
- public void testSelectAllWithBounded() throws Exception {
- runSelectAll(boundedInput2);
- }
-
- /**
- * select all fields with unbounded PCollection.
- */
- @Test
- public void testSelectAllWithUnbounded() throws Exception {
- runSelectAll(unboundedInput2);
- }
-
- private void runSelectAll(PCollection<BeamSqlRow> input) throws Exception {
- String sql = "SELECT * FROM PCOLLECTION";
-
- PCollection<BeamSqlRow> result =
- input.apply("testSelectAll", BeamSql.simpleQuery(sql));
-
- PAssert.that(result).containsInAnyOrder(recordsInTableA.get(0));
-
- pipeline.run().waitUntilFinish();
- }
-
- /**
- * select partial fields with bounded PCollection.
- */
- @Test
- public void testPartialFieldsWithBounded() throws Exception {
- runPartialFields(boundedInput2);
- }
-
- /**
- * select partial fields with unbounded PCollection.
- */
- @Test
- public void testPartialFieldsWithUnbounded() throws Exception {
- runPartialFields(unboundedInput2);
- }
-
- private void runPartialFields(PCollection<BeamSqlRow> input) throws Exception {
- String sql = "SELECT f_int, f_long FROM TABLE_A";
-
- PCollection<BeamSqlRow> result =
- PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input)
- .apply("testPartialFields", BeamSql.query(sql));
-
- BeamSqlRowType resultType = BeamSqlRowType.create(Arrays.asList("f_int", "f_long"),
- Arrays.asList(Types.INTEGER, Types.BIGINT));
-
- BeamSqlRow record = new BeamSqlRow(resultType);
- record.addField("f_int", recordsInTableA.get(0).getFieldValue(0));
- record.addField("f_long", recordsInTableA.get(0).getFieldValue(1));
-
- PAssert.that(result).containsInAnyOrder(record);
-
- pipeline.run().waitUntilFinish();
- }
-
- /**
- * select partial fields for multiple rows with bounded PCollection.
- */
- @Test
- public void testPartialFieldsInMultipleRowWithBounded() throws Exception {
- runPartialFieldsInMultipleRow(boundedInput1);
- }
-
- /**
- * select partial fields for multiple rows with unbounded PCollection.
- */
- @Test
- public void testPartialFieldsInMultipleRowWithUnbounded() throws Exception {
- runPartialFieldsInMultipleRow(unboundedInput1);
- }
-
- private void runPartialFieldsInMultipleRow(PCollection<BeamSqlRow> input) throws Exception {
- String sql = "SELECT f_int, f_long FROM TABLE_A";
-
- PCollection<BeamSqlRow> result =
- PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input)
- .apply("testPartialFieldsInMultipleRow", BeamSql.query(sql));
-
- BeamSqlRowType resultType = BeamSqlRowType.create(Arrays.asList("f_int", "f_long"),
- Arrays.asList(Types.INTEGER, Types.BIGINT));
-
- BeamSqlRow record1 = new BeamSqlRow(resultType);
- record1.addField("f_int", recordsInTableA.get(0).getFieldValue(0));
- record1.addField("f_long", recordsInTableA.get(0).getFieldValue(1));
-
- BeamSqlRow record2 = new BeamSqlRow(resultType);
- record2.addField("f_int", recordsInTableA.get(1).getFieldValue(0));
- record2.addField("f_long", recordsInTableA.get(1).getFieldValue(1));
-
- BeamSqlRow record3 = new BeamSqlRow(resultType);
- record3.addField("f_int", recordsInTableA.get(2).getFieldValue(0));
- record3.addField("f_long", recordsInTableA.get(2).getFieldValue(1));
-
- BeamSqlRow record4 = new BeamSqlRow(resultType);
- record4.addField("f_int", recordsInTableA.get(3).getFieldValue(0));
- record4.addField("f_long", recordsInTableA.get(3).getFieldValue(1));
-
- PAssert.that(result).containsInAnyOrder(record1, record2, record3, record4);
-
- pipeline.run().waitUntilFinish();
- }
-
- /**
- * select partial fields with bounded PCollection.
- */
- @Test
- public void testPartialFieldsInRowsWithBounded() throws Exception {
- runPartialFieldsInRows(boundedInput1);
- }
-
- /**
- * select partial fields with unbounded PCollection.
- */
- @Test
- public void testPartialFieldsInRowsWithUnbounded() throws Exception {
- runPartialFieldsInRows(unboundedInput1);
- }
-
- private void runPartialFieldsInRows(PCollection<BeamSqlRow> input) throws Exception {
- String sql = "SELECT f_int, f_long FROM TABLE_A";
-
- PCollection<BeamSqlRow> result =
- PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input)
- .apply("testPartialFieldsInRows", BeamSql.query(sql));
-
- BeamSqlRowType resultType = BeamSqlRowType.create(Arrays.asList("f_int", "f_long"),
- Arrays.asList(Types.INTEGER, Types.BIGINT));
-
- BeamSqlRow record1 = new BeamSqlRow(resultType);
- record1.addField("f_int", recordsInTableA.get(0).getFieldValue(0));
- record1.addField("f_long", recordsInTableA.get(0).getFieldValue(1));
-
- BeamSqlRow record2 = new BeamSqlRow(resultType);
- record2.addField("f_int", recordsInTableA.get(1).getFieldValue(0));
- record2.addField("f_long", recordsInTableA.get(1).getFieldValue(1));
-
- BeamSqlRow record3 = new BeamSqlRow(resultType);
- record3.addField("f_int", recordsInTableA.get(2).getFieldValue(0));
- record3.addField("f_long", recordsInTableA.get(2).getFieldValue(1));
-
- BeamSqlRow record4 = new BeamSqlRow(resultType);
- record4.addField("f_int", recordsInTableA.get(3).getFieldValue(0));
- record4.addField("f_long", recordsInTableA.get(3).getFieldValue(1));
-
- PAssert.that(result).containsInAnyOrder(record1, record2, record3, record4);
-
- pipeline.run().waitUntilFinish();
- }
-
- /**
- * select literal field with bounded PCollection.
- */
- @Test
- public void testLiteralFieldWithBounded() throws Exception {
- runLiteralField(boundedInput2);
- }
-
- /**
- * select literal field with unbounded PCollection.
- */
- @Test
- public void testLiteralFieldWithUnbounded() throws Exception {
- runLiteralField(unboundedInput2);
- }
-
- public void runLiteralField(PCollection<BeamSqlRow> input) throws Exception {
- String sql = "SELECT 1 as literal_field FROM TABLE_A";
-
- PCollection<BeamSqlRow> result =
- PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input)
- .apply("testLiteralField", BeamSql.query(sql));
-
- BeamSqlRowType resultType = BeamSqlRowType.create(Arrays.asList("literal_field"),
- Arrays.asList(Types.INTEGER));
-
- BeamSqlRow record = new BeamSqlRow(resultType);
- record.addField("literal_field", 1);
-
- PAssert.that(result).containsInAnyOrder(record);
-
- pipeline.run().waitUntilFinish();
- }
-
- @Test
- public void testProjectUnknownField() throws Exception {
- exceptions.expect(IllegalStateException.class);
- exceptions.expectMessage("Column 'f_int_na' not found in any table");
- pipeline.enableAbandonedNodeEnforcement(false);
-
- String sql = "SELECT f_int_na FROM TABLE_A";
-
- PCollection<BeamSqlRow> result =
- PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), boundedInput1)
- .apply("testProjectUnknownField", BeamSql.query(sql));
-
- pipeline.run().waitUntilFinish();
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslUdfUdafTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslUdfUdafTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslUdfUdafTest.java
deleted file mode 100644
index 726f658..0000000
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslUdfUdafTest.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.dsls.sql;
-
-import java.sql.Types;
-import java.util.Arrays;
-import java.util.Iterator;
-import org.apache.beam.dsls.sql.schema.BeamSqlRow;
-import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
-import org.apache.beam.dsls.sql.schema.BeamSqlUdaf;
-import org.apache.beam.dsls.sql.schema.BeamSqlUdf;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.beam.sdk.values.TupleTag;
-import org.junit.Test;
-
-/**
- * Tests for UDF/UDAF.
- */
-public class BeamSqlDslUdfUdafTest extends BeamSqlDslBase {
- /**
- * GROUP-BY with UDAF.
- */
- @Test
- public void testUdaf() throws Exception {
- BeamSqlRowType resultType = BeamSqlRowType.create(Arrays.asList("f_int2", "squaresum"),
- Arrays.asList(Types.INTEGER, Types.INTEGER));
-
- BeamSqlRow record = new BeamSqlRow(resultType);
- record.addField("f_int2", 0);
- record.addField("squaresum", 30);
-
- String sql1 = "SELECT f_int2, squaresum1(f_int) AS `squaresum`"
- + " FROM PCOLLECTION GROUP BY f_int2";
- PCollection<BeamSqlRow> result1 =
- boundedInput1.apply("testUdaf1",
- BeamSql.simpleQuery(sql1).withUdaf("squaresum1", SquareSum.class));
- PAssert.that(result1).containsInAnyOrder(record);
-
- String sql2 = "SELECT f_int2, squaresum2(f_int) AS `squaresum`"
- + " FROM PCOLLECTION GROUP BY f_int2";
- PCollection<BeamSqlRow> result2 =
- PCollectionTuple.of(new TupleTag<BeamSqlRow>("PCOLLECTION"), boundedInput1)
- .apply("testUdaf2",
- BeamSql.query(sql2).withUdaf("squaresum2", SquareSum.class));
- PAssert.that(result2).containsInAnyOrder(record);
-
- pipeline.run().waitUntilFinish();
- }
-
- /**
- * test UDF.
- */
- @Test
- public void testUdf() throws Exception{
- BeamSqlRowType resultType = BeamSqlRowType.create(Arrays.asList("f_int", "cubicvalue"),
- Arrays.asList(Types.INTEGER, Types.INTEGER));
-
- BeamSqlRow record = new BeamSqlRow(resultType);
- record.addField("f_int", 2);
- record.addField("cubicvalue", 8);
-
- String sql1 = "SELECT f_int, cubic1(f_int) as cubicvalue FROM PCOLLECTION WHERE f_int = 2";
- PCollection<BeamSqlRow> result1 =
- boundedInput1.apply("testUdf1",
- BeamSql.simpleQuery(sql1).withUdf("cubic1", CubicInteger.class));
- PAssert.that(result1).containsInAnyOrder(record);
-
- String sql2 = "SELECT f_int, cubic2(f_int) as cubicvalue FROM PCOLLECTION WHERE f_int = 2";
- PCollection<BeamSqlRow> result2 =
- PCollectionTuple.of(new TupleTag<BeamSqlRow>("PCOLLECTION"), boundedInput1)
- .apply("testUdf2",
- BeamSql.query(sql2).withUdf("cubic2", CubicInteger.class));
- PAssert.that(result2).containsInAnyOrder(record);
-
- pipeline.run().waitUntilFinish();
- }
-
- /**
- * UDAF for test, which returns the sum of square.
- */
- public static class SquareSum extends BeamSqlUdaf<Integer, Integer, Integer> {
-
- public SquareSum() {
- }
-
- @Override
- public Integer init() {
- return 0;
- }
-
- @Override
- public Integer add(Integer accumulator, Integer input) {
- return accumulator + input * input;
- }
-
- @Override
- public Integer merge(Iterable<Integer> accumulators) {
- int v = 0;
- Iterator<Integer> ite = accumulators.iterator();
- while (ite.hasNext()) {
- v += ite.next();
- }
- return v;
- }
-
- @Override
- public Integer result(Integer accumulator) {
- return accumulator;
- }
-
- }
-
- /**
- * A example UDF for test.
- */
- public static class CubicInteger implements BeamSqlUdf{
- public static Integer eval(Integer input){
- return input * input * input;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/TestUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/TestUtils.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/TestUtils.java
deleted file mode 100644
index a669635..0000000
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/TestUtils.java
+++ /dev/null
@@ -1,195 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.dsls.sql;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import org.apache.beam.dsls.sql.schema.BeamSqlRow;
-import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
-import org.apache.beam.sdk.transforms.DoFn;
-
-/**
- * Test utilities.
- */
-public class TestUtils {
- /**
- * A {@code DoFn} to convert a {@code BeamSqlRow} to a comparable {@code String}.
- */
- public static class BeamSqlRow2StringDoFn extends DoFn<BeamSqlRow, String> {
- @ProcessElement
- public void processElement(ProcessContext ctx) {
- ctx.output(ctx.element().valueInString());
- }
- }
-
- /**
- * Convert list of {@code BeamSqlRow} to list of {@code String}.
- */
- public static List<String> beamSqlRows2Strings(List<BeamSqlRow> rows) {
- List<String> strs = new ArrayList<>();
- for (BeamSqlRow row : rows) {
- strs.add(row.valueInString());
- }
-
- return strs;
- }
-
- /**
- * Convenient way to build a list of {@code BeamSqlRow}s.
- *
- * <p>You can use it like this:
- *
- * <pre>{@code
- * TestUtils.RowsBuilder.of(
- * Types.INTEGER, "order_id",
- * Types.INTEGER, "sum_site_id",
- * Types.VARCHAR, "buyer"
- * ).addRows(
- * 1, 3, "james",
- * 2, 5, "bond"
- * ).getStringRows()
- * }</pre>
- * {@code}
- */
- public static class RowsBuilder {
- private BeamSqlRowType type;
- private List<BeamSqlRow> rows = new ArrayList<>();
-
- /**
- * Create a RowsBuilder with the specified row type info.
- *
- * <p>For example:
- * <pre>{@code
- * TestUtils.RowsBuilder.of(
- * Types.INTEGER, "order_id",
- * Types.INTEGER, "sum_site_id",
- * Types.VARCHAR, "buyer"
- * )}</pre>
- *
- * @args pairs of column type and column names.
- */
- public static RowsBuilder of(final Object... args) {
- BeamSqlRowType beamSQLRowType = buildBeamSqlRowType(args);
- RowsBuilder builder = new RowsBuilder();
- builder.type = beamSQLRowType;
-
- return builder;
- }
-
- /**
- * Create a RowsBuilder with the specified row type info.
- *
- * <p>For example:
- * <pre>{@code
- * TestUtils.RowsBuilder.of(
- * beamSqlRowType
- * )}</pre>
- * @beamSQLRowType the record type.
- */
- public static RowsBuilder of(final BeamSqlRowType beamSQLRowType) {
- RowsBuilder builder = new RowsBuilder();
- builder.type = beamSQLRowType;
-
- return builder;
- }
-
- /**
- * Add rows to the builder.
- *
- * <p>Note: check the class javadoc for for detailed example.
- */
- public RowsBuilder addRows(final Object... args) {
- this.rows.addAll(buildRows(type, Arrays.asList(args)));
- return this;
- }
-
- /**
- * Add rows to the builder.
- *
- * <p>Note: check the class javadoc for for detailed example.
- */
- public RowsBuilder addRows(final List args) {
- this.rows.addAll(buildRows(type, args));
- return this;
- }
-
- public List<BeamSqlRow> getRows() {
- return rows;
- }
-
- public List<String> getStringRows() {
- return beamSqlRows2Strings(rows);
- }
- }
-
- /**
- * Convenient way to build a {@code BeamSqlRowType}.
- *
- * <p>e.g.
- *
- * <pre>{@code
- * buildBeamSqlRowType(
- * Types.BIGINT, "order_id",
- * Types.INTEGER, "site_id",
- * Types.DOUBLE, "price",
- * Types.TIMESTAMP, "order_time"
- * )
- * }</pre>
- */
- public static BeamSqlRowType buildBeamSqlRowType(Object... args) {
- List<Integer> types = new ArrayList<>();
- List<String> names = new ArrayList<>();
-
- for (int i = 0; i < args.length - 1; i += 2) {
- types.add((int) args[i]);
- names.add((String) args[i + 1]);
- }
-
- return BeamSqlRowType.create(names, types);
- }
-
- /**
- * Convenient way to build a {@code BeamSqlRow}s.
- *
- * <p>e.g.
- *
- * <pre>{@code
- * buildRows(
- * rowType,
- * 1, 1, 1, // the first row
- * 2, 2, 2, // the second row
- * ...
- * )
- * }</pre>
- */
- public static List<BeamSqlRow> buildRows(BeamSqlRowType type, List args) {
- List<BeamSqlRow> rows = new ArrayList<>();
- int fieldCount = type.size();
-
- for (int i = 0; i < args.size(); i += fieldCount) {
- BeamSqlRow row = new BeamSqlRow(type);
- for (int j = 0; j < fieldCount; j++) {
- row.addField(j, args.get(i + j));
- }
- rows.add(row);
- }
- return rows;
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlArithmeticOperatorsIntegrationTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlArithmeticOperatorsIntegrationTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlArithmeticOperatorsIntegrationTest.java
deleted file mode 100644
index 947660a..0000000
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlArithmeticOperatorsIntegrationTest.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.dsls.sql.integrationtest;
-
-import java.math.BigDecimal;
-import java.math.RoundingMode;
-import org.junit.Test;
-
-/**
- * Integration test for arithmetic operators.
- */
-public class BeamSqlArithmeticOperatorsIntegrationTest
- extends BeamSqlBuiltinFunctionsIntegrationTestBase {
-
- private static final BigDecimal ZERO = BigDecimal.valueOf(0.0);
- private static final BigDecimal ONE0 = BigDecimal.valueOf(1);
- private static final BigDecimal ONE = BigDecimal.valueOf(1.0);
- private static final BigDecimal ONE2 = BigDecimal.valueOf(1.0).multiply(BigDecimal.valueOf(1.0));
- private static final BigDecimal ONE10 = BigDecimal.ONE.divide(
- BigDecimal.ONE, 10, RoundingMode.HALF_EVEN);
- private static final BigDecimal TWO = BigDecimal.valueOf(2.0);
-
- @Test
- public void testPlus() throws Exception {
- ExpressionChecker checker = new ExpressionChecker()
- .addExpr("1 + 1", 2)
- .addExpr("1.0 + 1", TWO)
- .addExpr("1 + 1.0", TWO)
- .addExpr("1.0 + 1.0", TWO)
- .addExpr("c_tinyint + c_tinyint", (byte) 2)
- .addExpr("c_smallint + c_smallint", (short) 2)
- .addExpr("c_bigint + c_bigint", 2L)
- .addExpr("c_decimal + c_decimal", TWO)
- .addExpr("c_tinyint + c_decimal", TWO)
- .addExpr("c_float + c_decimal", 2.0)
- .addExpr("c_double + c_decimal", 2.0)
- .addExpr("c_float + c_float", 2.0f)
- .addExpr("c_double + c_float", 2.0)
- .addExpr("c_double + c_double", 2.0)
- .addExpr("c_float + c_bigint", 2.0f)
- .addExpr("c_double + c_bigint", 2.0)
- ;
-
- checker.buildRunAndCheck();
- }
-
- @Test
- public void testPlus_overflow() throws Exception {
- ExpressionChecker checker = new ExpressionChecker()
- .addExpr("c_tinyint_max + c_tinyint_max", (byte) -2)
- .addExpr("c_smallint_max + c_smallint_max", (short) -2)
- .addExpr("c_integer_max + c_integer_max", -2)
- // yeah, I know 384L is strange, but since it is already overflowed
- // what the actualy result is not so important, it is wrong any way.
- .addExpr("c_bigint_max + c_bigint_max", 384L)
- ;
-
- checker.buildRunAndCheck();
- }
-
- @Test
- public void testMinus() throws Exception {
- ExpressionChecker checker = new ExpressionChecker()
- .addExpr("1 - 1", 0)
- .addExpr("1.0 - 1", ZERO)
- .addExpr("1 - 0.0", ONE)
- .addExpr("1.0 - 1.0", ZERO)
- .addExpr("c_tinyint - c_tinyint", (byte) 0)
- .addExpr("c_smallint - c_smallint", (short) 0)
- .addExpr("c_bigint - c_bigint", 0L)
- .addExpr("c_decimal - c_decimal", ZERO)
- .addExpr("c_tinyint - c_decimal", ZERO)
- .addExpr("c_float - c_decimal", 0.0)
- .addExpr("c_double - c_decimal", 0.0)
- .addExpr("c_float - c_float", 0.0f)
- .addExpr("c_double - c_float", 0.0)
- .addExpr("c_double - c_double", 0.0)
- .addExpr("c_float - c_bigint", 0.0f)
- .addExpr("c_double - c_bigint", 0.0)
- ;
-
- checker.buildRunAndCheck();
- }
-
- @Test
- public void testMultiply() throws Exception {
- ExpressionChecker checker = new ExpressionChecker()
- .addExpr("1 * 1", 1)
- .addExpr("1.0 * 1", ONE2)
- .addExpr("1 * 1.0", ONE2)
- .addExpr("1.0 * 1.0", ONE2)
- .addExpr("c_tinyint * c_tinyint", (byte) 1)
- .addExpr("c_smallint * c_smallint", (short) 1)
- .addExpr("c_bigint * c_bigint", 1L)
- .addExpr("c_decimal * c_decimal", ONE2)
- .addExpr("c_tinyint * c_decimal", ONE2)
- .addExpr("c_float * c_decimal", 1.0)
- .addExpr("c_double * c_decimal", 1.0)
- .addExpr("c_float * c_float", 1.0f)
- .addExpr("c_double * c_float", 1.0)
- .addExpr("c_double * c_double", 1.0)
- .addExpr("c_float * c_bigint", 1.0f)
- .addExpr("c_double * c_bigint", 1.0)
- ;
-
- checker.buildRunAndCheck();
- }
-
- @Test
- public void testDivide() throws Exception {
- ExpressionChecker checker = new ExpressionChecker()
- .addExpr("1 / 1", 1)
- .addExpr("1.0 / 1", ONE10)
- .addExpr("1 / 1.0", ONE10)
- .addExpr("1.0 / 1.0", ONE10)
- .addExpr("c_tinyint / c_tinyint", (byte) 1)
- .addExpr("c_smallint / c_smallint", (short) 1)
- .addExpr("c_bigint / c_bigint", 1L)
- .addExpr("c_decimal / c_decimal", ONE10)
- .addExpr("c_tinyint / c_decimal", ONE10)
- .addExpr("c_float / c_decimal", 1.0)
- .addExpr("c_double / c_decimal", 1.0)
- .addExpr("c_float / c_float", 1.0f)
- .addExpr("c_double / c_float", 1.0)
- .addExpr("c_double / c_double", 1.0)
- .addExpr("c_float / c_bigint", 1.0f)
- .addExpr("c_double / c_bigint", 1.0)
- ;
-
- checker.buildRunAndCheck();
- }
-
- @Test
- public void testMod() throws Exception {
- ExpressionChecker checker = new ExpressionChecker()
- .addExpr("mod(1, 1)", 0)
- .addExpr("mod(1.0, 1)", 0)
- .addExpr("mod(1, 1.0)", ZERO)
- .addExpr("mod(1.0, 1.0)", ZERO)
- .addExpr("mod(c_tinyint, c_tinyint)", (byte) 0)
- .addExpr("mod(c_smallint, c_smallint)", (short) 0)
- .addExpr("mod(c_bigint, c_bigint)", 0L)
- .addExpr("mod(c_decimal, c_decimal)", ZERO)
- .addExpr("mod(c_tinyint, c_decimal)", ZERO)
- ;
-
- checker.buildRunAndCheck();
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java
deleted file mode 100644
index b9ce9b4..0000000
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.dsls.sql.integrationtest;
-
-import com.google.common.base.Joiner;
-import java.math.BigDecimal;
-import java.sql.Types;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.TimeZone;
-import org.apache.beam.dsls.sql.BeamSql;
-import org.apache.beam.dsls.sql.TestUtils;
-import org.apache.beam.dsls.sql.mock.MockedBoundedTable;
-import org.apache.beam.dsls.sql.schema.BeamSqlRow;
-import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
-import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.calcite.util.Pair;
-import org.junit.Rule;
-
-/**
- * Base class for all built-in functions integration tests.
- */
-public class BeamSqlBuiltinFunctionsIntegrationTestBase {
- private static final Map<Class, Integer> JAVA_CLASS_TO_SQL_TYPE = new HashMap<>();
- static {
- JAVA_CLASS_TO_SQL_TYPE.put(Byte.class, Types.TINYINT);
- JAVA_CLASS_TO_SQL_TYPE.put(Short.class, Types.SMALLINT);
- JAVA_CLASS_TO_SQL_TYPE.put(Integer.class, Types.INTEGER);
- JAVA_CLASS_TO_SQL_TYPE.put(Long.class, Types.BIGINT);
- JAVA_CLASS_TO_SQL_TYPE.put(Float.class, Types.FLOAT);
- JAVA_CLASS_TO_SQL_TYPE.put(Double.class, Types.DOUBLE);
- JAVA_CLASS_TO_SQL_TYPE.put(BigDecimal.class, Types.DECIMAL);
- JAVA_CLASS_TO_SQL_TYPE.put(String.class, Types.VARCHAR);
- JAVA_CLASS_TO_SQL_TYPE.put(Date.class, Types.DATE);
- JAVA_CLASS_TO_SQL_TYPE.put(Boolean.class, Types.BOOLEAN);
- }
-
- @Rule
- public final TestPipeline pipeline = TestPipeline.create();
-
- protected PCollection<BeamSqlRow> getTestPCollection() {
- BeamSqlRowType type = BeamSqlRowType.create(
- Arrays.asList("ts", "c_tinyint", "c_smallint",
- "c_integer", "c_bigint", "c_float", "c_double", "c_decimal",
- "c_tinyint_max", "c_smallint_max", "c_integer_max", "c_bigint_max"),
- Arrays.asList(Types.DATE, Types.TINYINT, Types.SMALLINT,
- Types.INTEGER, Types.BIGINT, Types.FLOAT, Types.DOUBLE, Types.DECIMAL,
- Types.TINYINT, Types.SMALLINT, Types.INTEGER, Types.BIGINT)
- );
- try {
- return MockedBoundedTable
- .of(type)
- .addRows(
- parseDate("1986-02-15 11:35:26"),
- (byte) 1,
- (short) 1,
- 1,
- 1L,
- 1.0f,
- 1.0,
- BigDecimal.ONE,
- (byte) 127,
- (short) 32767,
- 2147483647,
- 9223372036854775807L
- )
- .buildIOReader(pipeline)
- .setCoder(new BeamSqlRowCoder(type));
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- protected static Date parseDate(String str) {
- try {
- SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
- sdf.setTimeZone(TimeZone.getTimeZone("GMT"));
- return sdf.parse(str);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
-
- /**
- * Helper class to make write integration test for built-in functions easier.
- *
- * <p>example usage:
- * <pre>{@code
- * ExpressionChecker checker = new ExpressionChecker()
- * .addExpr("1 + 1", 2)
- * .addExpr("1.0 + 1", 2.0)
- * .addExpr("1 + 1.0", 2.0)
- * .addExpr("1.0 + 1.0", 2.0)
- * .addExpr("c_tinyint + c_tinyint", (byte) 2);
- * checker.buildRunAndCheck(inputCollections);
- * }</pre>
- */
- public class ExpressionChecker {
- private transient List<Pair<String, Object>> exps = new ArrayList<>();
-
- public ExpressionChecker addExpr(String expression, Object expectedValue) {
- exps.add(Pair.of(expression, expectedValue));
- return this;
- }
-
- private String getSql() {
- List<String> expStrs = new ArrayList<>();
- for (Pair<String, Object> pair : exps) {
- expStrs.add(pair.getKey());
- }
- return "SELECT " + Joiner.on(",\n ").join(expStrs) + " FROM PCOLLECTION";
- }
-
- /**
- * Build the corresponding SQL, compile to Beam Pipeline, run it, and check the result.
- */
- public void buildRunAndCheck() {
- PCollection<BeamSqlRow> inputCollection = getTestPCollection();
- System.out.println("SQL:>\n" + getSql());
- try {
- List<String> names = new ArrayList<>();
- List<Integer> types = new ArrayList<>();
- List<Object> values = new ArrayList<>();
-
- for (Pair<String, Object> pair : exps) {
- names.add(pair.getKey());
- types.add(JAVA_CLASS_TO_SQL_TYPE.get(pair.getValue().getClass()));
- values.add(pair.getValue());
- }
-
- PCollection<BeamSqlRow> rows = inputCollection.apply(BeamSql.simpleQuery(getSql()));
- PAssert.that(rows).containsInAnyOrder(
- TestUtils.RowsBuilder
- .of(BeamSqlRowType.create(names, types))
- .addRows(values)
- .getRows()
- );
- inputCollection.getPipeline().run();
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlComparisonOperatorsIntegrationTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlComparisonOperatorsIntegrationTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlComparisonOperatorsIntegrationTest.java
deleted file mode 100644
index 5502ad4..0000000
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlComparisonOperatorsIntegrationTest.java
+++ /dev/null
@@ -1,330 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.dsls.sql.integrationtest;
-
-import java.math.BigDecimal;
-import java.sql.Types;
-import java.util.Arrays;
-import org.apache.beam.dsls.sql.mock.MockedBoundedTable;
-import org.apache.beam.dsls.sql.schema.BeamSqlRow;
-import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
-import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
-import org.apache.beam.sdk.values.PCollection;
-import org.junit.Test;
-
-/**
- * Integration test for comparison operators.
- */
-public class BeamSqlComparisonOperatorsIntegrationTest
- extends BeamSqlBuiltinFunctionsIntegrationTestBase {
-
- @Test
- public void testEquals() {
- ExpressionChecker checker = new ExpressionChecker()
- .addExpr("c_tinyint_1 = c_tinyint_1", true)
- .addExpr("c_tinyint_1 = c_tinyint_2", false)
- .addExpr("c_smallint_1 = c_smallint_1", true)
- .addExpr("c_smallint_1 = c_smallint_2", false)
- .addExpr("c_integer_1 = c_integer_1", true)
- .addExpr("c_integer_1 = c_integer_2", false)
- .addExpr("c_bigint_1 = c_bigint_1", true)
- .addExpr("c_bigint_1 = c_bigint_2", false)
- .addExpr("c_float_1 = c_float_1", true)
- .addExpr("c_float_1 = c_float_2", false)
- .addExpr("c_double_1 = c_double_1", true)
- .addExpr("c_double_1 = c_double_2", false)
- .addExpr("c_decimal_1 = c_decimal_1", true)
- .addExpr("c_decimal_1 = c_decimal_2", false)
- .addExpr("c_varchar_1 = c_varchar_1", true)
- .addExpr("c_varchar_1 = c_varchar_2", false)
- .addExpr("c_boolean_true = c_boolean_true", true)
- .addExpr("c_boolean_true = c_boolean_false", false)
-
- ;
- checker.buildRunAndCheck();
- }
-
- @Test
- public void testNotEquals() {
- ExpressionChecker checker = new ExpressionChecker()
- .addExpr("c_tinyint_1 <> c_tinyint_1", false)
- .addExpr("c_tinyint_1 <> c_tinyint_2", true)
- .addExpr("c_smallint_1 <> c_smallint_1", false)
- .addExpr("c_smallint_1 <> c_smallint_2", true)
- .addExpr("c_integer_1 <> c_integer_1", false)
- .addExpr("c_integer_1 <> c_integer_2", true)
- .addExpr("c_bigint_1 <> c_bigint_1", false)
- .addExpr("c_bigint_1 <> c_bigint_2", true)
- .addExpr("c_float_1 <> c_float_1", false)
- .addExpr("c_float_1 <> c_float_2", true)
- .addExpr("c_double_1 <> c_double_1", false)
- .addExpr("c_double_1 <> c_double_2", true)
- .addExpr("c_decimal_1 <> c_decimal_1", false)
- .addExpr("c_decimal_1 <> c_decimal_2", true)
- .addExpr("c_varchar_1 <> c_varchar_1", false)
- .addExpr("c_varchar_1 <> c_varchar_2", true)
- .addExpr("c_boolean_true <> c_boolean_true", false)
- .addExpr("c_boolean_true <> c_boolean_false", true)
- ;
- checker.buildRunAndCheck();
- }
-
- @Test
- public void testGreaterThan() {
- ExpressionChecker checker = new ExpressionChecker()
- .addExpr("c_tinyint_2 > c_tinyint_1", true)
- .addExpr("c_tinyint_1 > c_tinyint_1", false)
- .addExpr("c_tinyint_1 > c_tinyint_2", false)
-
- .addExpr("c_smallint_2 > c_smallint_1", true)
- .addExpr("c_smallint_1 > c_smallint_1", false)
- .addExpr("c_smallint_1 > c_smallint_2", false)
-
- .addExpr("c_integer_2 > c_integer_1", true)
- .addExpr("c_integer_1 > c_integer_1", false)
- .addExpr("c_integer_1 > c_integer_2", false)
-
- .addExpr("c_bigint_2 > c_bigint_1", true)
- .addExpr("c_bigint_1 > c_bigint_1", false)
- .addExpr("c_bigint_1 > c_bigint_2", false)
-
- .addExpr("c_float_2 > c_float_1", true)
- .addExpr("c_float_1 > c_float_1", false)
- .addExpr("c_float_1 > c_float_2", false)
-
- .addExpr("c_double_2 > c_double_1", true)
- .addExpr("c_double_1 > c_double_1", false)
- .addExpr("c_double_1 > c_double_2", false)
-
- .addExpr("c_decimal_2 > c_decimal_1", true)
- .addExpr("c_decimal_1 > c_decimal_1", false)
- .addExpr("c_decimal_1 > c_decimal_2", false)
-
- .addExpr("c_varchar_2 > c_varchar_1", true)
- .addExpr("c_varchar_1 > c_varchar_1", false)
- .addExpr("c_varchar_1 > c_varchar_2", false)
- ;
-
- checker.buildRunAndCheck();
- }
-
- @Test(expected = RuntimeException.class)
- public void testGreaterThanException() {
- ExpressionChecker checker = new ExpressionChecker()
- .addExpr("c_boolean_false > c_boolean_true", false);
- checker.buildRunAndCheck();
- }
-
- @Test
- public void testGreaterThanOrEquals() {
- ExpressionChecker checker = new ExpressionChecker()
- .addExpr("c_tinyint_2 >= c_tinyint_1", true)
- .addExpr("c_tinyint_1 >= c_tinyint_1", true)
- .addExpr("c_tinyint_1 >= c_tinyint_2", false)
-
- .addExpr("c_smallint_2 >= c_smallint_1", true)
- .addExpr("c_smallint_1 >= c_smallint_1", true)
- .addExpr("c_smallint_1 >= c_smallint_2", false)
-
- .addExpr("c_integer_2 >= c_integer_1", true)
- .addExpr("c_integer_1 >= c_integer_1", true)
- .addExpr("c_integer_1 >= c_integer_2", false)
-
- .addExpr("c_bigint_2 >= c_bigint_1", true)
- .addExpr("c_bigint_1 >= c_bigint_1", true)
- .addExpr("c_bigint_1 >= c_bigint_2", false)
-
- .addExpr("c_float_2 >= c_float_1", true)
- .addExpr("c_float_1 >= c_float_1", true)
- .addExpr("c_float_1 >= c_float_2", false)
-
- .addExpr("c_double_2 >= c_double_1", true)
- .addExpr("c_double_1 >= c_double_1", true)
- .addExpr("c_double_1 >= c_double_2", false)
-
- .addExpr("c_decimal_2 >= c_decimal_1", true)
- .addExpr("c_decimal_1 >= c_decimal_1", true)
- .addExpr("c_decimal_1 >= c_decimal_2", false)
-
- .addExpr("c_varchar_2 >= c_varchar_1", true)
- .addExpr("c_varchar_1 >= c_varchar_1", true)
- .addExpr("c_varchar_1 >= c_varchar_2", false)
- ;
-
- checker.buildRunAndCheck();
- }
-
- @Test(expected = RuntimeException.class)
- public void testGreaterThanOrEqualsException() {
- ExpressionChecker checker = new ExpressionChecker()
- .addExpr("c_boolean_false >= c_boolean_true", false);
- checker.buildRunAndCheck();
- }
-
- @Test
- public void testLessThan() {
- ExpressionChecker checker = new ExpressionChecker()
- .addExpr("c_tinyint_2 < c_tinyint_1", false)
- .addExpr("c_tinyint_1 < c_tinyint_1", false)
- .addExpr("c_tinyint_1 < c_tinyint_2", true)
-
- .addExpr("c_smallint_2 < c_smallint_1", false)
- .addExpr("c_smallint_1 < c_smallint_1", false)
- .addExpr("c_smallint_1 < c_smallint_2", true)
-
- .addExpr("c_integer_2 < c_integer_1", false)
- .addExpr("c_integer_1 < c_integer_1", false)
- .addExpr("c_integer_1 < c_integer_2", true)
-
- .addExpr("c_bigint_2 < c_bigint_1", false)
- .addExpr("c_bigint_1 < c_bigint_1", false)
- .addExpr("c_bigint_1 < c_bigint_2", true)
-
- .addExpr("c_float_2 < c_float_1", false)
- .addExpr("c_float_1 < c_float_1", false)
- .addExpr("c_float_1 < c_float_2", true)
-
- .addExpr("c_double_2 < c_double_1", false)
- .addExpr("c_double_1 < c_double_1", false)
- .addExpr("c_double_1 < c_double_2", true)
-
- .addExpr("c_decimal_2 < c_decimal_1", false)
- .addExpr("c_decimal_1 < c_decimal_1", false)
- .addExpr("c_decimal_1 < c_decimal_2", true)
-
- .addExpr("c_varchar_2 < c_varchar_1", false)
- .addExpr("c_varchar_1 < c_varchar_1", false)
- .addExpr("c_varchar_1 < c_varchar_2", true)
- ;
-
- checker.buildRunAndCheck();
- }
-
- @Test(expected = RuntimeException.class)
- public void testLessThanException() {
- ExpressionChecker checker = new ExpressionChecker()
- .addExpr("c_boolean_false < c_boolean_true", false);
- checker.buildRunAndCheck();
- }
-
- @Test
- public void testLessThanOrEquals() {
- ExpressionChecker checker = new ExpressionChecker()
- .addExpr("c_tinyint_2 <= c_tinyint_1", false)
- .addExpr("c_tinyint_1 <= c_tinyint_1", true)
- .addExpr("c_tinyint_1 <= c_tinyint_2", true)
-
- .addExpr("c_smallint_2 <= c_smallint_1", false)
- .addExpr("c_smallint_1 <= c_smallint_1", true)
- .addExpr("c_smallint_1 <= c_smallint_2", true)
-
- .addExpr("c_integer_2 <= c_integer_1", false)
- .addExpr("c_integer_1 <= c_integer_1", true)
- .addExpr("c_integer_1 <= c_integer_2", true)
-
- .addExpr("c_bigint_2 <= c_bigint_1", false)
- .addExpr("c_bigint_1 <= c_bigint_1", true)
- .addExpr("c_bigint_1 <= c_bigint_2", true)
-
- .addExpr("c_float_2 <= c_float_1", false)
- .addExpr("c_float_1 <= c_float_1", true)
- .addExpr("c_float_1 <= c_float_2", true)
-
- .addExpr("c_double_2 <= c_double_1", false)
- .addExpr("c_double_1 <= c_double_1", true)
- .addExpr("c_double_1 <= c_double_2", true)
-
- .addExpr("c_decimal_2 <= c_decimal_1", false)
- .addExpr("c_decimal_1 <= c_decimal_1", true)
- .addExpr("c_decimal_1 <= c_decimal_2", true)
-
- .addExpr("c_varchar_2 <= c_varchar_1", false)
- .addExpr("c_varchar_1 <= c_varchar_1", true)
- .addExpr("c_varchar_1 <= c_varchar_2", true)
- ;
-
- checker.buildRunAndCheck();
- }
-
- @Test(expected = RuntimeException.class)
- public void testLessThanOrEqualsException() {
- ExpressionChecker checker = new ExpressionChecker()
- .addExpr("c_boolean_false <= c_boolean_true", false);
- checker.buildRunAndCheck();
- }
-
- @Test
- public void testIsNullAndIsNotNull() throws Exception {
- ExpressionChecker checker = new ExpressionChecker()
- .addExpr("1 IS NOT NULL", true)
- .addExpr("NULL IS NOT NULL", false)
-
- .addExpr("1 IS NULL", false)
- .addExpr("NULL IS NULL", true)
- ;
-
- checker.buildRunAndCheck();
- }
-
- @Override protected PCollection<BeamSqlRow> getTestPCollection() {
- BeamSqlRowType type = BeamSqlRowType.create(
- Arrays.asList(
- "c_tinyint_0", "c_tinyint_1", "c_tinyint_2",
- "c_smallint_0", "c_smallint_1", "c_smallint_2",
- "c_integer_0", "c_integer_1", "c_integer_2",
- "c_bigint_0", "c_bigint_1", "c_bigint_2",
- "c_float_0", "c_float_1", "c_float_2",
- "c_double_0", "c_double_1", "c_double_2",
- "c_decimal_0", "c_decimal_1", "c_decimal_2",
- "c_varchar_0", "c_varchar_1", "c_varchar_2",
- "c_boolean_false", "c_boolean_true"
- ),
- Arrays.asList(
- Types.TINYINT, Types.TINYINT, Types.TINYINT,
- Types.SMALLINT, Types.SMALLINT, Types.SMALLINT,
- Types.INTEGER, Types.INTEGER, Types.INTEGER,
- Types.BIGINT, Types.BIGINT, Types.BIGINT,
- Types.FLOAT, Types.FLOAT, Types.FLOAT,
- Types.DOUBLE, Types.DOUBLE, Types.DOUBLE,
- Types.DECIMAL, Types.DECIMAL, Types.DECIMAL,
- Types.VARCHAR, Types.VARCHAR, Types.VARCHAR,
- Types.BOOLEAN, Types.BOOLEAN
- )
- );
- try {
- return MockedBoundedTable
- .of(type)
- .addRows(
- (byte) 0, (byte) 1, (byte) 2,
- (short) 0, (short) 1, (short) 2,
- 0, 1, 2,
- 0L, 1L, 2L,
- 0.0f, 1.0f, 2.0f,
- 0.0, 1.0, 2.0,
- BigDecimal.ZERO, BigDecimal.ONE, BigDecimal.ONE.add(BigDecimal.ONE),
- "a", "b", "c",
- false, true
- )
- .buildIOReader(pipeline)
- .setCoder(new BeamSqlRowCoder(type));
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlConditionalFunctionsIntegrationTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlConditionalFunctionsIntegrationTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlConditionalFunctionsIntegrationTest.java
deleted file mode 100644
index 6233aeb..0000000
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlConditionalFunctionsIntegrationTest.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.dsls.sql.integrationtest;
-
-import org.junit.Test;
-
-/**
- * Integration test for conditional functions.
- */
-public class BeamSqlConditionalFunctionsIntegrationTest
- extends BeamSqlBuiltinFunctionsIntegrationTestBase {
- @Test
- public void testConditionalFunctions() throws Exception {
- ExpressionChecker checker = new ExpressionChecker()
- .addExpr(
- "CASE 1 WHEN 1 THEN 'hello' ELSE 'world' END",
- "hello"
- )
- .addExpr(
- "CASE 2 "
- + "WHEN 1 THEN 'hello' "
- + "WHEN 3 THEN 'bond' "
- + "ELSE 'world' END",
- "world"
- )
- .addExpr(
- "CASE "
- + "WHEN 1 = 1 THEN 'hello' "
- + "ELSE 'world' END",
- "hello"
- )
- .addExpr(
- "CASE "
- + "WHEN 1 > 1 THEN 'hello' "
- + "ELSE 'world' END",
- "world"
- )
- .addExpr("NULLIF(5, 4) ", 5)
- .addExpr("COALESCE(1, 5) ", 1)
- .addExpr("COALESCE(NULL, 5) ", 5)
- ;
-
- checker.buildRunAndCheck();
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlDateFunctionsIntegrationTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlDateFunctionsIntegrationTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlDateFunctionsIntegrationTest.java
deleted file mode 100644
index bd0d3ba..0000000
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlDateFunctionsIntegrationTest.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.dsls.sql.integrationtest;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.util.Date;
-import java.util.Iterator;
-import org.apache.beam.dsls.sql.BeamSql;
-import org.apache.beam.dsls.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.values.PCollection;
-import org.junit.Test;
-
-/**
- * Integration test for date/time functions: field extraction, FLOOR/CEIL TO YEAR, current-time values.
- */
-public class BeamSqlDateFunctionsIntegrationTest
- extends BeamSqlBuiltinFunctionsIntegrationTestBase {
- @Test public void testDateTimeFunctions() throws Exception {
- ExpressionChecker checker = new ExpressionChecker() // expectations imply ts = 1986-02-15 11:35:26; ts is defined outside this file
- .addExpr("EXTRACT(YEAR FROM ts)", 1986L)
- .addExpr("YEAR(ts)", 1986L)
- .addExpr("QUARTER(ts)", 1L)
- .addExpr("MONTH(ts)", 2L)
- .addExpr("WEEK(ts)", 7L)
- .addExpr("DAYOFMONTH(ts)", 15L)
- .addExpr("DAYOFYEAR(ts)", 46L)
- .addExpr("DAYOFWEEK(ts)", 7L)
- .addExpr("HOUR(ts)", 11L)
- .addExpr("MINUTE(ts)", 35L)
- .addExpr("SECOND(ts)", 26L)
- .addExpr("FLOOR(ts TO YEAR)", parseDate("1986-01-01 00:00:00")) // start of ts's year
- .addExpr("CEIL(ts TO YEAR)", parseDate("1987-01-01 00:00:00")) // start of the following year
- ;
- checker.buildRunAndCheck();
- }
-
- @Test public void testDateTimeFunctions_currentTime() throws Exception {
- String sql = "SELECT " // column order here fixes the indexes 0-4 read by Checker
- + "LOCALTIME as l,"
- + "LOCALTIMESTAMP as l1,"
- + "CURRENT_DATE as c1,"
- + "CURRENT_TIME as c2,"
- + "CURRENT_TIMESTAMP as c3"
- + " FROM PCOLLECTION"
- ;
- PCollection<BeamSqlRow> rows = getTestPCollection().apply(
- BeamSql.simpleQuery(sql));
- PAssert.that(rows).satisfies(new Checker());
- pipeline.run();
- }
-
- private static class Checker implements SerializableFunction<Iterable<BeamSqlRow>, Void> { // asserts exactly one row
- @Override public Void apply(Iterable<BeamSqlRow> input) {
- Iterator<BeamSqlRow> iter = input.iterator();
- assertTrue(iter.hasNext());
- BeamSqlRow row = iter.next();
- // Columns 0-4 follow the SELECT order; each is asserted to lag 'date' by less than 1000 ms.
- Date date = new Date(); // reference "now", taken when the check runs
- assertTrue(date.getTime() - row.getGregorianCalendar(0).getTime().getTime() < 1000); // NOTE(review): one-sided; a later value also passes
- assertTrue(date.getTime() - row.getDate(1).getTime() < 1000);
- assertTrue(date.getTime() - row.getDate(2).getTime() < 1000);
- assertTrue(date.getTime() - row.getGregorianCalendar(3).getTime().getTime() < 1000);
- assertTrue(date.getTime() - row.getDate(4).getTime() < 1000);
- assertFalse(iter.hasNext());
- return null;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlLogicalFunctionsIntegrationTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlLogicalFunctionsIntegrationTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlLogicalFunctionsIntegrationTest.java
deleted file mode 100644
index 4ed1f86..0000000
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlLogicalFunctionsIntegrationTest.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.dsls.sql.integrationtest;
-
-import org.junit.Test;
-
-/**
- * Integration test for the logical operators AND, OR and NOT, alone and combined with parentheses.
- */
-public class BeamSqlLogicalFunctionsIntegrationTest
- extends BeamSqlBuiltinFunctionsIntegrationTestBase {
- @Test
- public void testStringFunctions() throws Exception { // NOTE(review): name looks copy-pasted; body tests logical operators only
- ExpressionChecker checker = new ExpressionChecker() // expectations imply c_integer = 1 and c_bigint = 1 in the test row
- .addExpr("c_integer = 1 AND c_bigint = 1", true)
- .addExpr("c_integer = 1 OR c_bigint = 2", true)
- .addExpr("NOT c_bigint = 2", true)
- .addExpr("(NOT c_bigint = 2) AND (c_integer = 1 OR c_bigint = 3)", true)
- .addExpr("c_integer = 2 AND c_bigint = 1", false) // the false cases below mirror the true cases above
- .addExpr("c_integer = 2 OR c_bigint = 2", false)
- .addExpr("NOT c_bigint = 1", false)
- .addExpr("(NOT c_bigint = 2) AND (c_integer = 2 OR c_bigint = 3)", false)
- ;
-
- checker.buildRunAndCheck();
- }
-
-}