You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ib...@apache.org on 2020/08/07 21:48:38 UTC

[beam] branch master updated: [BEAM-10653] Modularize BeamSqlDslUdfUdafTest.

This is an automated email from the ASF dual-hosted git repository.

ibzib pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 73408de  [BEAM-10653] Modularize BeamSqlDslUdfUdafTest.
     new 0d6c6b7  Merge pull request #12484 from ibzib/BEAM-10653
73408de is described below

commit 73408deff1106c18965cbe7f60be321e501367c2
Author: Kyle Weaver <kc...@google.com>
AuthorDate: Thu Aug 6 13:09:22 2020 -0700

    [BEAM-10653] Modularize BeamSqlDslUdfUdafTest.
    
    Split tests with multiple, independent branches into separate test
    cases.
    
    The only part removed was in testUdaf. The two cases in testUdaf were
    different in the past, but converged at some point.
    https://github.com/apache/beam/blob/d8ff78b65bbe7e3a2239249f034a538ca65b0706/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslUdfUdafTest.java#L52
---
 .../sdk/extensions/sql/BeamSqlDslUdfUdafTest.java  | 149 +++++++++++----------
 1 file changed, 81 insertions(+), 68 deletions(-)

diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java
index c2afc5d..352a3cb 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java
@@ -55,73 +55,77 @@ public class BeamSqlDslUdfUdafTest extends BeamSqlDslBase {
 
     Row row = Row.withSchema(resultType).addValues(0, 30).build();
 
-    String sql1 =
-        "SELECT f_int2, squaresum1(f_int) AS `squaresum`" + " FROM PCOLLECTION GROUP BY f_int2";
-    PCollection<Row> result1 =
+    String sql = "SELECT f_int2, squaresum(f_int) AS `squaresum` FROM PCOLLECTION GROUP BY f_int2";
+    PCollection<Row> result =
         boundedInput1.apply(
-            "testUdaf1", SqlTransform.query(sql1).registerUdaf("squaresum1", new SquareSum()));
-    PAssert.that(result1).containsInAnyOrder(row);
-
-    String sql2 =
-        "SELECT f_int2, squaresum2(f_int) AS `squaresum`" + " FROM PCOLLECTION GROUP BY f_int2";
-    PCollection<Row> result2 =
-        PCollectionTuple.of(new TupleTag<>("PCOLLECTION"), boundedInput1)
-            .apply(
-                "testUdaf2", SqlTransform.query(sql2).registerUdaf("squaresum2", new SquareSum()));
-    PAssert.that(result2).containsInAnyOrder(row);
+            "testUdaf", SqlTransform.query(sql).registerUdaf("squaresum", new SquareSum()));
+    PAssert.that(result).containsInAnyOrder(row);
 
     pipeline.run().waitUntilFinish();
   }
 
-  /** Test Joda time UDF/UDAF. */
+  /** Test Joda time UDAF. */
   @Test
-  public void testJodaTimeUdfUdaf() throws Exception {
+  public void testJodaTimeUdf() throws Exception {
     Schema resultType = Schema.builder().addDateTimeField("jodatime").build();
 
-    Row row1 =
+    Row row =
         Row.withSchema(resultType)
             .addValues(parseTimestampWithoutTimeZone("2017-01-01 02:04:03"))
             .build();
 
-    String sql1 = "SELECT MAX_JODA(f_timestamp) as jodatime FROM PCOLLECTION";
-    PCollection<Row> result1 =
+    String sql = "SELECT MAX_JODA(f_timestamp) as jodatime FROM PCOLLECTION";
+    PCollection<Row> result =
         boundedInput1.apply(
-            "testJodaUdaf", SqlTransform.query(sql1).registerUdaf("MAX_JODA", new JodaMax()));
-    PAssert.that(result1).containsInAnyOrder(row1);
+            "testJodaUdaf", SqlTransform.query(sql).registerUdaf("MAX_JODA", new JodaMax()));
+    PAssert.that(result).containsInAnyOrder(row);
+
+    pipeline.run().waitUntilFinish();
+  }
+
+  /** Test Joda time UDF. */
+  @Test
+  public void testJodaTimeUdaf() throws Exception {
+    Schema resultType = Schema.builder().addDateTimeField("jodatime").build();
 
-    Row row2 =
+    Row row =
         Row.withSchema(resultType)
             .addValues(parseTimestampWithoutTimeZone("2016-12-31 01:01:03"))
             .build();
 
-    String sql2 = "SELECT PRE_DAY(f_timestamp) as jodatime FROM PCOLLECTION WHERE f_int=1";
-    PCollection<Row> result2 =
+    String sql = "SELECT PRE_DAY(f_timestamp) as jodatime FROM PCOLLECTION WHERE f_int=1";
+    PCollection<Row> result =
         boundedInput1.apply(
-            "testTimeUdf", SqlTransform.query(sql2).registerUdf("PRE_DAY", PreviousDay.class));
-    PAssert.that(result2).containsInAnyOrder(row2);
+            "testTimeUdf", SqlTransform.query(sql).registerUdf("PRE_DAY", PreviousDay.class));
+    PAssert.that(result).containsInAnyOrder(row);
 
     pipeline.run().waitUntilFinish();
   }
 
   @Test
-  public void testListUdf() throws Exception {
-    Schema resultType1 = Schema.builder().addArrayField("array_field", FieldType.INT64).build();
-    Row row1 = Row.withSchema(resultType1).addValue(Arrays.asList(1L)).build();
-    String sql1 = "SELECT test_array(1)";
-    PCollection<Row> result1 =
+  public void testUdfWithListOutput() throws Exception {
+    Schema resultType = Schema.builder().addArrayField("array_field", FieldType.INT64).build();
+    Row row = Row.withSchema(resultType).addValue(Arrays.asList(1L)).build();
+    String sql = "SELECT test_array(1)";
+    PCollection<Row> result =
         boundedInput1.apply(
             "testArrayUdf",
-            SqlTransform.query(sql1).registerUdf("test_array", TestReturnTypeList.class));
-    PAssert.that(result1).containsInAnyOrder(row1);
+            SqlTransform.query(sql).registerUdf("test_array", TestReturnTypeList.class));
+    PAssert.that(result).containsInAnyOrder(row);
+
+    pipeline.run().waitUntilFinish();
+  }
 
-    Schema resultType2 = Schema.builder().addInt32Field("int_field").build();
-    Row row2 = Row.withSchema(resultType2).addValue(3).build();
-    String sql2 = "select array_length(ARRAY[1, 2, 3])";
-    PCollection<Row> result2 =
+  @Test
+  public void testUdfWithListInput() throws Exception {
+    Schema resultType = Schema.builder().addInt32Field("int_field").build();
+    Row row = Row.withSchema(resultType).addValue(3).build();
+    String sql = "select array_length(ARRAY[1, 2, 3])";
+    PCollection<Row> result =
         boundedInput1.apply(
-            "testArrayUdf2",
-            SqlTransform.query(sql2).registerUdf("array_length", TestListLength.class));
-    PAssert.that(result2).containsInAnyOrder(row2);
+            "testArrayUdf",
+            SqlTransform.query(sql).registerUdf("array_length", TestListLength.class));
+    PAssert.that(result).containsInAnyOrder(row);
 
     pipeline.run().waitUntilFinish();
   }
@@ -133,14 +137,14 @@ public class BeamSqlDslUdfUdafTest extends BeamSqlDslBase {
 
     Row row = Row.withSchema(resultType).addValues(0, 354).build();
 
-    String sql1 =
+    String sql =
         "SELECT f_int2, double_square_sum(f_int) AS `squaresum`"
             + " FROM PCOLLECTION GROUP BY f_int2";
-    PCollection<Row> result1 =
+    PCollection<Row> result =
         boundedInput1.apply(
             "testUdaf",
-            SqlTransform.query(sql1).registerUdaf("double_square_sum", new SquareSquareSum()));
-    PAssert.that(result1).containsInAnyOrder(row);
+            SqlTransform.query(sql).registerUdaf("double_square_sum", new SquareSquareSum()));
+    PAssert.that(result).containsInAnyOrder(row);
 
     pipeline.run().waitUntilFinish();
   }
@@ -155,46 +159,55 @@ public class BeamSqlDslUdfUdafTest extends BeamSqlDslBase {
     exceptions.expectCause(hasMessage(containsString("CombineFn must be parameterized")));
     pipeline.enableAbandonedNodeEnforcement(false);
 
-    Schema resultType = Schema.builder().addInt32Field("f_int2").addInt32Field("squaresum").build();
-
-    Row row = Row.withSchema(resultType).addValues(0, 354).build();
-
-    String sql1 =
+    String sql =
         "SELECT f_int2, squaresum(f_int) AS `squaresum`" + " FROM PCOLLECTION GROUP BY f_int2";
-    PCollection<Row> result1 =
-        boundedInput1.apply(
-            "testUdaf", SqlTransform.query(sql1).registerUdaf("squaresum", new RawCombineFn()));
+    boundedInput1.apply(
+        "testUdaf", SqlTransform.query(sql).registerUdaf("squaresum", new RawCombineFn()));
   }
 
-  /** test UDF. */
+  /** Test UDF implementing {@link BeamSqlUdf}. */
   @Test
-  public void testUdf() throws Exception {
+  public void testBeamSqlUdf() throws Exception {
     Schema resultType = Schema.builder().addInt32Field("f_int").addInt32Field("cubicvalue").build();
     Row row = Row.withSchema(resultType).addValues(2, 8).build();
 
-    String sql1 = "SELECT f_int, cubic1(f_int) as cubicvalue FROM PCOLLECTION WHERE f_int = 2";
-    PCollection<Row> result1 =
+    String sql = "SELECT f_int, cubic(f_int) as cubicvalue FROM PCOLLECTION WHERE f_int = 2";
+    PCollection<Row> result =
         boundedInput1.apply(
-            "testUdf1", SqlTransform.query(sql1).registerUdf("cubic1", CubicInteger.class));
-    PAssert.that(result1).containsInAnyOrder(row);
+            "testUdf", SqlTransform.query(sql).registerUdf("cubic", CubicInteger.class));
+    PAssert.that(result).containsInAnyOrder(row);
+
+    pipeline.run().waitUntilFinish();
+  }
 
-    String sql2 = "SELECT f_int, cubic2(f_int) as cubicvalue FROM PCOLLECTION WHERE f_int = 2";
-    PCollection<Row> result2 =
+  /** Test UDF implementing {@link SerializableFunction}. */
+  @Test
+  public void testSerializableFunctionUdf() throws Exception {
+    Schema resultType = Schema.builder().addInt32Field("f_int").addInt32Field("cubicvalue").build();
+    Row row = Row.withSchema(resultType).addValues(2, 8).build();
+
+    String sql = "SELECT f_int, cubic(f_int) as cubicvalue FROM PCOLLECTION WHERE f_int = 2";
+    PCollection<Row> result =
         PCollectionTuple.of(new TupleTag<>("PCOLLECTION"), boundedInput1)
-            .apply(
-                "testUdf2", SqlTransform.query(sql2).registerUdf("cubic2", new CubicIntegerFn()));
-    PAssert.that(result2).containsInAnyOrder(row);
+            .apply("testUdf", SqlTransform.query(sql).registerUdf("cubic", new CubicIntegerFn()));
+    PAssert.that(result).containsInAnyOrder(row);
+
+    pipeline.run().waitUntilFinish();
+  }
 
-    String sql3 = "SELECT f_int, substr(f_string) as sub_string FROM PCOLLECTION WHERE f_int = 2";
-    PCollection<Row> result3 =
+  /** Test UDF implementing {@link BeamSqlUdf} with default parameters. */
+  @Test
+  public void testBeamSqlUdfWithDefaultParameters() throws Exception {
+    String sql = "SELECT f_int, substr(f_string) as sub_string FROM PCOLLECTION WHERE f_int = 2";
+    PCollection<Row> result =
         PCollectionTuple.of(new TupleTag<>("PCOLLECTION"), boundedInput1)
             .apply(
-                "testUdf3", SqlTransform.query(sql3).registerUdf("substr", UdfFnWithDefault.class));
+                "testUdf", SqlTransform.query(sql).registerUdf("substr", UdfFnWithDefault.class));
 
     Schema subStrSchema =
         Schema.builder().addInt32Field("f_int").addStringField("sub_string").build();
     Row subStrRow = Row.withSchema(subStrSchema).addValues(2, "s").build();
-    PAssert.that(result3).containsInAnyOrder(subStrRow);
+    PAssert.that(result).containsInAnyOrder(subStrRow);
 
     pipeline.run().waitUntilFinish();
   }
@@ -204,12 +217,12 @@ public class BeamSqlDslUdfUdafTest extends BeamSqlDslBase {
    */
   @Test
   public void testTableMacroUdf() throws Exception {
-    String sql1 = "SELECT * FROM table(range_udf(0, 3))";
+    String sql = "SELECT * FROM table(range_udf(0, 3))";
 
     Schema schema = Schema.of(Schema.Field.of("f0", Schema.FieldType.INT32));
 
     PCollection<Row> rows =
-        pipeline.apply(SqlTransform.query(sql1).registerUdf("range_udf", RangeUdf.class));
+        pipeline.apply(SqlTransform.query(sql).registerUdf("range_udf", RangeUdf.class));
 
     PAssert.that(rows)
         .containsInAnyOrder(