Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/08/27 22:08:25 UTC

[GitHub] [beam] ibzib commented on a change in pull request #15174: [BEAM-12100][BEAM-10379][BEAM-9514][BEAM-12647][BEAM-12099] AssertionError type mismatch from AggregateScanConverter

ibzib commented on a change in pull request #15174:
URL: https://github.com/apache/beam/pull/15174#discussion_r697732974



##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java
##########
@@ -216,6 +216,8 @@ public RelWriter explainTerms(RelWriter pw) {
     private WindowFn<Row, IntervalWindow> windowFn;
     private int windowFieldIndex;
     private List<FieldAggregation> fieldAggregations;
+    private int groupSetCount;

Review comment:
       Should these be `final`?
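
    For reference, a minimal sketch of what that would look like, assuming the new field is assigned exactly once in the `Transform` constructor (the constructor shape below is illustrative, not the PR's actual signature):

    ```java
    // Declaring the field final documents that it is set once and never reassigned.
    private final int groupSetCount;

    private Transform(int groupSetCount /* plus the existing constructor parameters */) {
      this.groupSetCount = groupSetCount;
    }
    ```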

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java
##########
@@ -243,9 +247,70 @@ private Transform(
       if (windowFn != null) {
         windowedStream = assignTimestampsAndWindow(upstream);
       }
-
       validateWindowIsSupported(windowedStream);
+      return createCombiner(pinput, windowedStream);
+    }
+
+    private PCollection<Row> createCombiner(
+        PCollectionList<Row> pinput, PCollection<Row> windowedStream) {
+      // Check if there are fields to be grouped
+      if (groupSetCount > 0) {
+        PTransform<PCollection<Row>, PCollection<Row>> combiner = createGroupCombiner();
+        boolean verifyRowValues =
+            pinput.getPipeline().getOptions().as(BeamSqlPipelineOptions.class).getVerifyRowValues();
+        return windowedStream
+            .apply(combiner)
+            .apply(
+                "mergeRecord",
+                ParDo.of(
+                    mergeRecord(outputSchema, windowFieldIndex, ignoreValues, verifyRowValues)))
+            .setRowSchema(outputSchema);
+      }
+      PTransform<PCollection<Row>, PCollection<Row>> combiner = createGlobalCombiner();
+      return windowedStream.apply(combiner).setRowSchema(outputSchema);
+    }
 
+    private PTransform<PCollection<Row>, PCollection<Row>> createGlobalCombiner() {

Review comment:
       `createGroupCombiner` and `createGlobalCombiner` are mostly the same. Can we find a way to reduce code duplication?
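
    One possible direction, sketched under the assumption that the two builders differ only in how each `FieldAggregation` is applied; the helper name and callback shape are illustrative, not from the PR:

    ```java
    // Requires java.util.function.BiFunction; List and FieldAggregation are already
    // available in this file. The shared per-aggregation loop moves into one helper,
    // and createGroupCombiner/createGlobalCombiner each supply only the
    // "apply one aggregation to the builder" step as a lambda.
    private <BuilderT> BuilderT applyFieldAggregations(
        BuilderT builder,
        List<FieldAggregation> fieldAggregations,
        BiFunction<BuilderT, FieldAggregation, BuilderT> applyOne) {
      BuilderT result = builder;
      for (FieldAggregation fieldAggregation : fieldAggregations) {
        result = applyOne.apply(result, fieldAggregation);
      }
      return result;
    }
    ```

    Each caller would then reduce to a single call that passes its own lambda for one aggregation step.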

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java
##########
@@ -243,9 +247,70 @@ private Transform(
       if (windowFn != null) {
         windowedStream = assignTimestampsAndWindow(upstream);
       }
-
       validateWindowIsSupported(windowedStream);
+      return createCombiner(pinput, windowedStream);
+    }
+
+    private PCollection<Row> createCombiner(
+        PCollectionList<Row> pinput, PCollection<Row> windowedStream) {
+      // Check if there are fields to be grouped
+      if (groupSetCount > 0) {
+        PTransform<PCollection<Row>, PCollection<Row>> combiner = createGroupCombiner();
+        boolean verifyRowValues =
+            pinput.getPipeline().getOptions().as(BeamSqlPipelineOptions.class).getVerifyRowValues();
+        return windowedStream
+            .apply(combiner)
+            .apply(
+                "mergeRecord",
+                ParDo.of(
+                    mergeRecord(outputSchema, windowFieldIndex, ignoreValues, verifyRowValues)))
+            .setRowSchema(outputSchema);
+      }
+      PTransform<PCollection<Row>, PCollection<Row>> combiner = createGlobalCombiner();
+      return windowedStream.apply(combiner).setRowSchema(outputSchema);
+    }
 
+    private PTransform<PCollection<Row>, PCollection<Row>> createGlobalCombiner() {
+      org.apache.beam.sdk.schemas.transforms.Group.Global<Row> globally =
+          org.apache.beam.sdk.schemas.transforms.Group.globally();
+      org.apache.beam.sdk.schemas.transforms.Group.CombineFieldsGlobally<Row> combined = null;
+      for (FieldAggregation fieldAggregation : fieldAggregations) {
+        List<Integer> inputs = fieldAggregation.inputs;
+        CombineFn combineFn = fieldAggregation.combineFn;
+        if (inputs.size() > 1 || inputs.isEmpty()) {

Review comment:
       Nit: this logic is simpler if we put the single field case first.
   
    ```
    if (inputs.size() == 1) {
      // single input field: aggregate that field directly
    } else {
      // zero or several input fields, e.g. COUNT(*) or multi-argument aggregations
    }
    ```

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java
##########
@@ -229,27 +232,93 @@ public T apply(T left, T right) {
     public Short apply(Short left, Short right) {
       return (short) (left + right);
     }
+
+    @Override
+    public @Nullable Short identity() {
+      return 0;
+    }
   }
 
   static class ByteSum extends Combine.BinaryCombineFn<Byte> {
     @Override
     public Byte apply(Byte left, Byte right) {
       return (byte) (left + right);
     }
+
+    @Override
+    public @Nullable Byte identity() {
+      return 0;
+    }
   }
 
   static class FloatSum extends Combine.BinaryCombineFn<Float> {
     @Override
     public Float apply(Float left, Float right) {
       return left + right;
     }
+
+    @Override
+    public @Nullable Float identity() {
+      return 0F;
+    }
+  }
+
+  static class LongSum extends Combine.BinaryCombineFn<Long> {
+    @Override
+    public Long apply(Long left, Long right) {
+      return Math.addExact(left, right);
+    }
+
+    @Override
+    public @Nullable Long identity() {
+      return 0L;
+    }
   }
 
   static class BigDecimalSum extends Combine.BinaryCombineFn<BigDecimal> {
     @Override
     public BigDecimal apply(BigDecimal left, BigDecimal right) {
       return left.add(right);
     }
+
+    @Override
+    public @Nullable BigDecimal identity() {
+      return BigDecimal.ZERO;
+    }
+  }
+
+  static class DropNullFn<InputT, AccumT, OutputT> extends CombineFn<InputT, AccumT, OutputT> {

Review comment:
       Should this be private?

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java
##########
@@ -56,12 +58,13 @@
       BUILTIN_AGGREGATOR_FACTORIES =
           ImmutableMap.<String, Function<Schema.FieldType, CombineFn<?, ?, ?>>>builder()
               .put("ANY_VALUE", typeName -> Sample.anyValueCombineFn())
-              .put("COUNT", typeName -> Count.combineFn())
-              .put("MAX", BeamBuiltinAggregations::createMax)
-              .put("MIN", BeamBuiltinAggregations::createMin)
-              .put("SUM", BeamBuiltinAggregations::createSum)
-              .put("$SUM0", BeamBuiltinAggregations::createSum)
-              .put("AVG", BeamBuiltinAggregations::createAvg)
+              // Drop null elements for these aggregations BEAM-10379

Review comment:
       It might be confusing to link to that jira here, since it's referring to functions that _don't_ want to drop null elements.
   ```suggestion
                 // Drop null elements for these aggregations.
   ```

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java
##########
@@ -376,29 +445,45 @@ public BigDecimal toBigDecimal(BigDecimal record) {
     }
   }
 
-  static class BitOr<T extends Number> extends CombineFn<T, Long, Long> {
+  static class BitOr<T extends Number> extends CombineFn<T, BitOr.Accum, Long> {
+    static class Accum implements Serializable {
+      /** True if no inputs have been seen yet. */
+      boolean isEmpty = true;
+      /** The bitwise-or of the inputs seen so far. */
+      long bitOr = 0L;
+    }
+
     @Override
-    public Long createAccumulator() {
-      return 0L;
+    public Accum createAccumulator() {
+      return new Accum();
     }
 
     @Override
-    public Long addInput(Long accum, T input) {
-      return accum | input.longValue();
+    public Accum addInput(Accum accum, T input) {
+      accum.bitOr |= input.longValue();

Review comment:
       What if `input` is null?
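
    One null-safe shape for `addInput`, as a sketch only: it assumes NULL inputs should simply be skipped (the usual SQL aggregate behavior) and that the accumulator's `isEmpty` flag is meant to be cleared on the first non-null input:

    ```java
    @Override
    public Accum addInput(Accum accum, T input) {
      if (input == null) {
        // Skip SQL NULLs instead of hitting a NullPointerException below.
        return accum;
      }
      accum.isEmpty = false;
      accum.bitOr |= input.longValue();
      return accum;
    }
    ```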

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/agg/CovarianceFn.java
##########
@@ -124,6 +124,11 @@ private BigDecimal getCovariance(CovarianceAccumulator covariance) {
     BigDecimal adjustedCount =
         this.isSample ? covariance.count().subtract(BigDecimal.ONE) : covariance.count();
 
+    // Avoid ArithmeticException: Division is undefined when adjustedCount and covariance are 0

Review comment:
       Isn't division undefined whenever the denominator is 0, regardless of the numerator?
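
    For illustration, guarding on the denominator alone would be enough to avoid the `ArithmeticException`; what to return in that degenerate case is a separate design choice and is only a placeholder here:

    ```java
    // Division by zero is the only failure mode, so only adjustedCount needs checking.
    if (adjustedCount.signum() == 0) {
      return BigDecimal.ZERO; // placeholder; the PR may prefer a different result
    }
    ```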

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java
##########
@@ -229,27 +232,93 @@ public T apply(T left, T right) {
     public Short apply(Short left, Short right) {
       return (short) (left + right);
     }
+
+    @Override
+    public @Nullable Short identity() {
+      return 0;
+    }
   }
 
   static class ByteSum extends Combine.BinaryCombineFn<Byte> {
     @Override
     public Byte apply(Byte left, Byte right) {
       return (byte) (left + right);
     }
+
+    @Override
+    public @Nullable Byte identity() {
+      return 0;
+    }
   }
 
   static class FloatSum extends Combine.BinaryCombineFn<Float> {
     @Override
     public Float apply(Float left, Float right) {
       return left + right;
     }
+
+    @Override
+    public @Nullable Float identity() {
+      return 0F;
+    }
+  }
+
+  static class LongSum extends Combine.BinaryCombineFn<Long> {
+    @Override
+    public Long apply(Long left, Long right) {
+      return Math.addExact(left, right);
+    }
+
+    @Override
+    public @Nullable Long identity() {
+      return 0L;
+    }
   }
 
   static class BigDecimalSum extends Combine.BinaryCombineFn<BigDecimal> {
     @Override
     public BigDecimal apply(BigDecimal left, BigDecimal right) {
       return left.add(right);
     }
+
+    @Override
+    public @Nullable BigDecimal identity() {
+      return BigDecimal.ZERO;
+    }
+  }
+
+  static class DropNullFn<InputT, AccumT, OutputT> extends CombineFn<InputT, AccumT, OutputT> {
+    CombineFn<InputT, AccumT, OutputT> combineFn;

Review comment:
       Can this be `private final`?
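
    As a sketch with the field made `private final` and set in the constructor; the delegation details are assumptions for illustration, since the full class body is not shown in this hunk:

    ```java
    private static class DropNullFn<InputT, AccumT, OutputT>
        extends CombineFn<InputT, AccumT, OutputT> {
      private final CombineFn<InputT, AccumT, OutputT> combineFn;

      DropNullFn(CombineFn<InputT, AccumT, OutputT> combineFn) {
        this.combineFn = combineFn;
      }

      @Override
      public AccumT createAccumulator() {
        return combineFn.createAccumulator();
      }

      @Override
      public AccumT addInput(AccumT accumulator, InputT input) {
        // Drop SQL NULLs; everything else is delegated to the wrapped CombineFn.
        return input == null ? accumulator : combineFn.addInput(accumulator, input);
      }

      @Override
      public AccumT mergeAccumulators(Iterable<AccumT> accumulators) {
        return combineFn.mergeAccumulators(accumulators);
      }

      @Override
      public OutputT extractOutput(AccumT accumulator) {
        return combineFn.extractOutput(accumulator);
      }

      // The real class may also need to forward getAccumulatorCoder / getDefaultOutputCoder.
    }
    ```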




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org