Posted to commits@beam.apache.org by sc...@apache.org on 2018/10/03 22:14:31 UTC

[beam] branch master updated: [BEAM-5427] Fix and update sample code for CombineFn. (#6439)

This is an automated email from the ASF dual-hosted git repository.

scott pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new b34b180  [BEAM-5427] Fix and update sample code for CombineFn. (#6439)
b34b180 is described below

commit b34b18079efb699718bb2503757a6da8ef1f2433
Author: Ruoyun Huang <hu...@gmail.com>
AuthorDate: Wed Oct 3 15:14:23 2018 -0700

    [BEAM-5427] Fix and update sample code for CombineFn. (#6439)
---
 .../org/apache/beam/sdk/transforms/Combine.java    | 36 +++++++++++++++++++++-
 1 file changed, 35 insertions(+), 1 deletion(-)

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
index 2c65f94..20314fc 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
@@ -254,10 +254,26 @@ public class Combine {
    *
    * <pre>{@code
    * public class AverageFn extends CombineFn<Integer, AverageFn.Accum, Double> {
-   *   public static class Accum {
+   *   public static class Accum implements Serializable {
    *     int sum = 0;
    *     int count = 0;
+   *
+   *     {@literal@}Override
+   *     public boolean equals(Object other) {
+   *       if (other == null) return false;
+   *       if (other == this) return true;
+   *       if (!(other instanceof Accum)) return false;
+   *
+   *
+   *       Accum o = (Accum)other;
+   *       if (this.sum != o.sum || this.count != o.count) {
+   *         return false;
+   *       } else {
+   *         return true;
+   *       }
+   *     }
    *   }
+   *
    *   public Accum createAccumulator() {
    *     return new Accum();
    *   }
@@ -289,6 +305,24 @@ public class Combine {
    * arbitrary tree structure. Commutativity is required because any order of the input values is
    * ignored when breaking up input values into groups.
    *
+   * <h3>Note on Data Encoding</h3>
+   *
+   * <p>Some form of data encoding is required when a CombineFn uses custom types that do not have
+   * well-known coders. The sample code above uses a custom accumulator that obtains its coder by
+   * implementing {@link java.io.Serializable}. This relies on the generic {@link
+   * org.apache.beam.sdk.coders.CoderProvider}, which can provide a coder for any {@link
+   * java.io.Serializable} type. Where {@link java.io.Serializable} is inefficient or
+   * inapplicable, there are two alternatives for encoding:
+   *
+   * <ul>
+   *   <li>Default {@link org.apache.beam.sdk.coders.CoderRegistry}. For example, implement a coder
+   *       class explicitly and use the {@code @DefaultCoder} annotation. See the {@link
+   *       org.apache.beam.sdk.coders.CoderRegistry} for the numerous ways in which to bind a type
+   *       to a coder.
+   *   <li>CombineFn-specific approach. When extending CombineFn, override both {@link
+   *       #getAccumulatorCoder} and {@link #getDefaultOutputCoder}.
+   * </ul>
+   *
    * @param <InputT> type of input values
    * @param <AccumT> type of mutable accumulator values
    * @param <OutputT> type of output values
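
Editorial sketch (not part of the commit): the patch makes Accum implement Serializable and adds a value-based equals(). The standalone example below suggests why the two go together. It uses only the JDK, not Beam, and assumes that a serialization-based coder behaves roughly like a plain Java serialization round trip. The class name AccumRoundTrip, the roundTrip helper, the serialVersionUID, and the hashCode() override are all hypothetical additions for illustration; none of them appear in the patch.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Objects;

class AccumRoundTrip {
  // Mirrors the Accum in the sample Javadoc. hashCode() and serialVersionUID
  // are additions made for this sketch; they are not in the patch.
  static class Accum implements Serializable {
    private static final long serialVersionUID = 1L;
    int sum = 0;
    int count = 0;

    @Override
    public boolean equals(Object other) {
      if (other == this) return true;
      if (!(other instanceof Accum)) return false;
      Accum o = (Accum) other;
      return sum == o.sum && count == o.count;
    }

    @Override
    public int hashCode() {
      // Keeps hashCode consistent with the value-based equals above.
      return Objects.hash(sum, count);
    }
  }

  // Serializes the accumulator to bytes and reads it back, standing in for
  // what a serialization-based coder is assumed to do between workers.
  static Accum roundTrip(Accum in) throws IOException, ClassNotFoundException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(in);
    }
    try (ObjectInputStream src =
        new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
      return (Accum) src.readObject();
    }
  }

  public static void main(String[] args) throws Exception {
    Accum original = new Accum();
    original.sum = 10;
    original.count = 4;

    Accum copy = roundTrip(original);
    System.out.println("same instance: " + (copy == original));
    System.out.println("equal by value: " + copy.equals(original));
  }
}
```

The decoded copy is a different object, so the default identity-based equals() would report it as unequal to the original. With the value-based override, the copy compares equal after the round trip.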