You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by am...@apache.org on 2020/06/29 20:52:48 UTC

[beam] branch revert-12079-master created (now efe93af)

This is an automated email from the ASF dual-hosted git repository.

amaliujia pushed a change to branch revert-12079-master
in repository https://gitbox.apache.org/repos/asf/beam.git.


      at efe93af  Revert "[BEAM-9890] Support BIT_AND aggregation function in Beam SQL"

This branch includes the following new commits:

     new efe93af  Revert "[BEAM-9890] Support BIT_AND aggregation function in Beam SQL"

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[beam] 01/01: Revert "[BEAM-9890] Support BIT_AND aggregation function in Beam SQL"

Posted by am...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

amaliujia pushed a commit to branch revert-12079-master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit efe93afbc10084740b5066b93befd8481dc8dad0
Author: Rui Wang <am...@users.noreply.github.com>
AuthorDate: Mon Jun 29 13:52:19 2020 -0700

    Revert "[BEAM-9890] Support BIT_AND aggregation function in Beam SQL"
---
 .../impl/transform/BeamBuiltinAggregations.java    | 35 ----------------------
 .../extensions/sql/BeamSqlDslAggregationTest.java  | 32 --------------------
 .../sql/zetasql/SqlStdOperatorMappingTable.java    |  3 +-
 .../sql/zetasql/ZetaSQLDialectSpecTest.java        | 17 -----------
 4 files changed, 1 insertion(+), 86 deletions(-)

diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java
index ab3786b..347fdc12 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java
@@ -58,7 +58,6 @@ public class BeamBuiltinAggregations {
               .put("$SUM0", BeamBuiltinAggregations::createSum)
               .put("AVG", BeamBuiltinAggregations::createAvg)
               .put("BIT_OR", BeamBuiltinAggregations::createBitOr)
-              .put("BIT_AND", BeamBuiltinAggregations::createBitAnd)
               .put("VAR_POP", t -> VarianceFn.newPopulation(t.getTypeName()))
               .put("VAR_SAMP", t -> VarianceFn.newSample(t.getTypeName()))
               .put("COVAR_POP", t -> CovarianceFn.newPopulation(t.getTypeName()))
@@ -186,14 +185,6 @@ public class BeamBuiltinAggregations {
         String.format("[%s] is not supported in BIT_OR", fieldType));
   }
 
-  static CombineFn createBitAnd(Schema.FieldType fieldType) {
-    if (fieldType.getTypeName() == TypeName.INT64) {
-      return new BitAnd();
-    }
-    throw new UnsupportedOperationException(
-        String.format("[%s] is not supported in BIT_AND", fieldType));
-  }
-
   static class CustMax<T extends Comparable<T>> extends Combine.BinaryCombineFn<T> {
     @Override
     public T apply(T left, T right) {
@@ -392,30 +383,4 @@ public class BeamBuiltinAggregations {
       return accum;
     }
   }
-
-  static class BitAnd<T extends Number> extends CombineFn<T, Long, Long> {
-    @Override
-    public Long createAccumulator() {
-      return -1L;
-    }
-
-    @Override
-    public Long addInput(Long accum, T input) {
-      return accum & input.longValue();
-    }
-
-    @Override
-    public Long mergeAccumulators(Iterable<Long> accums) {
-      Long merged = createAccumulator();
-      for (long accum : accums) {
-        merged = merged & accum;
-      }
-      return merged;
-    }
-
-    @Override
-    public Long extractOutput(Long accum) {
-      return accum;
-    }
-  }
 }
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java
index 9c365b2..40b3b63 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java
@@ -314,39 +314,7 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
     PCollection<Row> inputRows =
         pipeline.apply("longVals", Create.of(rowsInTableA).withRowSchema(schemaInTableA));
     PCollection<Row> result = inputRows.apply("sql", SqlTransform.query(sql));
-
-    PAssert.that(result).containsInAnyOrder(rowResult);
-
-    pipeline.run().waitUntilFinish();
-  }
-
-  @Test
-  public void testBitAndFunction() throws Exception {
-    pipeline.enableAbandonedNodeEnforcement(false);
-
-    Schema schemaInTableA =
-        Schema.builder().addInt64Field("f_long").addInt32Field("f_int2").build();
-
-    Schema resultType = Schema.builder().addInt64Field("finalAnswer").build();
-
-    List<Row> rowsInTableA =
-        TestUtils.RowsBuilder.of(schemaInTableA)
-            .addRows(
-                0xF001L, 0,
-                0x00A1L, 0)
-            .getRows();
-
-    String sql = "SELECT bit_and(f_long) as bitand " + "FROM PCOLLECTION GROUP BY f_int2";
-
-    Row rowResult = Row.withSchema(resultType).addValues(1L).build();
-
-    PCollection<Row> inputRows =
-        pipeline.apply("longVals", Create.of(rowsInTableA).withRowSchema(schemaInTableA));
-    PCollection<Row> result = inputRows.apply("sql", SqlTransform.query(sql));
-
     PAssert.that(result).containsInAnyOrder(rowResult);
-
-    pipeline.run().waitUntilFinish();
   }
 
   private static class CheckerBigDecimalDivide
diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SqlStdOperatorMappingTable.java b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SqlStdOperatorMappingTable.java
index 1e1d507..0dae1d8 100644
--- a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SqlStdOperatorMappingTable.java
+++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SqlStdOperatorMappingTable.java
@@ -35,7 +35,6 @@ public class SqlStdOperatorMappingTable {
           FunctionSignatureId.FN_ANY_VALUE,
           FunctionSignatureId.FN_STRING_AGG_STRING,
           FunctionSignatureId.FN_BIT_OR_INT64,
-          FunctionSignatureId.FN_BIT_AND_INT64,
           FunctionSignatureId.FN_OR,
           FunctionSignatureId.FN_NOT,
           FunctionSignatureId.FN_MULTIPLY_DOUBLE,
@@ -239,7 +238,7 @@ public class SqlStdOperatorMappingTable {
           // .put("array_agg", )
           // .put("array_concat_agg")
           .put("string_agg", SqlOperators.STRING_AGG_STRING_FN) // NULL values not supported
-          .put("bit_and", SqlStdOperatorTable.BIT_AND)
+          // .put("bit_and")
           // .put("bit_xor")
           // .put("logical_and")
           // .put("logical_or")
diff --git a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java
index 0d787c6..e7f442d 100644
--- a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java
+++ b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java
@@ -4892,23 +4892,6 @@ public class ZetaSQLDialectSpecTest extends ZetaSQLTestBase {
   }
 
   @Test
-  public void testZetaSQLBitAnd() {
-    String sql = "SELECT BIT_AND(row_id) FROM table_all_types GROUP BY bool_col";
-
-    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
-    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
-    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
-
-    final Schema schema = Schema.builder().addInt64Field("field1").build();
-    PAssert.that(stream)
-        .containsInAnyOrder(
-            Row.withSchema(schema).addValue(1L).build(),
-            Row.withSchema(schema).addValue(0L).build());
-
-    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
-  }
-
-  @Test
   public void testSimpleTableName() {
     String sql = "SELECT Key FROM KeyValue";