You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by am...@apache.org on 2020/06/29 20:52:49 UTC

[beam] 01/01: Revert "[BEAM-9890] Support BIT_AND aggregation function in Beam SQL"

This is an automated email from the ASF dual-hosted git repository.

amaliujia pushed a commit to branch revert-12079-master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit efe93afbc10084740b5066b93befd8481dc8dad0
Author: Rui Wang <am...@users.noreply.github.com>
AuthorDate: Mon Jun 29 13:52:19 2020 -0700

    Revert "[BEAM-9890] Support BIT_AND aggregation function in Beam SQL"
---
 .../impl/transform/BeamBuiltinAggregations.java    | 35 ----------------------
 .../extensions/sql/BeamSqlDslAggregationTest.java  | 32 --------------------
 .../sql/zetasql/SqlStdOperatorMappingTable.java    |  3 +-
 .../sql/zetasql/ZetaSQLDialectSpecTest.java        | 17 -----------
 4 files changed, 1 insertion(+), 86 deletions(-)

diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java
index ab3786b..347fdc12 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java
@@ -58,7 +58,6 @@ public class BeamBuiltinAggregations {
               .put("$SUM0", BeamBuiltinAggregations::createSum)
               .put("AVG", BeamBuiltinAggregations::createAvg)
               .put("BIT_OR", BeamBuiltinAggregations::createBitOr)
-              .put("BIT_AND", BeamBuiltinAggregations::createBitAnd)
               .put("VAR_POP", t -> VarianceFn.newPopulation(t.getTypeName()))
               .put("VAR_SAMP", t -> VarianceFn.newSample(t.getTypeName()))
               .put("COVAR_POP", t -> CovarianceFn.newPopulation(t.getTypeName()))
@@ -186,14 +185,6 @@ public class BeamBuiltinAggregations {
         String.format("[%s] is not supported in BIT_OR", fieldType));
   }
 
-  static CombineFn createBitAnd(Schema.FieldType fieldType) {
-    if (fieldType.getTypeName() == TypeName.INT64) {
-      return new BitAnd();
-    }
-    throw new UnsupportedOperationException(
-        String.format("[%s] is not supported in BIT_AND", fieldType));
-  }
-
   static class CustMax<T extends Comparable<T>> extends Combine.BinaryCombineFn<T> {
     @Override
     public T apply(T left, T right) {
@@ -392,30 +383,4 @@ public class BeamBuiltinAggregations {
       return accum;
     }
   }
-
-  static class BitAnd<T extends Number> extends CombineFn<T, Long, Long> {
-    @Override
-    public Long createAccumulator() {
-      return -1L;
-    }
-
-    @Override
-    public Long addInput(Long accum, T input) {
-      return accum & input.longValue();
-    }
-
-    @Override
-    public Long mergeAccumulators(Iterable<Long> accums) {
-      Long merged = createAccumulator();
-      for (long accum : accums) {
-        merged = merged & accum;
-      }
-      return merged;
-    }
-
-    @Override
-    public Long extractOutput(Long accum) {
-      return accum;
-    }
-  }
 }
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java
index 9c365b2..40b3b63 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java
@@ -314,39 +314,7 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
     PCollection<Row> inputRows =
         pipeline.apply("longVals", Create.of(rowsInTableA).withRowSchema(schemaInTableA));
     PCollection<Row> result = inputRows.apply("sql", SqlTransform.query(sql));
-
-    PAssert.that(result).containsInAnyOrder(rowResult);
-
-    pipeline.run().waitUntilFinish();
-  }
-
-  @Test
-  public void testBitAndFunction() throws Exception {
-    pipeline.enableAbandonedNodeEnforcement(false);
-
-    Schema schemaInTableA =
-        Schema.builder().addInt64Field("f_long").addInt32Field("f_int2").build();
-
-    Schema resultType = Schema.builder().addInt64Field("finalAnswer").build();
-
-    List<Row> rowsInTableA =
-        TestUtils.RowsBuilder.of(schemaInTableA)
-            .addRows(
-                0xF001L, 0,
-                0x00A1L, 0)
-            .getRows();
-
-    String sql = "SELECT bit_and(f_long) as bitand " + "FROM PCOLLECTION GROUP BY f_int2";
-
-    Row rowResult = Row.withSchema(resultType).addValues(1L).build();
-
-    PCollection<Row> inputRows =
-        pipeline.apply("longVals", Create.of(rowsInTableA).withRowSchema(schemaInTableA));
-    PCollection<Row> result = inputRows.apply("sql", SqlTransform.query(sql));
-
     PAssert.that(result).containsInAnyOrder(rowResult);
-
-    pipeline.run().waitUntilFinish();
   }
 
   private static class CheckerBigDecimalDivide
diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SqlStdOperatorMappingTable.java b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SqlStdOperatorMappingTable.java
index 1e1d507..0dae1d8 100644
--- a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SqlStdOperatorMappingTable.java
+++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SqlStdOperatorMappingTable.java
@@ -35,7 +35,6 @@ public class SqlStdOperatorMappingTable {
           FunctionSignatureId.FN_ANY_VALUE,
           FunctionSignatureId.FN_STRING_AGG_STRING,
           FunctionSignatureId.FN_BIT_OR_INT64,
-          FunctionSignatureId.FN_BIT_AND_INT64,
           FunctionSignatureId.FN_OR,
           FunctionSignatureId.FN_NOT,
           FunctionSignatureId.FN_MULTIPLY_DOUBLE,
@@ -239,7 +238,7 @@ public class SqlStdOperatorMappingTable {
           // .put("array_agg", )
           // .put("array_concat_agg")
           .put("string_agg", SqlOperators.STRING_AGG_STRING_FN) // NULL values not supported
-          .put("bit_and", SqlStdOperatorTable.BIT_AND)
+          // .put("bit_and")
           // .put("bit_xor")
           // .put("logical_and")
           // .put("logical_or")
diff --git a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java
index 0d787c6..e7f442d 100644
--- a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java
+++ b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java
@@ -4892,23 +4892,6 @@ public class ZetaSQLDialectSpecTest extends ZetaSQLTestBase {
   }
 
   @Test
-  public void testZetaSQLBitAnd() {
-    String sql = "SELECT BIT_AND(row_id) FROM table_all_types GROUP BY bool_col";
-
-    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
-    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
-    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
-
-    final Schema schema = Schema.builder().addInt64Field("field1").build();
-    PAssert.that(stream)
-        .containsInAnyOrder(
-            Row.withSchema(schema).addValue(1L).build(),
-            Row.withSchema(schema).addValue(0L).build());
-
-    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
-  }
-
-  @Test
   public void testSimpleTableName() {
     String sql = "SELECT Key FROM KeyValue";