You are viewing a plain text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@beam.apache.org by ta...@apache.org on 2017/07/12 17:04:24 UTC
[1/2] beam git commit: Test queries on unbounded PCollections with
BeamSql DSL API. Also add getTYPE(fieldName) overloads to BeamSqlRow.
Repository: beam
Updated Branches:
refs/heads/DSL_SQL a96c3a01f -> 8defe6f21
Test queries on unbounded PCollections with BeamSql DSL API.
Also add getTYPE(fieldName) overloads to BeamSqlRow.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0580e8b6
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0580e8b6
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0580e8b6
Branch: refs/heads/DSL_SQL
Commit: 0580e8b639ef77c7a6534b7b91ecad493950a3aa
Parents: a96c3a0
Author: mingmxu <mi...@ebay.com>
Authored: Wed Jul 12 00:08:35 2017 -0700
Committer: Tyler Akidau <ta...@apache.org>
Committed: Wed Jul 12 10:01:34 2017 -0700
----------------------------------------------------------------------
.../apache/beam/dsls/sql/schema/BeamSqlRow.java | 84 ++++++++----
.../dsls/sql/BeamSqlDslAggregationTest.java | 127 +++++++++++++++----
.../apache/beam/dsls/sql/BeamSqlDslBase.java | 45 ++++++-
.../beam/dsls/sql/BeamSqlDslFilterTest.java | 62 +++++++--
.../beam/dsls/sql/BeamSqlDslProjectTest.java | 94 +++++++++++---
5 files changed, 327 insertions(+), 85 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/0580e8b6/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java
index 2d7e350..db0ce04 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java
@@ -152,48 +152,48 @@ public class BeamSqlRow implements Serializable {
dataValues.set(index, fieldValue);
}
- public byte getByte(int idx) {
- return (Byte) getFieldValue(idx);
+ public Object getFieldValue(String fieldName) {
+ return getFieldValue(dataType.getFieldsName().indexOf(fieldName));
}
- public short getShort(int idx) {
- return (Short) getFieldValue(idx);
+ public byte getByte(String fieldName) {
+ return (Byte) getFieldValue(fieldName);
}
- public int getInteger(int idx) {
- return (Integer) getFieldValue(idx);
+ public short getShort(String fieldName) {
+ return (Short) getFieldValue(fieldName);
}
- public float getFloat(int idx) {
- return (Float) getFieldValue(idx);
+ public int getInteger(String fieldName) {
+ return (Integer) getFieldValue(fieldName);
}
- public double getDouble(int idx) {
- return (Double) getFieldValue(idx);
+ public float getFloat(String fieldName) {
+ return (Float) getFieldValue(fieldName);
}
- public long getLong(int idx) {
- return (Long) getFieldValue(idx);
+ public double getDouble(String fieldName) {
+ return (Double) getFieldValue(fieldName);
}
- public String getString(int idx) {
- return (String) getFieldValue(idx);
+ public long getLong(String fieldName) {
+ return (Long) getFieldValue(fieldName);
}
- public Date getDate(int idx) {
- return (Date) getFieldValue(idx);
+ public String getString(String fieldName) {
+ return (String) getFieldValue(fieldName);
}
- public GregorianCalendar getGregorianCalendar(int idx) {
- return (GregorianCalendar) getFieldValue(idx);
+ public Date getDate(String fieldName) {
+ return (Date) getFieldValue(fieldName);
}
- public BigDecimal getBigDecimal(int idx) {
- return (BigDecimal) getFieldValue(idx);
+ public GregorianCalendar getGregorianCalendar(String fieldName) {
+ return (GregorianCalendar) getFieldValue(fieldName);
}
- public Object getFieldValue(String fieldName) {
- return getFieldValue(dataType.getFieldsName().indexOf(fieldName));
+ public BigDecimal getBigDecimal(String fieldName) {
+ return (BigDecimal) getFieldValue(fieldName);
}
public Object getFieldValue(int fieldIdx) {
@@ -281,6 +281,46 @@ public class BeamSqlRow implements Serializable {
}
}
+ public byte getByte(int idx) {
+ return (Byte) getFieldValue(idx);
+ }
+
+ public short getShort(int idx) {
+ return (Short) getFieldValue(idx);
+ }
+
+ public int getInteger(int idx) {
+ return (Integer) getFieldValue(idx);
+ }
+
+ public float getFloat(int idx) {
+ return (Float) getFieldValue(idx);
+ }
+
+ public double getDouble(int idx) {
+ return (Double) getFieldValue(idx);
+ }
+
+ public long getLong(int idx) {
+ return (Long) getFieldValue(idx);
+ }
+
+ public String getString(int idx) {
+ return (String) getFieldValue(idx);
+ }
+
+ public Date getDate(int idx) {
+ return (Date) getFieldValue(idx);
+ }
+
+ public GregorianCalendar getGregorianCalendar(int idx) {
+ return (GregorianCalendar) getFieldValue(idx);
+ }
+
+ public BigDecimal getBigDecimal(int idx) {
+ return (BigDecimal) getFieldValue(idx);
+ }
+
public int size() {
return dataValues.size();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/0580e8b6/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslAggregationTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslAggregationTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslAggregationTest.java
index ac0b1cb..471a856 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslAggregationTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslAggregationTest.java
@@ -29,18 +29,31 @@ import org.joda.time.Instant;
import org.junit.Test;
/**
- * Tests for GROUP-BY/aggregation, with global_window/fix_time_window/sliding_window/session_window.
+ * Tests for GROUP-BY/aggregation, with global_window/fix_time_window/sliding_window/session_window
+ * with BOUNDED and UNBOUNDED PCollections.
*/
public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
/**
- * GROUP-BY with single aggregation function.
+ * GROUP-BY with single aggregation function with bounded PCollection.
*/
@Test
- public void testAggregationWithoutWindow() throws Exception {
+ public void testAggregationWithoutWindowWithBounded() throws Exception {
+ runAggregationWithoutWindow(boundedInput1);
+ }
+
+ /**
+ * GROUP-BY with single aggregation function with unbounded PCollection.
+ */
+ @Test
+ public void testAggregationWithoutWindowWithUnbounded() throws Exception {
+ runAggregationWithoutWindow(unboundedInput1);
+ }
+
+ private void runAggregationWithoutWindow(PCollection<BeamSqlRow> input) throws Exception {
String sql = "SELECT f_int2, COUNT(*) AS `size` FROM PCOLLECTION GROUP BY f_int2";
PCollection<BeamSqlRow> result =
- inputA1.apply("testAggregationWithoutWindow", BeamSql.simpleQuery(sql));
+ input.apply("testAggregationWithoutWindow", BeamSql.simpleQuery(sql));
BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int2", "size"),
Arrays.asList(Types.INTEGER, Types.BIGINT));
@@ -55,10 +68,22 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
}
/**
- * GROUP-BY with multiple aggregation functions.
+ * GROUP-BY with multiple aggregation functions with bounded PCollection.
*/
@Test
- public void testAggregationFunctions() throws Exception{
+ public void testAggregationFunctionsWithBounded() throws Exception{
+ runAggregationFunctions(boundedInput1);
+ }
+
+ /**
+ * GROUP-BY with multiple aggregation functions with unbounded PCollection.
+ */
+ @Test
+ public void testAggregationFunctionsWithUnbounded() throws Exception{
+ runAggregationFunctions(unboundedInput1);
+ }
+
+ private void runAggregationFunctions(PCollection<BeamSqlRow> input) throws Exception{
String sql = "select f_int2, count(*) as size, "
+ "sum(f_long) as sum1, avg(f_long) as avg1, max(f_long) as max1, min(f_long) as min1,"
+ "sum(f_short) as sum2, avg(f_short) as avg2, max(f_short) as max2, min(f_short) as min2,"
@@ -70,7 +95,7 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
+ "FROM TABLE_A group by f_int2";
PCollection<BeamSqlRow> result =
- PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), inputA1)
+ PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input)
.apply("testAggregationFunctions", BeamSql.query(sql));
BeamSqlRecordType resultType = BeamSqlRecordType.create(
@@ -121,14 +146,26 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
}
/**
- * Implicit GROUP-BY with DISTINCT.
+ * Implicit GROUP-BY with DISTINCT with bounded PCollection.
+ */
+ @Test
+ public void testDistinctWithBounded() throws Exception {
+ runDistinct(boundedInput1);
+ }
+
+ /**
+ * Implicit GROUP-BY with DISTINCT with unbounded PCollection.
*/
@Test
- public void testDistinct() throws Exception {
+ public void testDistinctWithUnbounded() throws Exception {
+ runDistinct(unboundedInput1);
+ }
+
+ private void runDistinct(PCollection<BeamSqlRow> input) throws Exception {
String sql = "SELECT distinct f_int, f_long FROM PCOLLECTION ";
PCollection<BeamSqlRow> result =
- inputA1.apply("testDistinct", BeamSql.simpleQuery(sql));
+ input.apply("testDistinct", BeamSql.simpleQuery(sql));
BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int", "f_long"),
Arrays.asList(Types.INTEGER, Types.BIGINT));
@@ -155,16 +192,28 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
}
/**
- * GROUP-BY with TUMBLE window(akka fix_time_window).
+ * GROUP-BY with TUMBLE window(aka fix_time_window) with bounded PCollection.
*/
@Test
- public void testTumbleWindow() throws Exception {
+ public void testTumbleWindowWithBounded() throws Exception {
+ runTumbleWindow(boundedInput1);
+ }
+
+ /**
+ * GROUP-BY with TUMBLE window(aka fix_time_window) with unbounded PCollection.
+ */
+ @Test
+ public void testTumbleWindowWithUnbounded() throws Exception {
+ runTumbleWindow(unboundedInput1);
+ }
+
+ private void runTumbleWindow(PCollection<BeamSqlRow> input) throws Exception {
String sql = "SELECT f_int2, COUNT(*) AS `size`,"
+ " TUMBLE_START(f_timestamp, INTERVAL '1' HOUR) AS `window_start`"
- + " FROM TABLE_A "
- + "GROUP BY f_int2, TUMBLE(f_timestamp, INTERVAL '1' HOUR)";
+ + " FROM TABLE_A"
+ + " GROUP BY f_int2, TUMBLE(f_timestamp, INTERVAL '1' HOUR)";
PCollection<BeamSqlRow> result =
- PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), inputA1)
+ PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input)
.apply("testTumbleWindow", BeamSql.query(sql));
BeamSqlRecordType resultType = BeamSqlRecordType.create(
@@ -191,16 +240,28 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
}
/**
- * GROUP-BY with HOP window(akka sliding_window).
+ * GROUP-BY with HOP window(aka sliding_window) with bounded PCollection.
*/
@Test
- public void testHopWindow() throws Exception {
+ public void testHopWindowWithBounded() throws Exception {
+ runHopWindow(boundedInput1);
+ }
+
+ /**
+ * GROUP-BY with HOP window(aka sliding_window) with unbounded PCollection.
+ */
+ @Test
+ public void testHopWindowWithUnbounded() throws Exception {
+ runHopWindow(unboundedInput1);
+ }
+
+ private void runHopWindow(PCollection<BeamSqlRow> input) throws Exception {
String sql = "SELECT f_int2, COUNT(*) AS `size`,"
+ " HOP_START(f_timestamp, INTERVAL '1' HOUR, INTERVAL '30' MINUTE) AS `window_start`"
- + " FROM PCOLLECTION "
- + "GROUP BY f_int2, HOP(f_timestamp, INTERVAL '1' HOUR, INTERVAL '30' MINUTE)";
+ + " FROM PCOLLECTION"
+ + " GROUP BY f_int2, HOP(f_timestamp, INTERVAL '1' HOUR, INTERVAL '30' MINUTE)";
PCollection<BeamSqlRow> result =
- inputA1.apply("testHopWindow", BeamSql.simpleQuery(sql));
+ input.apply("testHopWindow", BeamSql.simpleQuery(sql));
BeamSqlRecordType resultType = BeamSqlRecordType.create(
Arrays.asList("f_int2", "size", "window_start"),
@@ -240,16 +301,28 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
}
/**
- * GROUP-BY with SESSION window.
+ * GROUP-BY with SESSION window with bounded PCollection.
+ */
+ @Test
+ public void testSessionWindowWithBounded() throws Exception {
+ runSessionWindow(boundedInput1);
+ }
+
+ /**
+ * GROUP-BY with SESSION window with unbounded PCollection.
*/
@Test
- public void testSessionWindow() throws Exception {
+ public void testSessionWindowWithUnbounded() throws Exception {
+ runSessionWindow(unboundedInput1);
+ }
+
+ private void runSessionWindow(PCollection<BeamSqlRow> input) throws Exception {
String sql = "SELECT f_int2, COUNT(*) AS `size`,"
+ " SESSION_START(f_timestamp, INTERVAL '5' MINUTE) AS `window_start`"
- + " FROM TABLE_A "
- + "GROUP BY f_int2, SESSION(f_timestamp, INTERVAL '5' MINUTE)";
+ + " FROM TABLE_A"
+ + " GROUP BY f_int2, SESSION(f_timestamp, INTERVAL '5' MINUTE)";
PCollection<BeamSqlRow> result =
- PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), inputA1)
+ PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input)
.apply("testSessionWindow", BeamSql.query(sql));
BeamSqlRecordType resultType = BeamSqlRecordType.create(
@@ -285,7 +358,7 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
String sql = "SELECT f_int2, COUNT(*) AS `size` FROM TABLE_A "
+ "GROUP BY f_int2, TUMBLE(f_long, INTERVAL '1' HOUR)";
PCollection<BeamSqlRow> result =
- PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), inputA1)
+ PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), boundedInput1)
.apply("testWindowOnNonTimestampField", BeamSql.query(sql));
pipeline.run().waitUntilFinish();
@@ -300,7 +373,7 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
String sql = "SELECT f_int2, COUNT(DISTINCT *) AS `size` FROM PCOLLECTION GROUP BY f_int2";
PCollection<BeamSqlRow> result =
- inputA1.apply("testUnsupportedDistinct", BeamSql.simpleQuery(sql));
+ boundedInput1.apply("testUnsupportedDistinct", BeamSql.simpleQuery(sql));
pipeline.run().waitUntilFinish();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/0580e8b6/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslBase.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslBase.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslBase.java
index 308dcb6..57fcbc3 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslBase.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslBase.java
@@ -28,9 +28,11 @@ import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
import org.apache.beam.dsls.sql.schema.BeamSqlRow;
import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
+import org.joda.time.Instant;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
@@ -53,8 +55,13 @@ public class BeamSqlDslBase {
public static BeamSqlRecordType recordTypeInTableA;
public static List<BeamSqlRow> recordsInTableA;
- public PCollection<BeamSqlRow> inputA1;
- public PCollection<BeamSqlRow> inputA2;
+ //bounded PCollections
+ public PCollection<BeamSqlRow> boundedInput1;
+ public PCollection<BeamSqlRow> boundedInput2;
+
+ //unbounded PCollections
+ public PCollection<BeamSqlRow> unboundedInput1;
+ public PCollection<BeamSqlRow> unboundedInput2;
@BeforeClass
public static void prepareClass() throws ParseException {
@@ -69,11 +76,37 @@ public class BeamSqlDslBase {
@Before
public void preparePCollections(){
- inputA1 = PBegin.in(pipeline).apply("inputA1", Create.of(recordsInTableA)
- .withCoder(new BeamSqlRowCoder(recordTypeInTableA)));
+ boundedInput1 = PBegin.in(pipeline).apply("boundedInput1",
+ Create.of(recordsInTableA).withCoder(new BeamSqlRowCoder(recordTypeInTableA)));
+
+ boundedInput2 = PBegin.in(pipeline).apply("boundedInput2",
+ Create.of(recordsInTableA.get(0)).withCoder(new BeamSqlRowCoder(recordTypeInTableA)));
+
+ unboundedInput1 = prepareUnboundedPCollection1();
+ unboundedInput2 = prepareUnboundedPCollection2();
+ }
+
+ private PCollection<BeamSqlRow> prepareUnboundedPCollection1() {
+ TestStream.Builder<BeamSqlRow> values = TestStream
+ .create(new BeamSqlRowCoder(recordTypeInTableA));
+
+ for (BeamSqlRow row : recordsInTableA) {
+ values = values.advanceWatermarkTo(new Instant(row.getDate("f_timestamp")));
+ values = values.addElements(row);
+ }
+
+ return PBegin.in(pipeline).apply("unboundedInput1", values.advanceWatermarkToInfinity());
+ }
+
+ private PCollection<BeamSqlRow> prepareUnboundedPCollection2() {
+ TestStream.Builder<BeamSqlRow> values = TestStream
+ .create(new BeamSqlRowCoder(recordTypeInTableA));
+
+ BeamSqlRow row = recordsInTableA.get(0);
+ values = values.advanceWatermarkTo(new Instant(row.getDate("f_timestamp")));
+ values = values.addElements(row);
- inputA2 = PBegin.in(pipeline).apply("inputA2", Create.of(recordsInTableA.get(0))
- .withCoder(new BeamSqlRowCoder(recordTypeInTableA)));
+ return PBegin.in(pipeline).apply("unboundedInput2", values.advanceWatermarkToInfinity());
}
private static List<BeamSqlRow> prepareInputRecordsInTableA() throws ParseException{
http://git-wip-us.apache.org/repos/asf/beam/blob/0580e8b6/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslFilterTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslFilterTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslFilterTest.java
index f46f6c5..b4b50c1 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslFilterTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslFilterTest.java
@@ -25,18 +25,30 @@ import org.apache.beam.sdk.values.TupleTag;
import org.junit.Test;
/**
- * Tests for WHERE queries.
+ * Tests for WHERE queries with BOUNDED and UNBOUNDED PCollections.
*/
public class BeamSqlDslFilterTest extends BeamSqlDslBase {
/**
- * single filter.
+ * single filter with bounded PCollection.
*/
@Test
- public void testSingleFilter() throws Exception {
+ public void testSingleFilterWithBounded() throws Exception {
+ runSingleFilter(boundedInput1);
+ }
+
+ /**
+ * single filter with unbounded PCollection.
+ */
+ @Test
+ public void testSingleFilterWithUnbounded() throws Exception {
+ runSingleFilter(unboundedInput1);
+ }
+
+ private void runSingleFilter(PCollection<BeamSqlRow> input) throws Exception {
String sql = "SELECT * FROM PCOLLECTION WHERE f_int = 1";
PCollection<BeamSqlRow> result =
- inputA1.apply("testSingleFilter", BeamSql.simpleQuery(sql));
+ input.apply("testSingleFilter", BeamSql.simpleQuery(sql));
PAssert.that(result).containsInAnyOrder(recordsInTableA.get(0));
@@ -44,15 +56,27 @@ public class BeamSqlDslFilterTest extends BeamSqlDslBase {
}
/**
- * composite filters.
+ * composite filters with bounded PCollection.
*/
@Test
- public void testCompositeFilter() throws Exception {
+ public void testCompositeFilterWithBounded() throws Exception {
+ runCompositeFilter(boundedInput1);
+ }
+
+ /**
+ * composite filters with unbounded PCollection.
+ */
+ @Test
+ public void testCompositeFilterWithUnbounded() throws Exception {
+ runCompositeFilter(unboundedInput1);
+ }
+
+ private void runCompositeFilter(PCollection<BeamSqlRow> input) throws Exception {
String sql = "SELECT * FROM TABLE_A"
+ " WHERE f_int > 1 AND (f_long < 3000 OR f_string = 'string_row3')";
PCollection<BeamSqlRow> result =
- PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), inputA1)
+ PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input)
.apply("testCompositeFilter", BeamSql.query(sql));
PAssert.that(result).containsInAnyOrder(recordsInTableA.get(1), recordsInTableA.get(2));
@@ -61,14 +85,26 @@ public class BeamSqlDslFilterTest extends BeamSqlDslBase {
}
/**
- * nothing return with filters.
+ * filters that match no rows, with bounded PCollection.
+ */
+ @Test
+ public void testNoReturnFilterWithBounded() throws Exception {
+ runNoReturnFilter(boundedInput1);
+ }
+
+ /**
+ * filters that match no rows, with unbounded PCollection.
*/
@Test
- public void testNoReturnFilter() throws Exception {
+ public void testNoReturnFilterWithUnbounded() throws Exception {
+ runNoReturnFilter(unboundedInput1);
+ }
+
+ private void runNoReturnFilter(PCollection<BeamSqlRow> input) throws Exception {
String sql = "SELECT * FROM TABLE_A WHERE f_int < 1";
PCollection<BeamSqlRow> result =
- PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), inputA1)
+ PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input)
.apply("testNoReturnFilter", BeamSql.query(sql));
PAssert.that(result).empty();
@@ -85,7 +121,7 @@ public class BeamSqlDslFilterTest extends BeamSqlDslBase {
String sql = "SELECT * FROM TABLE_B WHERE f_int < 1";
PCollection<BeamSqlRow> result =
- PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), inputA1)
+ PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), boundedInput1)
.apply("testFromInvalidTableName1", BeamSql.query(sql));
pipeline.run().waitUntilFinish();
@@ -99,7 +135,7 @@ public class BeamSqlDslFilterTest extends BeamSqlDslBase {
String sql = "SELECT * FROM PCOLLECTION_NA";
- PCollection<BeamSqlRow> result = inputA1.apply(BeamSql.simpleQuery(sql));
+ PCollection<BeamSqlRow> result = boundedInput1.apply(BeamSql.simpleQuery(sql));
pipeline.run().waitUntilFinish();
}
@@ -112,7 +148,7 @@ public class BeamSqlDslFilterTest extends BeamSqlDslBase {
String sql = "SELECT * FROM PCOLLECTION WHERE f_int_na = 0";
- PCollection<BeamSqlRow> result = inputA1.apply(BeamSql.simpleQuery(sql));
+ PCollection<BeamSqlRow> result = boundedInput1.apply(BeamSql.simpleQuery(sql));
pipeline.run().waitUntilFinish();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/0580e8b6/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslProjectTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslProjectTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslProjectTest.java
index 877fa4f..10f61b0 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslProjectTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslProjectTest.java
@@ -28,18 +28,30 @@ import org.apache.beam.sdk.values.TupleTag;
import org.junit.Test;
/**
- * Tests for field-project in queries.
+ * Tests for field-project in queries with BOUNDED and UNBOUNDED PCollections.
*/
public class BeamSqlDslProjectTest extends BeamSqlDslBase {
/**
- * select all fields.
+ * select all fields with bounded PCollection.
*/
@Test
- public void testSelectAll() throws Exception {
+ public void testSelectAllWithBounded() throws Exception {
+ runSelectAll(boundedInput2);
+ }
+
+ /**
+ * select all fields with unbounded PCollection.
+ */
+ @Test
+ public void testSelectAllWithUnbounded() throws Exception {
+ runSelectAll(unboundedInput2);
+ }
+
+ private void runSelectAll(PCollection<BeamSqlRow> input) throws Exception {
String sql = "SELECT * FROM PCOLLECTION";
PCollection<BeamSqlRow> result =
- inputA2.apply("testSelectAll", BeamSql.simpleQuery(sql));
+ input.apply("testSelectAll", BeamSql.simpleQuery(sql));
PAssert.that(result).containsInAnyOrder(recordsInTableA.get(0));
@@ -47,14 +59,26 @@ public class BeamSqlDslProjectTest extends BeamSqlDslBase {
}
/**
- * select partial fields.
+ * select partial fields with bounded PCollection.
+ */
+ @Test
+ public void testPartialFieldsWithBounded() throws Exception {
+ runPartialFields(boundedInput2);
+ }
+
+ /**
+ * select partial fields with unbounded PCollection.
*/
@Test
- public void testPartialFields() throws Exception {
+ public void testPartialFieldsWithUnbounded() throws Exception {
+ runPartialFields(unboundedInput2);
+ }
+
+ private void runPartialFields(PCollection<BeamSqlRow> input) throws Exception {
String sql = "SELECT f_int, f_long FROM TABLE_A";
PCollection<BeamSqlRow> result =
- PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), inputA2)
+ PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input)
.apply("testPartialFields", BeamSql.query(sql));
BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int", "f_long"),
@@ -70,14 +94,26 @@ public class BeamSqlDslProjectTest extends BeamSqlDslBase {
}
/**
- * select partial fields for multiple rows.
+ * select partial fields for multiple rows with bounded PCollection.
*/
@Test
- public void testPartialFieldsInMultipleRow() throws Exception {
+ public void testPartialFieldsInMultipleRowWithBounded() throws Exception {
+ runPartialFieldsInMultipleRow(boundedInput1);
+ }
+
+ /**
+ * select partial fields for multiple rows with unbounded PCollection.
+ */
+ @Test
+ public void testPartialFieldsInMultipleRowWithUnbounded() throws Exception {
+ runPartialFieldsInMultipleRow(unboundedInput1);
+ }
+
+ private void runPartialFieldsInMultipleRow(PCollection<BeamSqlRow> input) throws Exception {
String sql = "SELECT f_int, f_long FROM TABLE_A";
PCollection<BeamSqlRow> result =
- PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), inputA1)
+ PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input)
.apply("testPartialFieldsInMultipleRow", BeamSql.query(sql));
BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int", "f_long"),
@@ -105,14 +141,26 @@ public class BeamSqlDslProjectTest extends BeamSqlDslBase {
}
/**
- * select partial fields.
+ * select partial fields with bounded PCollection.
*/
@Test
- public void testPartialFieldsInRows() throws Exception {
+ public void testPartialFieldsInRowsWithBounded() throws Exception {
+ runPartialFieldsInRows(boundedInput1);
+ }
+
+ /**
+ * select partial fields with unbounded PCollection.
+ */
+ @Test
+ public void testPartialFieldsInRowsWithUnbounded() throws Exception {
+ runPartialFieldsInRows(unboundedInput1);
+ }
+
+ private void runPartialFieldsInRows(PCollection<BeamSqlRow> input) throws Exception {
String sql = "SELECT f_int, f_long FROM TABLE_A";
PCollection<BeamSqlRow> result =
- PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), inputA1)
+ PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input)
.apply("testPartialFieldsInRows", BeamSql.query(sql));
BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int", "f_long"),
@@ -140,14 +188,26 @@ public class BeamSqlDslProjectTest extends BeamSqlDslBase {
}
/**
- * select literal field.
+ * select literal field with bounded PCollection.
+ */
+ @Test
+ public void testLiteralFieldWithBounded() throws Exception {
+ runLiteralField(boundedInput2);
+ }
+
+ /**
+ * select literal field with unbounded PCollection.
*/
@Test
- public void testLiteralField() throws Exception {
+ public void testLiteralFieldWithUnbounded() throws Exception {
+ runLiteralField(unboundedInput2);
+ }
+
+ public void runLiteralField(PCollection<BeamSqlRow> input) throws Exception {
String sql = "SELECT 1 as literal_field FROM TABLE_A";
PCollection<BeamSqlRow> result =
- PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), inputA2)
+ PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input)
.apply("testLiteralField", BeamSql.query(sql));
BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("literal_field"),
@@ -170,7 +230,7 @@ public class BeamSqlDslProjectTest extends BeamSqlDslBase {
String sql = "SELECT f_int_na FROM TABLE_A";
PCollection<BeamSqlRow> result =
- PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), inputA2)
+ PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), boundedInput1)
.apply("testProjectUnknownField", BeamSql.query(sql));
pipeline.run().waitUntilFinish();
[2/2] beam git commit: [BEAM-2527] This closes #3477
Posted by ta...@apache.org.
[BEAM-2527] This closes #3477
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8defe6f2
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8defe6f2
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8defe6f2
Branch: refs/heads/DSL_SQL
Commit: 8defe6f21524b4ca00dd984176260c1bd0739774
Parents: a96c3a0 0580e8b
Author: Tyler Akidau <ta...@apache.org>
Authored: Wed Jul 12 10:03:28 2017 -0700
Committer: Tyler Akidau <ta...@apache.org>
Committed: Wed Jul 12 10:03:28 2017 -0700
----------------------------------------------------------------------
.../apache/beam/dsls/sql/schema/BeamSqlRow.java | 84 ++++++++----
.../dsls/sql/BeamSqlDslAggregationTest.java | 127 +++++++++++++++----
.../apache/beam/dsls/sql/BeamSqlDslBase.java | 45 ++++++-
.../beam/dsls/sql/BeamSqlDslFilterTest.java | 62 +++++++--
.../beam/dsls/sql/BeamSqlDslProjectTest.java | 94 +++++++++++---
5 files changed, 327 insertions(+), 85 deletions(-)
----------------------------------------------------------------------