You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by am...@apache.org on 2020/06/24 20:50:31 UTC

[beam] branch master updated: [BEAM-9890] Support BIT_AND aggregation function in Beam SQL and added tests

This is an automated email from the ASF dual-hosted git repository.

amaliujia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new d41dd50  [BEAM-9890] Support BIT_AND aggregation function in Beam SQL and added tests
     new 18ce248  Merge pull request #12079 from Imfuyuwei/master
d41dd50 is described below

commit d41dd5018f90bcac90079f68624710c811f05e50
Author: Yuwei Fu <fu...@google.com>
AuthorDate: Wed Jun 24 06:18:45 2020 +0000

    [BEAM-9890] Support BIT_AND aggregation function in Beam SQL and added tests
---
 .../impl/transform/BeamBuiltinAggregations.java    | 35 ++++++++++++++++++++++
 .../extensions/sql/BeamSqlDslAggregationTest.java  | 32 ++++++++++++++++++++
 .../sql/zetasql/SqlStdOperatorMappingTable.java    |  3 +-
 .../sql/zetasql/ZetaSQLDialectSpecTest.java        | 17 +++++++++++
 4 files changed, 86 insertions(+), 1 deletion(-)

diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java
index 347fdc12..ab3786b 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java
@@ -58,6 +58,7 @@ public class BeamBuiltinAggregations {
               .put("$SUM0", BeamBuiltinAggregations::createSum)
               .put("AVG", BeamBuiltinAggregations::createAvg)
               .put("BIT_OR", BeamBuiltinAggregations::createBitOr)
+              .put("BIT_AND", BeamBuiltinAggregations::createBitAnd)
               .put("VAR_POP", t -> VarianceFn.newPopulation(t.getTypeName()))
               .put("VAR_SAMP", t -> VarianceFn.newSample(t.getTypeName()))
               .put("COVAR_POP", t -> CovarianceFn.newPopulation(t.getTypeName()))
@@ -185,6 +186,14 @@ public class BeamBuiltinAggregations {
         String.format("[%s] is not supported in BIT_OR", fieldType));
   }
 
+  static CombineFn createBitAnd(Schema.FieldType fieldType) {
+    if (fieldType.getTypeName() == TypeName.INT64) { // INT64 is the only accepted input type
+      return new BitAnd();
+    }
+    throw new UnsupportedOperationException( // any other field type is rejected
+        String.format("[%s] is not supported in BIT_AND", fieldType));
+  }
+
   static class CustMax<T extends Comparable<T>> extends Combine.BinaryCombineFn<T> {
     @Override
     public T apply(T left, T right) {
@@ -383,4 +392,30 @@ public class BeamBuiltinAggregations {
       return accum;
     }
   }
+
+  static class BitAnd<T extends Number> extends CombineFn<T, Long, Long> { // bitwise-AND combiner
+    @Override
+    public Long createAccumulator() {
+      return -1L; // all 64 bits set: the identity element for bitwise AND
+    }
+
+    @Override
+    public Long addInput(Long accum, T input) {
+      return accum & input.longValue(); // AND the input's long value into the running result
+    }
+
+    @Override
+    public Long mergeAccumulators(Iterable<Long> accums) {
+      Long merged = createAccumulator(); // start from the identity so no bits are cleared yet
+      for (long accum : accums) {
+        merged = merged & accum;
+      }
+      return merged;
+    }
+
+    @Override
+    public Long extractOutput(Long accum) {
+      return accum; // the accumulator is already the final value
+    }
+  }
 }
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java
index 40b3b63..9c365b2 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java
@@ -314,7 +314,39 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
     PCollection<Row> inputRows =
         pipeline.apply("longVals", Create.of(rowsInTableA).withRowSchema(schemaInTableA));
     PCollection<Row> result = inputRows.apply("sql", SqlTransform.query(sql));
+
+    PAssert.that(result).containsInAnyOrder(rowResult);
+
+    pipeline.run().waitUntilFinish();
+  }
+
+  @Test
+  public void testBitAndFunction() throws Exception {
+    pipeline.enableAbandonedNodeEnforcement(false);
+
+    Schema schemaInTableA =
+        Schema.builder().addInt64Field("f_long").addInt32Field("f_int2").build();
+
+    Schema resultType = Schema.builder().addInt64Field("finalAnswer").build();
+
+    List<Row> rowsInTableA =
+        TestUtils.RowsBuilder.of(schemaInTableA)
+            .addRows(
+                0xF001L, 0,
+                0x00A1L, 0)
+            .getRows(); // both rows have f_int2 == 0, so GROUP BY yields a single group
+
+    String sql = "SELECT bit_and(f_long) as bitand " + "FROM PCOLLECTION GROUP BY f_int2";
+
+    Row rowResult = Row.withSchema(resultType).addValues(1L).build(); // 0xF001 & 0x00A1 == 0x0001
+
+    PCollection<Row> inputRows =
+        pipeline.apply("longVals", Create.of(rowsInTableA).withRowSchema(schemaInTableA));
+    PCollection<Row> result = inputRows.apply("sql", SqlTransform.query(sql));
+
     PAssert.that(result).containsInAnyOrder(rowResult);
+
+    pipeline.run().waitUntilFinish();
   }
 
   private static class CheckerBigDecimalDivide
diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SqlStdOperatorMappingTable.java b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SqlStdOperatorMappingTable.java
index b89e053..4675f62 100644
--- a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SqlStdOperatorMappingTable.java
+++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SqlStdOperatorMappingTable.java
@@ -35,6 +35,7 @@ public class SqlStdOperatorMappingTable {
           FunctionSignatureId.FN_ANY_VALUE,
           FunctionSignatureId.FN_STRING_AGG_STRING,
           FunctionSignatureId.FN_BIT_OR_INT64,
+          FunctionSignatureId.FN_BIT_AND_INT64,
           FunctionSignatureId.FN_OR,
           FunctionSignatureId.FN_NOT,
           FunctionSignatureId.FN_MULTIPLY_DOUBLE,
@@ -226,7 +227,7 @@ public class SqlStdOperatorMappingTable {
           // .put("array_agg", )
           // .put("array_concat_agg")
           .put("string_agg", SqlOperators.STRING_AGG_STRING_FN) // NULL values not supported
-          // .put("bit_and")
+          .put("bit_and", SqlStdOperatorTable.BIT_AND)
           // .put("bit_xor")
           // .put("logical_and")
           // .put("logical_or")
diff --git a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java
index 82750eb..84f3eb9 100644
--- a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java
+++ b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java
@@ -4613,6 +4613,23 @@ public class ZetaSQLDialectSpecTest extends ZetaSQLTestBase {
   }
 
   @Test
+  public void testZetaSQLBitAnd() {
+    String sql = "SELECT BIT_AND(row_id) FROM table_all_types GROUP BY bool_col";
+
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    final Schema schema = Schema.builder().addInt64Field("field1").build();
+    PAssert.that(stream)
+        .containsInAnyOrder( // two rows expected: one BIT_AND result per bool_col group
+            Row.withSchema(schema).addValue(1L).build(),
+            Row.withSchema(schema).addValue(0L).build());
+
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
   public void testSimpleTableName() {
     String sql = "SELECT Key FROM KeyValue";