You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by am...@apache.org on 2020/06/29 20:52:49 UTC
[beam] 01/01: Revert "[BEAM-9890] Support BIT_AND aggregation function in Beam SQL"
This is an automated email from the ASF dual-hosted git repository.
amaliujia pushed a commit to branch revert-12079-master
in repository https://gitbox.apache.org/repos/asf/beam.git
commit efe93afbc10084740b5066b93befd8481dc8dad0
Author: Rui Wang <am...@users.noreply.github.com>
AuthorDate: Mon Jun 29 13:52:19 2020 -0700
Revert "[BEAM-9890] Support BIT_AND aggregation function in Beam SQL"
---
.../impl/transform/BeamBuiltinAggregations.java | 35 ----------------------
.../extensions/sql/BeamSqlDslAggregationTest.java | 32 --------------------
.../sql/zetasql/SqlStdOperatorMappingTable.java | 3 +-
.../sql/zetasql/ZetaSQLDialectSpecTest.java | 17 -----------
4 files changed, 1 insertion(+), 86 deletions(-)
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java
index ab3786b..347fdc12 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java
@@ -58,7 +58,6 @@ public class BeamBuiltinAggregations {
.put("$SUM0", BeamBuiltinAggregations::createSum)
.put("AVG", BeamBuiltinAggregations::createAvg)
.put("BIT_OR", BeamBuiltinAggregations::createBitOr)
- .put("BIT_AND", BeamBuiltinAggregations::createBitAnd)
.put("VAR_POP", t -> VarianceFn.newPopulation(t.getTypeName()))
.put("VAR_SAMP", t -> VarianceFn.newSample(t.getTypeName()))
.put("COVAR_POP", t -> CovarianceFn.newPopulation(t.getTypeName()))
@@ -186,14 +185,6 @@ public class BeamBuiltinAggregations {
String.format("[%s] is not supported in BIT_OR", fieldType));
}
- static CombineFn createBitAnd(Schema.FieldType fieldType) {
- if (fieldType.getTypeName() == TypeName.INT64) {
- return new BitAnd();
- }
- throw new UnsupportedOperationException(
- String.format("[%s] is not supported in BIT_AND", fieldType));
- }
-
static class CustMax<T extends Comparable<T>> extends Combine.BinaryCombineFn<T> {
@Override
public T apply(T left, T right) {
@@ -392,30 +383,4 @@ public class BeamBuiltinAggregations {
return accum;
}
}
-
- static class BitAnd<T extends Number> extends CombineFn<T, Long, Long> {
- @Override
- public Long createAccumulator() {
- return -1L;
- }
-
- @Override
- public Long addInput(Long accum, T input) {
- return accum & input.longValue();
- }
-
- @Override
- public Long mergeAccumulators(Iterable<Long> accums) {
- Long merged = createAccumulator();
- for (long accum : accums) {
- merged = merged & accum;
- }
- return merged;
- }
-
- @Override
- public Long extractOutput(Long accum) {
- return accum;
- }
- }
}
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java
index 9c365b2..40b3b63 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java
@@ -314,39 +314,7 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
PCollection<Row> inputRows =
pipeline.apply("longVals", Create.of(rowsInTableA).withRowSchema(schemaInTableA));
PCollection<Row> result = inputRows.apply("sql", SqlTransform.query(sql));
-
- PAssert.that(result).containsInAnyOrder(rowResult);
-
- pipeline.run().waitUntilFinish();
- }
-
- @Test
- public void testBitAndFunction() throws Exception {
- pipeline.enableAbandonedNodeEnforcement(false);
-
- Schema schemaInTableA =
- Schema.builder().addInt64Field("f_long").addInt32Field("f_int2").build();
-
- Schema resultType = Schema.builder().addInt64Field("finalAnswer").build();
-
- List<Row> rowsInTableA =
- TestUtils.RowsBuilder.of(schemaInTableA)
- .addRows(
- 0xF001L, 0,
- 0x00A1L, 0)
- .getRows();
-
- String sql = "SELECT bit_and(f_long) as bitand " + "FROM PCOLLECTION GROUP BY f_int2";
-
- Row rowResult = Row.withSchema(resultType).addValues(1L).build();
-
- PCollection<Row> inputRows =
- pipeline.apply("longVals", Create.of(rowsInTableA).withRowSchema(schemaInTableA));
- PCollection<Row> result = inputRows.apply("sql", SqlTransform.query(sql));
-
PAssert.that(result).containsInAnyOrder(rowResult);
-
- pipeline.run().waitUntilFinish();
}
private static class CheckerBigDecimalDivide
diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SqlStdOperatorMappingTable.java b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SqlStdOperatorMappingTable.java
index 1e1d507..0dae1d8 100644
--- a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SqlStdOperatorMappingTable.java
+++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SqlStdOperatorMappingTable.java
@@ -35,7 +35,6 @@ public class SqlStdOperatorMappingTable {
FunctionSignatureId.FN_ANY_VALUE,
FunctionSignatureId.FN_STRING_AGG_STRING,
FunctionSignatureId.FN_BIT_OR_INT64,
- FunctionSignatureId.FN_BIT_AND_INT64,
FunctionSignatureId.FN_OR,
FunctionSignatureId.FN_NOT,
FunctionSignatureId.FN_MULTIPLY_DOUBLE,
@@ -239,7 +238,7 @@ public class SqlStdOperatorMappingTable {
// .put("array_agg", )
// .put("array_concat_agg")
.put("string_agg", SqlOperators.STRING_AGG_STRING_FN) // NULL values not supported
- .put("bit_and", SqlStdOperatorTable.BIT_AND)
+ // .put("bit_and")
// .put("bit_xor")
// .put("logical_and")
// .put("logical_or")
diff --git a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java
index 0d787c6..e7f442d 100644
--- a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java
+++ b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java
@@ -4892,23 +4892,6 @@ public class ZetaSQLDialectSpecTest extends ZetaSQLTestBase {
}
@Test
- public void testZetaSQLBitAnd() {
- String sql = "SELECT BIT_AND(row_id) FROM table_all_types GROUP BY bool_col";
-
- ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
- BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
- PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
-
- final Schema schema = Schema.builder().addInt64Field("field1").build();
- PAssert.that(stream)
- .containsInAnyOrder(
- Row.withSchema(schema).addValue(1L).build(),
- Row.withSchema(schema).addValue(0L).build());
-
- pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
- }
-
- @Test
public void testSimpleTableName() {
String sql = "SELECT Key FROM KeyValue";