You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@beam.apache.org by am...@apache.org on 2020/06/24 20:50:31 UTC
[beam] branch master updated: [BEAM-9890] Support BIT_AND aggregation function in Beam SQL and added tests
This is an automated email from the ASF dual-hosted git repository.
amaliujia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new d41dd50 [BEAM-9890] Support BIT_AND aggregation function in Beam SQL and added tests
new 18ce248 Merge pull request #12079 from Imfuyuwei/master
d41dd50 is described below
commit d41dd5018f90bcac90079f68624710c811f05e50
Author: Yuwei Fu <fu...@google.com>
AuthorDate: Wed Jun 24 06:18:45 2020 +0000
[BEAM-9890] Support BIT_AND aggregation function in Beam SQL and added tests
---
.../impl/transform/BeamBuiltinAggregations.java | 35 ++++++++++++++++++++++
.../extensions/sql/BeamSqlDslAggregationTest.java | 32 ++++++++++++++++++++
.../sql/zetasql/SqlStdOperatorMappingTable.java | 3 +-
.../sql/zetasql/ZetaSQLDialectSpecTest.java | 17 +++++++++++
4 files changed, 86 insertions(+), 1 deletion(-)
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java
index 347fdc12..ab3786b 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java
@@ -58,6 +58,7 @@ public class BeamBuiltinAggregations {
.put("$SUM0", BeamBuiltinAggregations::createSum)
.put("AVG", BeamBuiltinAggregations::createAvg)
.put("BIT_OR", BeamBuiltinAggregations::createBitOr)
+ .put("BIT_AND", BeamBuiltinAggregations::createBitAnd)
.put("VAR_POP", t -> VarianceFn.newPopulation(t.getTypeName()))
.put("VAR_SAMP", t -> VarianceFn.newSample(t.getTypeName()))
.put("COVAR_POP", t -> CovarianceFn.newPopulation(t.getTypeName()))
@@ -185,6 +186,14 @@ public class BeamBuiltinAggregations {
String.format("[%s] is not supported in BIT_OR", fieldType));
}
+ static CombineFn createBitAnd(Schema.FieldType fieldType) { // Factory registered under "BIT_AND" in the builtin aggregation map.
+ if (fieldType.getTypeName() == TypeName.INT64) { // only INT64 inputs are supported; any other type falls through to the throw below
+ return new BitAnd();
+ }
+ throw new UnsupportedOperationException( // same error shape as the BIT_OR factory for unsupported types
+ String.format("[%s] is not supported in BIT_AND", fieldType));
+ }
+
static class CustMax<T extends Comparable<T>> extends Combine.BinaryCombineFn<T> {
@Override
public T apply(T left, T right) {
@@ -383,4 +392,30 @@ public class BeamBuiltinAggregations {
return accum;
}
}
+
+ static class BitAnd<T extends Number> extends CombineFn<T, Long, Long> { // Combines numeric inputs with bitwise AND; accumulator and output are Long.
+ @Override
+ public Long createAccumulator() {
+ return -1L; // all 64 bits set: the identity element for bitwise AND
+ }
+
+ @Override
+ public Long addInput(Long accum, T input) {
+ return accum & input.longValue(); // NOTE(review): a null input would throw NPE here — confirm nulls are filtered upstream
+ }
+
+ @Override
+ public Long mergeAccumulators(Iterable<Long> accums) {
+ Long merged = createAccumulator(); // seed with the identity (-1L), then AND in every partial accumulator
+ for (long accum : accums) { // unboxes each Long accumulator
+ merged = merged & accum;
+ }
+ return merged;
+ }
+
+ @Override
+ public Long extractOutput(Long accum) {
+ return accum; // NOTE(review): an accumulator that saw no input yields -1L rather than SQL NULL — confirm intended
+ }
+ }
}
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java
index 40b3b63..9c365b2 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java
@@ -314,7 +314,39 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
PCollection<Row> inputRows =
pipeline.apply("longVals", Create.of(rowsInTableA).withRowSchema(schemaInTableA));
PCollection<Row> result = inputRows.apply("sql", SqlTransform.query(sql));
+
+ PAssert.that(result).containsInAnyOrder(rowResult);
+
+ pipeline.run().waitUntilFinish();
+ }
+
+ @Test
+ public void testBitAndFunction() throws Exception { // BIT_AND over INT64 values that all share one GROUP BY key (f_int2 = 0)
+ pipeline.enableAbandonedNodeEnforcement(false);
+
+ Schema schemaInTableA =
+ Schema.builder().addInt64Field("f_long").addInt32Field("f_int2").build();
+
+ Schema resultType = Schema.builder().addInt64Field("finalAnswer").build();
+
+ List<Row> rowsInTableA =
+ TestUtils.RowsBuilder.of(schemaInTableA)
+ .addRows(
+ 0xF001L, 0, // (f_long, f_int2)
+ 0x00A1L, 0)
+ .getRows();
+
+ String sql = "SELECT bit_and(f_long) as bitand " + "FROM PCOLLECTION GROUP BY f_int2";
+
+ Row rowResult = Row.withSchema(resultType).addValues(1L).build(); // 0xF001 & 0x00A1 == 0x0001
+
+ PCollection<Row> inputRows =
+ pipeline.apply("longVals", Create.of(rowsInTableA).withRowSchema(schemaInTableA));
+ PCollection<Row> result = inputRows.apply("sql", SqlTransform.query(sql));
+
PAssert.that(result).containsInAnyOrder(rowResult);
+
+ pipeline.run().waitUntilFinish();
}
private static class CheckerBigDecimalDivide
diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SqlStdOperatorMappingTable.java b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SqlStdOperatorMappingTable.java
index b89e053..4675f62 100644
--- a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SqlStdOperatorMappingTable.java
+++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SqlStdOperatorMappingTable.java
@@ -35,6 +35,7 @@ public class SqlStdOperatorMappingTable {
FunctionSignatureId.FN_ANY_VALUE,
FunctionSignatureId.FN_STRING_AGG_STRING,
FunctionSignatureId.FN_BIT_OR_INT64,
+ FunctionSignatureId.FN_BIT_AND_INT64,
FunctionSignatureId.FN_OR,
FunctionSignatureId.FN_NOT,
FunctionSignatureId.FN_MULTIPLY_DOUBLE,
@@ -226,7 +227,7 @@ public class SqlStdOperatorMappingTable {
// .put("array_agg", )
// .put("array_concat_agg")
.put("string_agg", SqlOperators.STRING_AGG_STRING_FN) // NULL values not supported
- // .put("bit_and")
+ .put("bit_and", SqlStdOperatorTable.BIT_AND)
// .put("bit_xor")
// .put("logical_and")
// .put("logical_or")
diff --git a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java
index 82750eb..84f3eb9 100644
--- a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java
+++ b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java
@@ -4613,6 +4613,23 @@ public class ZetaSQLDialectSpecTest extends ZetaSQLTestBase {
}
@Test
+ public void testZetaSQLBitAnd() { // BIT_AND(row_id) per bool_col group, planned through the ZetaSQL planner
+ String sql = "SELECT BIT_AND(row_id) FROM table_all_types GROUP BY bool_col";
+
+ ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+ BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+ PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+ final Schema schema = Schema.builder().addInt64Field("field1").build();
+ PAssert.that(stream) // NOTE(review): expected values depend on the table_all_types fixture rows, not shown here
+ .containsInAnyOrder(
+ Row.withSchema(schema).addValue(1L).build(),
+ Row.withSchema(schema).addValue(0L).build());
+
+ pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+ }
+
+ @Test
public void testSimpleTableName() {
String sql = "SELECT Key FROM KeyValue";