You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by sc...@apache.org on 2018/10/03 22:14:31 UTC
[beam] branch master updated: [BEAM-5427] Fix and update sample
code for CombineFn. (#6439)
This is an automated email from the ASF dual-hosted git repository.
scott pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new b34b180 [BEAM-5427] Fix and update sample code for CombineFn. (#6439)
b34b180 is described below
commit b34b18079efb699718bb2503757a6da8ef1f2433
Author: Ruoyun Huang <hu...@gmail.com>
AuthorDate: Wed Oct 3 15:14:23 2018 -0700
[BEAM-5427] Fix and update sample code for CombineFn. (#6439)
---
.../org/apache/beam/sdk/transforms/Combine.java | 36 +++++++++++++++++++++-
1 file changed, 35 insertions(+), 1 deletion(-)
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
index 2c65f94..20314fc 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
@@ -254,10 +254,26 @@ public class Combine {
*
* <pre>{@code
* public class AverageFn extends CombineFn<Integer, AverageFn.Accum, Double> {
- * public static class Accum {
+ * public static class Accum implements Serializable {
* int sum = 0;
* int count = 0;
+ *
+ * {@literal@}Override
+ * public boolean equals(Object other) {
+ * if (other == null) return false;
+ * if (other == this) return true;
+ * if (!(other instanceof Accum))return false;
+ *
+ *
+ * Accum o = (Accum)other;
+ * if (this.sum != o.sum || this.count != o.count) {
+ * return false;
+ * } else {
+ * return true;
+ * }
+ * }
* }
+ *
* public Accum createAccumulator() {
* return new Accum();
* }
@@ -289,6 +305,24 @@ public class Combine {
* arbitrary tree structure. Commutativity is required because any order of the input values is
* ignored when breaking up input values into groups.
*
+ * <h3>Note on Data Encoding</h3>
+ *
+ * <p>Some form of data encoding is required when using custom types in a CombineFn which do not
+ * have well-known coders. The sample code above uses a custom Accumulator which gets its coder by
+ * implementing {@link java.io.Serializable}. By doing this, we are relying on the generic {@link
+ * org.apache.beam.sdk.coders.CoderProvider}, which is able to provide a coder for any {@link
+ * java.io.Serializable} if applicable. In cases where {@link java.io.Serializable} is not
+ * efficient or is inapplicable, there are two general alternatives for encoding:
+ *
+ * <ul>
+ * <li>Default {@link org.apache.beam.sdk.coders.CoderRegistry}. For example, implement a coder
+ * class explicitly and use the {@code @DefaultCoder} annotation. See the {@link
+ * org.apache.beam.sdk.coders.CoderRegistry} for the numerous ways in which to bind a type
+ * to a coder.
+ * <li>CombineFn-specific way. While extending CombineFn, override both {@link
+ * #getAccumulatorCoder} and {@link #getDefaultOutputCoder}.
+ * </ul>
+ *
* @param <InputT> type of input values
* @param <AccumT> type of mutable accumulator values
* @param <OutputT> type of output values