You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/05/01 02:25:39 UTC
[3/6] beam git commit: Remove KeyedCombineFn
http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
index 5ffaef8..0be8517 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
@@ -43,12 +43,9 @@ import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.CombineFnBase.AbstractGlobalCombineFn;
-import org.apache.beam.sdk.transforms.CombineFnBase.AbstractPerKeyCombineFn;
import org.apache.beam.sdk.transforms.CombineFnBase.GlobalCombineFn;
-import org.apache.beam.sdk.transforms.CombineFnBase.PerKeyCombineFn;
import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext;
import org.apache.beam.sdk.transforms.CombineWithContext.Context;
-import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
import org.apache.beam.sdk.transforms.CombineWithContext.RequiresContextInternal;
import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
import org.apache.beam.sdk.transforms.display.DisplayData;
@@ -155,7 +152,7 @@ public class Combine {
*/
public static <K, V> PerKey<K, V, V> perKey(
SerializableFunction<Iterable<V>, V> fn) {
- return perKey(IterableCombineFn.of(fn).<K>asKeyedFn(), displayDataForFn(fn));
+ return perKey(IterableCombineFn.of(fn), displayDataForFn(fn));
}
/**
@@ -176,32 +173,11 @@ public class Combine {
*/
public static <K, InputT, OutputT> PerKey<K, InputT, OutputT> perKey(
GlobalCombineFn<? super InputT, ?, OutputT> fn) {
- return perKey(fn.<K>asKeyedFn(), displayDataForFn(fn));
- }
-
- /**
- * Returns a {@link PerKey Combine.PerKey} {@code PTransform} that
- * first groups its input {@code PCollection} of {@code KV}s by keys and
- * windows, then invokes the given function on each of the key/values-lists
- * pairs to produce a combined value, and then returns a
- * {@code PCollection} of {@code KV}s mapping each distinct key to
- * its combined value for each window.
- *
- * <p>Each output element is in the window by which its corresponding input
- * was grouped, and has the timestamp of the end of that window. The output
- * {@code PCollection} has the same
- * {@link org.apache.beam.sdk.transforms.windowing.WindowFn}
- * as the input.
- *
- * <p>See {@link PerKey Combine.PerKey} for more information.
- */
- public static <K, InputT, OutputT> PerKey<K, InputT, OutputT> perKey(
- PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn) {
return perKey(fn, displayDataForFn(fn));
}
private static <K, InputT, OutputT> PerKey<K, InputT, OutputT> perKey(
- PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn,
+ GlobalCombineFn<? super InputT, ?, OutputT> fn,
DisplayData.ItemSpec<? extends Class<?>> fnDisplayData) {
return new PerKey<>(fn, fnDisplayData, false /*fewKeys*/);
}
@@ -211,7 +187,7 @@ public class Combine {
* in {@link GroupByKey}.
*/
private static <K, InputT, OutputT> PerKey<K, InputT, OutputT> fewKeys(
- PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn,
+ GlobalCombineFn<? super InputT, ?, OutputT> fn,
DisplayData.ItemSpec<? extends Class<?>> fnDisplayData) {
return new PerKey<>(fn, fnDisplayData, true /*fewKeys*/);
}
@@ -239,7 +215,7 @@ public class Combine {
*/
public static <K, V> GroupedValues<K, V, V> groupedValues(
SerializableFunction<Iterable<V>, V> fn) {
- return groupedValues(IterableCombineFn.of(fn).<K>asKeyedFn(), displayDataForFn(fn));
+ return groupedValues(IterableCombineFn.of(fn), displayDataForFn(fn));
}
/**
@@ -265,37 +241,11 @@ public class Combine {
*/
public static <K, InputT, OutputT> GroupedValues<K, InputT, OutputT> groupedValues(
GlobalCombineFn<? super InputT, ?, OutputT> fn) {
- return groupedValues(fn.<K>asKeyedFn(), displayDataForFn(fn));
- }
-
- /**
- * Returns a {@link GroupedValues Combine.GroupedValues}
- * {@code PTransform} that takes a {@code PCollection} of
- * {@code KV}s where a key maps to an {@code Iterable} of values, e.g.,
- * the result of a {@code GroupByKey}, then uses the given
- * {@code KeyedCombineFn} to combine all the values associated with
- * each key. The combining function is provided the key. The types
- * of the input and output values can differ.
- *
- * <p>Each output element has the same timestamp and is in the same window
- * as its corresponding input element, and the output
- * {@code PCollection} has the same
- * {@link org.apache.beam.sdk.transforms.windowing.WindowFn}
- * associated with it as the input.
- *
- * <p>See {@link GroupedValues Combine.GroupedValues} for more information.
- *
- * <p>Note that {@link #perKey(CombineFnBase.PerKeyCombineFn)} is typically
- * more convenient to use than {@link GroupByKey} followed by
- * {@code groupedValues(...)}.
- */
- public static <K, InputT, OutputT> GroupedValues<K, InputT, OutputT> groupedValues(
- PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn) {
return groupedValues(fn, displayDataForFn(fn));
}
private static <K, InputT, OutputT> GroupedValues<K, InputT, OutputT> groupedValues(
- PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn,
+ GlobalCombineFn<? super InputT, ?, OutputT> fn,
DisplayData.ItemSpec<? extends Class<?>> fnDisplayData) {
return new GroupedValues<>(fn, fnDisplayData);
}
@@ -471,81 +421,8 @@ public class Combine {
public TypeDescriptor<OutputT> getOutputType() {
return new TypeDescriptor<OutputT>(getClass()) {};
}
-
- @SuppressWarnings({"unchecked", "rawtypes"})
- @Override
- public <K> KeyedCombineFn<K, InputT, AccumT, OutputT> asKeyedFn() {
- // The key, an object, is never even looked at.
- return new KeyIgnoringCombineFn<>(this);
- }
-
- private static class KeyIgnoringCombineFn<K, InputT, AccumT, OutputT>
- extends KeyedCombineFn<K, InputT, AccumT, OutputT>
- implements NameOverride {
-
- private final CombineFn<InputT, AccumT, OutputT> fn;
-
- private KeyIgnoringCombineFn(CombineFn<InputT, AccumT, OutputT> fn) {
- this.fn = fn;
- }
-
- @Override
- public AccumT createAccumulator(K key) {
- return fn.createAccumulator();
- }
-
- @Override
- public AccumT addInput(K key, AccumT accumulator, InputT input) {
- return fn.addInput(accumulator, input);
- }
-
- @Override
- public AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators) {
- return fn.mergeAccumulators(accumulators);
- }
-
- @Override
- public OutputT extractOutput(K key, AccumT accumulator) {
- return fn.extractOutput(accumulator);
- }
-
- @Override
- public AccumT compact(K key, AccumT accumulator) {
- return fn.compact(accumulator);
- }
-
- @Override
- public Coder<AccumT> getAccumulatorCoder(
- CoderRegistry registry, Coder<K> keyCoder, Coder<InputT> inputCoder)
- throws CannotProvideCoderException {
- return fn.getAccumulatorCoder(registry, inputCoder);
- }
-
- @Override
- public Coder<OutputT> getDefaultOutputCoder(
- CoderRegistry registry, Coder<K> keyCoder, Coder<InputT> inputCoder)
- throws CannotProvideCoderException {
- return fn.getDefaultOutputCoder(registry, inputCoder);
- }
-
- @Override
- public CombineFn<InputT, AccumT, OutputT> forKey(K key, Coder<K> keyCoder) {
- return fn;
- }
-
- @Override
- public void populateDisplayData(Builder builder) {
- builder.delegate(fn);
- }
-
- @Override
- public String getNameOverride() {
- return NameUtils.approximateSimpleName(fn);
- }
- }
}
-
/////////////////////////////////////////////////////////////////////////////
/**
@@ -621,7 +498,6 @@ public class Combine {
public Coder<V> getDefaultOutputCoder(CoderRegistry registry, Coder<V> inputCoder) {
return inputCoder;
}
-
}
/**
@@ -1083,215 +959,6 @@ public class Combine {
/////////////////////////////////////////////////////////////////////////////
-
- /**
- * A {@code KeyedCombineFn<K, InputT, AccumT, OutputT>} specifies how to combine
- * a collection of input values of type {@code InputT}, associated with
- * a key of type {@code K}, into a single output value of type
- * {@code OutputT}. It does this via one or more intermediate mutable
- * accumulator values of type {@code AccumT}.
- *
- * <p>The overall process to combine a collection of input
- * {@code InputT} values associated with an input {@code K} key into a
- * single output {@code OutputT} value is as follows:
- *
- * <ol>
- *
- * <li> The input {@code InputT} values are partitioned into one or more
- * batches.
- *
- * <li> For each batch, the {@link #createAccumulator} operation is
- * invoked to create a fresh mutable accumulator value of type
- * {@code AccumT}, initialized to represent the combination of zero
- * values.
- *
- * <li> For each input {@code InputT} value in a batch, the
- * {@link #addInput} operation is invoked to add the value to that
- * batch's accumulator {@code AccumT} value. The accumulator may just
- * record the new value (e.g., if {@code AccumT == List<InputT>}, or may do
- * work to represent the combination more compactly.
- *
- * <li> The {@link #mergeAccumulators} operation is invoked to
- * combine a collection of accumulator {@code AccumT} values into a
- * single combined output accumulator {@code AccumT} value, once the
- * merging accumulators have had all all the input values in their
- * batches added to them. This operation is invoked repeatedly,
- * until there is only one accumulator value left.
- *
- * <li> The {@link #extractOutput} operation is invoked on the final
- * accumulator {@code AccumT} value to get the output {@code OutputT} value.
- *
- * </ol>
- *
- * <p>All of these operations are passed the {@code K} key that the
- * values being combined are associated with.
- *
- * <p>For example:
- * <pre> {@code
- * public class ConcatFn
- * extends KeyedCombineFn<String, Integer, ConcatFn.Accum, String> {
- * public static class Accum {
- * String s = "";
- * }
- * public Accum createAccumulator(String key) {
- * return new Accum();
- * }
- * public Accum addInput(String key, Accum accum, Integer input) {
- * accum.s += "+" + input;
- * return accum;
- * }
- * public Accum mergeAccumulators(String key, Iterable<Accum> accums) {
- * Accum merged = new Accum();
- * for (Accum accum : accums) {
- * merged.s += accum.s;
- * }
- * return merged;
- * }
- * public String extractOutput(String key, Accum accum) {
- * return key + accum.s;
- * }
- * }
- * PCollection<KV<String, Integer>> pc = ...;
- * PCollection<KV<String, String>> pc2 = pc.apply(
- * Combine.perKey(new ConcatFn()));
- * } </pre>
- *
- * <p>Keyed combining functions used by {@link Combine.PerKey},
- * {@link Combine.GroupedValues}, and {@code PTransforms} derived
- * from them should be <i>associative</i> and <i>commutative</i>.
- * Associativity is required because input values are first broken
- * up into subgroups before being combined, and their intermediate
- * results further combined, in an arbitrary tree structure.
- * Commutativity is required because any order of the input values
- * is ignored when breaking up input values into groups.
- *
- * @param <K> type of keys
- * @param <InputT> type of input values
- * @param <AccumT> type of mutable accumulator values
- * @param <OutputT> type of output values
- */
- public abstract static class KeyedCombineFn<K, InputT, AccumT, OutputT>
- extends AbstractPerKeyCombineFn<K, InputT, AccumT, OutputT> {
- /**
- * Returns a new, mutable accumulator value representing the accumulation of zero input values.
- *
- * @param key the key that all the accumulated values using the
- * accumulator are associated with
- */
- public abstract AccumT createAccumulator(K key);
-
- /**
- * Adds the given input value to the given accumulator, returning the new accumulator value.
- *
- * <p>For efficiency, the input accumulator may be modified and returned.
- *
- * @param key the key that all the accumulated values using the
- * accumulator are associated with
- */
- public abstract AccumT addInput(K key, AccumT accumulator, InputT value);
-
- /**
- * Returns an accumulator representing the accumulation of all the
- * input values accumulated in the merging accumulators.
- *
- * <p>May modify any of the argument accumulators. May return a
- * fresh accumulator, or may return one of the (modified) argument
- * accumulators.
- *
- * @param key the key that all the accumulators are associated
- * with
- */
- public abstract AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators);
-
- /**
- * Returns the output value that is the result of combining all
- * the input values represented by the given accumulator.
- *
- * @param key the key that all the accumulated values using the
- * accumulator are associated with
- */
- public abstract OutputT extractOutput(K key, AccumT accumulator);
-
- /**
- * Returns an accumulator that represents the same logical value as the
- * input accumulator, but may have a more compact representation.
- *
- * <p>For most CombineFns this would be a no-op, but should be overridden
- * by CombineFns that (for example) buffer up elements and combine
- * them in batches.
- *
- * <p>For efficiency, the input accumulator may be modified and returned.
- *
- * <p>By default returns the original accumulator.
- */
- public AccumT compact(K key, AccumT accumulator) {
- return accumulator;
- }
-
- @Override
- public CombineFn<InputT, AccumT, OutputT> forKey(final K key, final Coder<K> keyCoder) {
- return new CombineFn<InputT, AccumT, OutputT>() {
-
- @Override
- public AccumT createAccumulator() {
- return KeyedCombineFn.this.createAccumulator(key);
- }
-
- @Override
- public AccumT addInput(AccumT accumulator, InputT input) {
- return KeyedCombineFn.this.addInput(key, accumulator, input);
- }
-
- @Override
- public AccumT mergeAccumulators(Iterable<AccumT> accumulators) {
- return KeyedCombineFn.this.mergeAccumulators(key, accumulators);
- }
-
- @Override
- public OutputT extractOutput(AccumT accumulator) {
- return KeyedCombineFn.this.extractOutput(key, accumulator);
- }
-
- @Override
- public AccumT compact(AccumT accumulator) {
- return KeyedCombineFn.this.compact(key, accumulator);
- }
-
- @Override
- public Coder<AccumT> getAccumulatorCoder(CoderRegistry registry, Coder<InputT> inputCoder)
- throws CannotProvideCoderException {
- return KeyedCombineFn.this.getAccumulatorCoder(registry, keyCoder, inputCoder);
- }
-
- @Override
- public Coder<OutputT> getDefaultOutputCoder(
- CoderRegistry registry, Coder<InputT> inputCoder) throws CannotProvideCoderException {
- return KeyedCombineFn.this.getDefaultOutputCoder(registry, keyCoder, inputCoder);
- }
-
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {
- builder.delegate(KeyedCombineFn.this);
- }
- };
- }
-
- /**
- * Applies this {@code KeyedCombineFn} to a key and a collection
- * of input values to produce a combined output value.
- *
- * <p>Useful when testing the behavior of a {@code KeyedCombineFn}
- * separately from a {@code Combine} transform.
- */
- public OutputT apply(K key, Iterable<? extends InputT> inputs) {
- AccumT accum = createAccumulator(key);
- for (InputT input : inputs) {
- accum = addInput(key, accum, input);
- }
- return extractOutput(key, accum);
- }
- }
-
////////////////////////////////////////////////////////////////////////////
/**
@@ -1458,8 +1125,7 @@ public class Combine {
.apply(WithKeys.<Void, InputT>of((Void) null))
.setCoder(KvCoder.of(VoidCoder.of(), input.getCoder()));
- Combine.PerKey<Void, InputT, OutputT> combine =
- Combine.fewKeys(fn.asKeyedFn(), fnDisplayData);
+ Combine.PerKey<Void, InputT, OutputT> combine = Combine.fewKeys(fn, fnDisplayData);
if (!sideInputs.isEmpty()) {
combine = combine.withSideInputs(sideInputs);
}
@@ -1788,13 +1454,13 @@ public class Combine {
public static class PerKey<K, InputT, OutputT>
extends PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>> {
- private final PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn;
+ private final GlobalCombineFn<? super InputT, ?, OutputT> fn;
private final DisplayData.ItemSpec<? extends Class<?>> fnDisplayData;
private final boolean fewKeys;
private final List<PCollectionView<?>> sideInputs;
private PerKey(
- PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn,
+ GlobalCombineFn<? super InputT, ?, OutputT> fn,
DisplayData.ItemSpec<? extends Class<?>> fnDisplayData, boolean fewKeys) {
this.fn = fn;
this.fnDisplayData = fnDisplayData;
@@ -1803,7 +1469,7 @@ public class Combine {
}
private PerKey(
- PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn,
+ GlobalCombineFn<? super InputT, ?, OutputT> fn,
DisplayData.ItemSpec<? extends Class<?>> fnDisplayData,
boolean fewKeys, List<PCollectionView<?>> sideInputs) {
this.fn = fn;
@@ -1819,7 +1485,7 @@ public class Combine {
/**
* Returns a {@link PTransform} identical to this, but with the specified side inputs to use
- * in {@link KeyedCombineFnWithContext}.
+ * in {@link CombineFnWithContext}.
*/
public PerKey<K, InputT, OutputT> withSideInputs(PCollectionView<?>... sideInputs) {
return withSideInputs(Arrays.asList(sideInputs));
@@ -1827,7 +1493,7 @@ public class Combine {
/**
* Returns a {@link PTransform} identical to this, but with the specified side inputs to use
- * in {@link KeyedCombineFnWithContext}.
+ * in {@link CombineFnWithContext}.
*/
public PerKey<K, InputT, OutputT> withSideInputs(
Iterable<? extends PCollectionView<?>> sideInputs) {
@@ -1874,9 +1540,9 @@ public class Combine {
}
/**
- * Returns the {@link PerKeyCombineFn} used by this Combine operation.
+ * Returns the {@link GlobalCombineFn} used by this Combine operation.
*/
- public PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> getFn() {
+ public GlobalCombineFn<? super InputT, ?, OutputT> getFn() {
return fn;
}
@@ -1924,12 +1590,12 @@ public class Combine {
public static class PerKeyWithHotKeyFanout<K, InputT, OutputT>
extends PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>> {
- private final PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn;
+ private final GlobalCombineFn<? super InputT, ?, OutputT> fn;
private final DisplayData.ItemSpec<? extends Class<?>> fnDisplayData;
private final SerializableFunction<? super K, Integer> hotKeyFanout;
private PerKeyWithHotKeyFanout(
- PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn,
+ GlobalCombineFn<? super InputT, ?, OutputT> fn,
DisplayData.ItemSpec<? extends Class<?>> fnDisplayData,
SerializableFunction<? super K, Integer> hotKeyFanout) {
this.fn = fn;
@@ -1951,8 +1617,8 @@ public class Combine {
// Name the accumulator type.
@SuppressWarnings("unchecked")
- final PerKeyCombineFn<K, InputT, AccumT, OutputT> typedFn =
- (PerKeyCombineFn<K, InputT, AccumT, OutputT>) this.fn;
+ final GlobalCombineFn<InputT, AccumT, OutputT> typedFn =
+ (GlobalCombineFn<InputT, AccumT, OutputT>) this.fn;
if (!(input.getCoder() instanceof KvCoder)) {
throw new IllegalStateException(
@@ -1966,7 +1632,7 @@ public class Combine {
try {
accumCoder = typedFn.getAccumulatorCoder(
input.getPipeline().getCoderRegistry(),
- inputCoder.getKeyCoder(), inputCoder.getValueCoder());
+ inputCoder.getValueCoder());
} catch (CannotProvideCoderException e) {
throw new IllegalStateException("Unable to determine accumulator coder.", e);
}
@@ -1979,38 +1645,37 @@ public class Combine {
// set of values, then drop the nonce and do a final combine of the
// aggregates. We do this by splitting the original CombineFn into two,
// one that does addInput + merge and another that does merge + extract.
- PerKeyCombineFn<KV<K, Integer>, InputT, AccumT, AccumT> hotPreCombine;
- PerKeyCombineFn<K, InputOrAccum<InputT, AccumT>, AccumT, OutputT> postCombine;
- if (typedFn instanceof KeyedCombineFn) {
- final KeyedCombineFn<K, InputT, AccumT, OutputT> keyedFn =
- (KeyedCombineFn<K, InputT, AccumT, OutputT>) typedFn;
+ GlobalCombineFn<InputT, AccumT, AccumT> hotPreCombine;
+ GlobalCombineFn<InputOrAccum<InputT, AccumT>, AccumT, OutputT> postCombine;
+ if (typedFn instanceof CombineFn) {
+ final CombineFn<InputT, AccumT, OutputT> fn =
+ (CombineFn<InputT, AccumT, OutputT>) typedFn;
hotPreCombine =
- new KeyedCombineFn<KV<K, Integer>, InputT, AccumT, AccumT>() {
+ new CombineFn<InputT, AccumT, AccumT>() {
@Override
- public AccumT createAccumulator(KV<K, Integer> key) {
- return keyedFn.createAccumulator(key.getKey());
+ public AccumT createAccumulator() {
+ return fn.createAccumulator();
}
@Override
- public AccumT addInput(KV<K, Integer> key, AccumT accumulator, InputT value) {
- return keyedFn.addInput(key.getKey(), accumulator, value);
+ public AccumT addInput(AccumT accumulator, InputT value) {
+ return fn.addInput(accumulator, value);
}
@Override
- public AccumT mergeAccumulators(
- KV<K, Integer> key, Iterable<AccumT> accumulators) {
- return keyedFn.mergeAccumulators(key.getKey(), accumulators);
+ public AccumT mergeAccumulators(Iterable<AccumT> accumulators) {
+ return fn.mergeAccumulators(accumulators);
}
@Override
- public AccumT compact(KV<K, Integer> key, AccumT accumulator) {
- return keyedFn.compact(key.getKey(), accumulator);
+ public AccumT compact(AccumT accumulator) {
+ return fn.compact(accumulator);
}
@Override
- public AccumT extractOutput(KV<K, Integer> key, AccumT accumulator) {
+ public AccumT extractOutput(AccumT accumulator) {
return accumulator;
}
@Override
@SuppressWarnings("unchecked")
public Coder<AccumT> getAccumulatorCoder(
- CoderRegistry registry, Coder<KV<K, Integer>> keyCoder, Coder<InputT> inputCoder)
+ CoderRegistry registry, Coder<InputT> inputCoder)
throws CannotProvideCoderException {
return accumCoder;
}
@@ -2020,142 +1685,147 @@ public class Combine {
builder.delegate(PerKeyWithHotKeyFanout.this);
}
};
+
postCombine =
- new KeyedCombineFn<K, InputOrAccum<InputT, AccumT>, AccumT, OutputT>() {
+ new CombineFn<InputOrAccum<InputT, AccumT>, AccumT, OutputT>() {
@Override
- public AccumT createAccumulator(K key) {
- return keyedFn.createAccumulator(key);
+ public AccumT createAccumulator() {
+ return fn.createAccumulator();
}
+
@Override
- public AccumT addInput(
- K key, AccumT accumulator, InputOrAccum<InputT, AccumT> value) {
+ public AccumT addInput(AccumT accumulator, InputOrAccum<InputT, AccumT> value) {
if (value.accum == null) {
- return keyedFn.addInput(key, accumulator, value.input);
+ return fn.addInput(accumulator, value.input);
} else {
- return keyedFn.mergeAccumulators(key, ImmutableList.of(accumulator, value.accum));
+ return fn.mergeAccumulators(ImmutableList.of(accumulator, value.accum));
}
}
+
@Override
- public AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators) {
- return keyedFn.mergeAccumulators(key, accumulators);
+ public AccumT mergeAccumulators(Iterable<AccumT> accumulators) {
+ return fn.mergeAccumulators(accumulators);
}
+
@Override
- public AccumT compact(K key, AccumT accumulator) {
- return keyedFn.compact(key, accumulator);
+ public AccumT compact(AccumT accumulator) {
+ return fn.compact(accumulator);
}
+
@Override
- public OutputT extractOutput(K key, AccumT accumulator) {
- return keyedFn.extractOutput(key, accumulator);
+ public OutputT extractOutput(AccumT accumulator) {
+ return fn.extractOutput(accumulator);
}
+
@Override
public Coder<OutputT> getDefaultOutputCoder(
- CoderRegistry registry,
- Coder<K> keyCoder,
- Coder<InputOrAccum<InputT, AccumT>> accumulatorCoder)
+ CoderRegistry registry, Coder<InputOrAccum<InputT, AccumT>> accumulatorCoder)
throws CannotProvideCoderException {
- return keyedFn.getDefaultOutputCoder(
- registry, keyCoder, inputCoder.getValueCoder());
+ return fn.getDefaultOutputCoder(registry, inputCoder.getValueCoder());
}
@Override
- public Coder<AccumT> getAccumulatorCoder(CoderRegistry registry, Coder<K> keyCoder,
- Coder<InputOrAccum<InputT, AccumT>> inputCoder)
- throws CannotProvideCoderException {
+ public Coder<AccumT> getAccumulatorCoder(
+ CoderRegistry registry, Coder<InputOrAccum<InputT, AccumT>> inputCoder)
+ throws CannotProvideCoderException {
return accumCoder;
}
+
@Override
public void populateDisplayData(DisplayData.Builder builder) {
builder.delegate(PerKeyWithHotKeyFanout.this);
}
};
- } else if (typedFn instanceof KeyedCombineFnWithContext) {
- final KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> keyedFnWithContext =
- (KeyedCombineFnWithContext<K, InputT, AccumT, OutputT>) typedFn;
+ } else if (typedFn instanceof CombineFnWithContext) {
+ final CombineFnWithContext<InputT, AccumT, OutputT> fnWithContext =
+ (CombineFnWithContext<InputT, AccumT, OutputT>) typedFn;
hotPreCombine =
- new KeyedCombineFnWithContext<KV<K, Integer>, InputT, AccumT, AccumT>() {
+ new CombineFnWithContext<InputT, AccumT, AccumT>() {
@Override
- public AccumT createAccumulator(KV<K, Integer> key, Context c) {
- return keyedFnWithContext.createAccumulator(key.getKey(), c);
+ public AccumT createAccumulator(Context c) {
+ return fnWithContext.createAccumulator(c);
}
@Override
- public AccumT addInput(
- KV<K, Integer> key, AccumT accumulator, InputT value, Context c) {
- return keyedFnWithContext.addInput(key.getKey(), accumulator, value, c);
+ public AccumT addInput(AccumT accumulator, InputT value, Context c) {
+ return fnWithContext.addInput(accumulator, value, c);
}
@Override
- public AccumT mergeAccumulators(
- KV<K, Integer> key, Iterable<AccumT> accumulators, Context c) {
- return keyedFnWithContext.mergeAccumulators(key.getKey(), accumulators, c);
+ public AccumT mergeAccumulators(Iterable<AccumT> accumulators, Context c) {
+ return fnWithContext.mergeAccumulators(accumulators, c);
}
@Override
- public AccumT compact(KV<K, Integer> key, AccumT accumulator, Context c) {
- return keyedFnWithContext.compact(key.getKey(), accumulator, c);
+ public AccumT compact(AccumT accumulator, Context c) {
+ return fnWithContext.compact(accumulator, c);
}
@Override
- public AccumT extractOutput(KV<K, Integer> key, AccumT accumulator, Context c) {
+ public AccumT extractOutput(AccumT accumulator, Context c) {
return accumulator;
}
@Override
@SuppressWarnings("unchecked")
public Coder<AccumT> getAccumulatorCoder(
- CoderRegistry registry, Coder<KV<K, Integer>> keyCoder, Coder<InputT> inputCoder)
+ CoderRegistry registry, Coder<InputT> inputCoder)
throws CannotProvideCoderException {
return accumCoder;
}
+
@Override
public void populateDisplayData(DisplayData.Builder builder) {
builder.delegate(PerKeyWithHotKeyFanout.this);
}
};
postCombine =
- new KeyedCombineFnWithContext<K, InputOrAccum<InputT, AccumT>, AccumT, OutputT>() {
+ new CombineFnWithContext<InputOrAccum<InputT, AccumT>, AccumT, OutputT>() {
@Override
- public AccumT createAccumulator(K key, Context c) {
- return keyedFnWithContext.createAccumulator(key, c);
+ public AccumT createAccumulator(Context c) {
+ return fnWithContext.createAccumulator(c);
}
+
@Override
public AccumT addInput(
- K key, AccumT accumulator, InputOrAccum<InputT, AccumT> value, Context c) {
+ AccumT accumulator, InputOrAccum<InputT, AccumT> value, Context c) {
if (value.accum == null) {
- return keyedFnWithContext.addInput(key, accumulator, value.input, c);
+ return fnWithContext.addInput(accumulator, value.input, c);
} else {
- return keyedFnWithContext.mergeAccumulators(
- key, ImmutableList.of(accumulator, value.accum), c);
+ return fnWithContext.mergeAccumulators(
+ ImmutableList.of(accumulator, value.accum), c);
}
}
+
@Override
- public AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators, Context c) {
- return keyedFnWithContext.mergeAccumulators(key, accumulators, c);
+ public AccumT mergeAccumulators(Iterable<AccumT> accumulators, Context c) {
+ return fnWithContext.mergeAccumulators(accumulators, c);
}
+
@Override
- public AccumT compact(K key, AccumT accumulator, Context c) {
- return keyedFnWithContext.compact(key, accumulator, c);
+ public AccumT compact(AccumT accumulator, Context c) {
+ return fnWithContext.compact(accumulator, c);
}
+
@Override
- public OutputT extractOutput(K key, AccumT accumulator, Context c) {
- return keyedFnWithContext.extractOutput(key, accumulator, c);
+ public OutputT extractOutput(AccumT accumulator, Context c) {
+ return fnWithContext.extractOutput(accumulator, c);
}
+
@Override
public Coder<OutputT> getDefaultOutputCoder(
- CoderRegistry registry,
- Coder<K> keyCoder,
- Coder<InputOrAccum<InputT, AccumT>> accumulatorCoder)
+ CoderRegistry registry, Coder<InputOrAccum<InputT, AccumT>> accumulatorCoder)
throws CannotProvideCoderException {
- return keyedFnWithContext.getDefaultOutputCoder(
- registry, keyCoder, inputCoder.getValueCoder());
+ return fnWithContext.getDefaultOutputCoder(registry, inputCoder.getValueCoder());
}
@Override
- public Coder<AccumT> getAccumulatorCoder(CoderRegistry registry, Coder<K> keyCoder,
- Coder<InputOrAccum<InputT, AccumT>> inputCoder)
+ public Coder<AccumT> getAccumulatorCoder(
+ CoderRegistry registry, Coder<InputOrAccum<InputT, AccumT>> inputCoder)
throws CannotProvideCoderException {
return accumCoder;
}
+
@Override
public void populateDisplayData(DisplayData.Builder builder) {
builder.delegate(PerKeyWithHotKeyFanout.this);
@@ -2202,25 +1872,33 @@ public class Combine {
}
// Combine the hot and cold keys separately.
- PCollection<KV<K, InputOrAccum<InputT, AccumT>>> precombinedHot = split
- .get(hot)
- .setCoder(KvCoder.of(KvCoder.of(inputCoder.getKeyCoder(), VarIntCoder.of()),
- inputCoder.getValueCoder()))
- .setWindowingStrategyInternal(preCombineStrategy)
- .apply("PreCombineHot", Combine.perKey(hotPreCombine, fnDisplayData))
- .apply("StripNonce", MapElements.via(
- new SimpleFunction<KV<KV<K, Integer>, AccumT>,
- KV<K, InputOrAccum<InputT, AccumT>>>() {
- @Override
- public KV<K, InputOrAccum<InputT, AccumT>> apply(KV<KV<K, Integer>, AccumT> elem) {
- return KV.of(
- elem.getKey().getKey(),
- InputOrAccum.<InputT, AccumT>accum(elem.getValue()));
- }
- }))
- .setCoder(KvCoder.of(inputCoder.getKeyCoder(), inputOrAccumCoder))
- .apply(Window.<KV<K, InputOrAccum<InputT, AccumT>>>remerge())
- .setWindowingStrategyInternal(input.getWindowingStrategy());
+ PCollection<KV<K, InputOrAccum<InputT, AccumT>>> precombinedHot =
+ split
+ .get(hot)
+ .setCoder(
+ KvCoder.of(
+ KvCoder.of(inputCoder.getKeyCoder(), VarIntCoder.of()),
+ inputCoder.getValueCoder()))
+ .setWindowingStrategyInternal(preCombineStrategy)
+ .apply(
+ "PreCombineHot",
+ Combine.<KV<K, Integer>, InputT, AccumT>perKey(hotPreCombine, fnDisplayData))
+ .apply(
+ "StripNonce",
+ MapElements.via(
+ new SimpleFunction<
+ KV<KV<K, Integer>, AccumT>, KV<K, InputOrAccum<InputT, AccumT>>>() {
+ @Override
+ public KV<K, InputOrAccum<InputT, AccumT>> apply(
+ KV<KV<K, Integer>, AccumT> elem) {
+ return KV.of(
+ elem.getKey().getKey(),
+ InputOrAccum.<InputT, AccumT>accum(elem.getValue()));
+ }
+ }))
+ .setCoder(KvCoder.of(inputCoder.getKeyCoder(), inputOrAccumCoder))
+ .apply(Window.<KV<K, InputOrAccum<InputT, AccumT>>>remerge())
+ .setWindowingStrategyInternal(input.getWindowingStrategy());
PCollection<KV<K, InputOrAccum<InputT, AccumT>>> preprocessedCold = split
.get(cold)
.setCoder(inputCoder)
@@ -2235,9 +1913,12 @@ public class Combine {
.setCoder(KvCoder.of(inputCoder.getKeyCoder(), inputOrAccumCoder));
// Combine the union of the pre-processed hot and cold key results.
- return PCollectionList.of(precombinedHot).and(preprocessedCold)
+ return PCollectionList.of(precombinedHot)
+ .and(preprocessedCold)
.apply(Flatten.<KV<K, InputOrAccum<InputT, AccumT>>>pCollections())
- .apply("PostCombine", Combine.perKey(postCombine, fnDisplayData));
+ .apply(
+ "PostCombine",
+ Combine.<K, InputOrAccum<InputT, AccumT>, OutputT>perKey(postCombine, fnDisplayData));
}
@Override
@@ -2325,71 +2006,61 @@ public class Combine {
/////////////////////////////////////////////////////////////////////////////
/**
- * {@code GroupedValues<K, InputT, OutputT>} takes a
- * {@code PCollection<KV<K, Iterable<InputT>>>}, such as the result of
- * {@link GroupByKey}, applies a specified
- * {@link KeyedCombineFn KeyedCombineFn<K, InputT, AccumT, OutputT>}
- * to each of the input {@code KV<K, Iterable<InputT>>} elements to
- * produce a combined output {@code KV<K, OutputT>} element, and returns a
- * {@code PCollection<KV<K, OutputT>>} containing all the combined output
- * elements. It is common for {@code InputT == OutputT}, but not required.
- * Common combining functions include sums, mins, maxes, and averages
- * of numbers, conjunctions and disjunctions of booleans, statistical
- * aggregations, etc.
+ * {@code GroupedValues<K, InputT, OutputT>} takes a {@code PCollection<KV<K, Iterable<InputT>>>},
+ * such as the result of {@link GroupByKey}, applies a specified {@link CombineFn
+ * CombineFn<InputT, AccumT, OutputT>} to each of the input {@code KV<K,
+ * Iterable<InputT>>} elements to produce a combined output {@code KV<K, OutputT>} element, and
+ * returns a {@code PCollection<KV<K, OutputT>>} containing all the combined output elements. It
+ * is common for {@code InputT == OutputT}, but not required. Common combining functions include
+ * sums, mins, maxes, and averages of numbers, conjunctions and disjunctions of booleans,
+ * statistical aggregations, etc.
*
* <p>Example of use:
- * <pre> {@code
+ *
+ * <pre>{@code
* PCollection<KV<String, Integer>> pc = ...;
* PCollection<KV<String, Iterable<Integer>>> groupedByKey = pc.apply(
* new GroupByKey<String, Integer>());
* PCollection<KV<String, Integer>> sumByKey = groupedByKey.apply(
* Combine.<String, Integer>groupedValues(
* new Sum.SumIntegerFn()));
- * } </pre>
+ * }
+ * </pre>
*
- * <p>See also {@link #perKey}/{@link PerKey Combine.PerKey}, which
- * captures the common pattern of "combining by key" in a
- * single easy-to-use {@code PTransform}.
+ * <p>See also {@link #perKey}/{@link PerKey Combine.PerKey}, which captures the common pattern of
+ * "combining by key" in a single easy-to-use {@code PTransform}.
*
- * <p>Combining for different keys can happen in parallel. Moreover,
- * combining of the {@code Iterable<InputT>} values associated a single
- * key can happen in parallel, with different subsets of the values
- * being combined separately, and their intermediate results combined
- * further, in an arbitrary tree reduction pattern, until a single
- * result value is produced for each key.
+ * <p>Combining for different keys can happen in parallel. Moreover, combining of the {@code
+ * Iterable<InputT>} values associated a single key can happen in parallel, with different subsets
+ * of the values being combined separately, and their intermediate results combined further, in an
+ * arbitrary tree reduction pattern, until a single result value is produced for each key.
*
- * <p>By default, the {@code Coder} of the keys of the output
- * {@code PCollection<KV<K, OutputT>>} is that of the keys of the input
- * {@code PCollection<KV<K, InputT>>}, and the {@code Coder} of the values
- * of the output {@code PCollection<KV<K, OutputT>>} is inferred from the
- * concrete type of the {@code KeyedCombineFn<K, InputT, AccumT, OutputT>}'s output
- * type {@code OutputT}.
+ * <p>By default, the {@code Coder} of the keys of the output {@code PCollection<KV<K, OutputT>>}
+ * is that of the keys of the input {@code PCollection<KV<K, InputT>>}, and the {@code Coder} of
+ * the values of the output {@code PCollection<KV<K, OutputT>>} is inferred from the concrete type
+ * of the {@code CombineFn<InputT, AccumT, OutputT>}'s output type {@code OutputT}.
*
- * <p>Each output element has the same timestamp and is in the same window
- * as its corresponding input element, and the output
- * {@code PCollection} has the same
- * {@link org.apache.beam.sdk.transforms.windowing.WindowFn}
- * associated with it as the input.
+ * <p>Each output element has the same timestamp and is in the same window as its corresponding
+ * input element, and the output {@code PCollection} has the same {@link
+ * org.apache.beam.sdk.transforms.windowing.WindowFn} associated with it as the input.
*
- * <p>See also {@link #globally}/{@link Globally Combine.Globally}, which
- * combines all the values in a {@code PCollection} into a
- * single value in a {@code PCollection}.
+ * <p>See also {@link #globally}/{@link Globally Combine.Globally}, which combines all the values
+ * in a {@code PCollection} into a single value in a {@code PCollection}.
*
* @param <K> type of input and output keys
* @param <InputT> type of input values
* @param <OutputT> type of output values
*/
public static class GroupedValues<K, InputT, OutputT>
- extends PTransform
- <PCollection<? extends KV<K, ? extends Iterable<InputT>>>,
- PCollection<KV<K, OutputT>>> {
+ extends PTransform<
+ PCollection<? extends KV<K, ? extends Iterable<InputT>>>, PCollection<KV<K, OutputT>>> {
- private final PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn;
+ private final GlobalCombineFn<? super InputT, ?, OutputT> fn;
private final DisplayData.ItemSpec<? extends Class<?>> fnDisplayData;
private final List<PCollectionView<?>> sideInputs;
private GroupedValues(
- PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn,
+ GlobalCombineFn<? super InputT, ?, OutputT> fn,
DisplayData.ItemSpec<? extends Class<?>> fnDisplayData) {
this.fn = SerializableUtils.clone(fn);
this.fnDisplayData = fnDisplayData;
@@ -2397,7 +2068,7 @@ public class Combine {
}
private GroupedValues(
- PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn,
+ GlobalCombineFn<? super InputT, ?, OutputT> fn,
DisplayData.ItemSpec<? extends Class<?>> fnDisplayData,
List<PCollectionView<?>> sideInputs) {
this.fn = SerializableUtils.clone(fn);
@@ -2415,9 +2086,9 @@ public class Combine {
}
/**
- * Returns the KeyedCombineFn used by this Combine operation.
+ * Returns the {@link GlobalCombineFn} used by this Combine operation.
*/
- public PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> getFn() {
+ public GlobalCombineFn<? super InputT, ?, OutputT> getFn() {
return fn;
}
@@ -2436,9 +2107,9 @@ public class Combine {
K key = c.element().getKey();
OutputT output;
- if (fn instanceof KeyedCombineFnWithContext) {
- output = ((KeyedCombineFnWithContext<? super K, ? super InputT, ?, OutputT>) fn)
- .apply(key, c.element().getValue(), new CombineWithContext.Context() {
+ if (fn instanceof CombineFnWithContext) {
+ output = ((CombineFnWithContext<? super InputT, ?, OutputT>) fn)
+ .apply(c.element().getValue(), new CombineWithContext.Context() {
@Override
public PipelineOptions getPipelineOptions() {
return c.getPipelineOptions();
@@ -2449,9 +2120,9 @@ public class Combine {
return c.sideInput(view);
}
});
- } else if (fn instanceof KeyedCombineFn) {
- output = ((KeyedCombineFn<? super K, ? super InputT, ?, OutputT>) fn)
- .apply(key, c.element().getValue());
+ } else if (fn instanceof CombineFn) {
+ output = ((CombineFn<? super InputT, ?, OutputT>) fn)
+ .apply(c.element().getValue());
} else {
throw new IllegalStateException(
String.format("Unknown type of CombineFn: %s", fn.getClass()));
@@ -2516,10 +2187,9 @@ public class Combine {
KvCoder<K, InputT> kvCoder = getKvCoder(input.getCoder());
@SuppressWarnings("unchecked")
Coder<OutputT> outputValueCoder =
- ((PerKeyCombineFn<K, InputT, ?, OutputT>) fn)
- .getDefaultOutputCoder(
- input.getPipeline().getCoderRegistry(),
- kvCoder.getKeyCoder(), kvCoder.getValueCoder());
+ ((GlobalCombineFn<InputT, ?, OutputT>) fn)
+ .getDefaultOutputCoder(
+ input.getPipeline().getCoderRegistry(), kvCoder.getValueCoder());
return KvCoder.of(kvCoder.getKeyCoder(), outputValueCoder);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFnBase.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFnBase.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFnBase.java
index 770a390..a881099 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFnBase.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFnBase.java
@@ -25,9 +25,7 @@ import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext;
-import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.HasDisplayData;
import org.apache.beam.sdk.values.TypeDescriptor;
@@ -92,65 +90,6 @@ public class CombineFnBase {
*/
OutputT defaultValue();
- /**
- * Converts this {@code GloballyCombineFn} into an equivalent
- * {@link PerKeyCombineFn} that ignores the keys passed to it and
- * combines the values according to this {@code GloballyCombineFn}.
- *
- * @param <K> the type of the (ignored) keys
- */
- <K> PerKeyCombineFn<K, InputT, AccumT, OutputT> asKeyedFn();
- }
-
- /**
- * A {@code PerKeyCombineFn<K, InputT, AccumT, OutputT>} specifies how to combine
- * a collection of input values of type {@code InputT}, associated with
- * a key of type {@code K}, into a single output value of type
- * {@code OutputT}. It does this via one or more intermediate mutable
- * accumulator values of type {@code AccumT}.
- *
- * <p>Do not implement this interface directly.
- * Extends {@link KeyedCombineFn} and {@link KeyedCombineFnWithContext} instead.
- *
- * @param <K> type of keys
- * @param <InputT> type of input values
- * @param <AccumT> type of mutable accumulator values
- * @param <OutputT> type of output values
- */
- public interface PerKeyCombineFn<K, InputT, AccumT, OutputT>
- extends Serializable, HasDisplayData {
- /**
- * Returns the {@code Coder} to use for accumulator {@code AccumT}
- * values, or null if it is not able to be inferred.
- *
- * <p>By default, uses the knowledge of the {@code Coder} being
- * used for {@code K} keys and input {@code InputT} values and the
- * enclosing {@code Pipeline}'s {@code CoderRegistry} to try to
- * infer the Coder for {@code AccumT} values.
- *
- * <p>This is the Coder used to send data through a communication-intensive
- * shuffle step, so a compact and efficient representation may have
- * significant performance benefits.
- */
- Coder<AccumT> getAccumulatorCoder(CoderRegistry registry, Coder<K> keyCoder,
- Coder<InputT> inputCoder) throws CannotProvideCoderException;
-
- /**
- * Returns the {@code Coder} to use by default for output
- * {@code OutputT} values, or null if it is not able to be inferred.
- *
- * <p>By default, uses the knowledge of the {@code Coder} being
- * used for {@code K} keys and input {@code InputT} values and the
- * enclosing {@code Pipeline}'s {@code CoderRegistry} to try to
- * infer the Coder for {@code OutputT} values.
- */
- Coder<OutputT> getDefaultOutputCoder(CoderRegistry registry, Coder<K> keyCoder,
- Coder<InputT> inputCoder) throws CannotProvideCoderException;
-
- /**
- * Returns the a regular {@link GlobalCombineFn} that operates on a specific key.
- */
- GlobalCombineFn<InputT, AccumT, OutputT> forKey(K key, Coder<K> keyCoder);
}
/**
@@ -228,79 +167,4 @@ public class CombineFnBase {
public void populateDisplayData(DisplayData.Builder builder) {
}
}
-
- /**
- * An abstract {@link PerKeyCombineFn} base class shared by
- * {@link KeyedCombineFn} and {@link KeyedCombineFnWithContext}.
- *
- * <p>Do not extends this class directly.
- * Extends {@link KeyedCombineFn} and {@link KeyedCombineFnWithContext} instead.
- *
- * @param <K> type of keys
- * @param <InputT> type of input values
- * @param <AccumT> type of mutable accumulator values
- * @param <OutputT> type of output values
- */
- abstract static class AbstractPerKeyCombineFn<K, InputT, AccumT, OutputT>
- implements PerKeyCombineFn<K, InputT, AccumT, OutputT> {
- @Override
- public Coder<AccumT> getAccumulatorCoder(CoderRegistry registry, Coder<K> keyCoder,
- Coder<InputT> inputCoder) throws CannotProvideCoderException {
- return registry.getDefaultCoder(getClass(), AbstractPerKeyCombineFn.class,
- ImmutableMap.<Type, Coder<?>>of(
- getKTypeVariable(), keyCoder, getInputTVariable(), inputCoder),
- getAccumTVariable());
- }
-
- @Override
- public Coder<OutputT> getDefaultOutputCoder(CoderRegistry registry, Coder<K> keyCoder,
- Coder<InputT> inputCoder) throws CannotProvideCoderException {
- return registry.getDefaultCoder(getClass(), AbstractPerKeyCombineFn.class,
- ImmutableMap.<Type, Coder<?>>of(getKTypeVariable(), keyCoder, getInputTVariable(),
- inputCoder, getAccumTVariable(),
- this.getAccumulatorCoder(registry, keyCoder, inputCoder)),
- getOutputTVariable());
- }
-
- /**
- * Returns the {@link TypeVariable} of {@code K}.
- */
- public TypeVariable<?> getKTypeVariable() {
- return (TypeVariable<?>) new TypeDescriptor<K>(AbstractPerKeyCombineFn.class) {}.getType();
- }
-
- /**
- * Returns the {@link TypeVariable} of {@code InputT}.
- */
- public TypeVariable<?> getInputTVariable() {
- return (TypeVariable<?>)
- new TypeDescriptor<InputT>(AbstractPerKeyCombineFn.class) {}.getType();
- }
-
- /**
- * Returns the {@link TypeVariable} of {@code AccumT}.
- */
- public TypeVariable<?> getAccumTVariable() {
- return (TypeVariable<?>)
- new TypeDescriptor<AccumT>(AbstractPerKeyCombineFn.class) {}.getType();
- }
-
- /**
- * Returns the {@link TypeVariable} of {@code OutputT}.
- */
- public TypeVariable<?> getOutputTVariable() {
- return (TypeVariable<?>)
- new TypeDescriptor<OutputT>(AbstractPerKeyCombineFn.class) {}.getType();
- }
-
- /**
- * {@inheritDoc}
- *
- * <p>By default, does not register any display data. Implementors may override this method
- * to provide their own display data.
- */
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {
- }
- }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
index ca939c1..cc02dcf 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
@@ -37,12 +37,9 @@ import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
import org.apache.beam.sdk.transforms.CombineFnBase.GlobalCombineFn;
-import org.apache.beam.sdk.transforms.CombineFnBase.PerKeyCombineFn;
import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext;
import org.apache.beam.sdk.transforms.CombineWithContext.Context;
-import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.HasDisplayData;
import org.apache.beam.sdk.util.CombineFnUtil;
@@ -54,49 +51,6 @@ import org.apache.beam.sdk.values.TupleTag;
public class CombineFns {
/**
- * Returns a {@link ComposeKeyedCombineFnBuilder} to construct a composed
- * {@link PerKeyCombineFn}.
- *
- * <p>The same {@link TupleTag} cannot be used in a composition multiple times.
- *
- * <p>Example:
- * <pre>{@code
- * PCollection<KV<K, Integer>> latencies = ...;
- *
- * TupleTag<Integer> maxLatencyTag = new TupleTag<Integer>();
- * TupleTag<Double> meanLatencyTag = new TupleTag<Double>();
- *
- * SimpleFunction<Integer, Integer> identityFn =
- * new SimpleFunction<Integer, Integer>() {
- * {@literal @}Override
- * public Integer apply(Integer input) {
- * return input;
- * }};
- * PCollection<KV<K, CoCombineResult>> maxAndMean = latencies.apply(
- * Combine.perKey(
- * CombineFns.composeKeyed()
- * .with(identityFn, new MaxIntegerFn(), maxLatencyTag)
- * .with(identityFn, new MeanFn<Integer>(), meanLatencyTag)));
- *
- * PCollection<T> finalResultCollection = maxAndMean
- * .apply(ParDo.of(
- * new DoFn<KV<K, CoCombineResult>, T>() {
- * {@literal @}ProcessElement
- * public void processElement(ProcessContext c) throws Exception {
- * KV<K, CoCombineResult> e = c.element();
- * Integer maxLatency = e.getValue().get(maxLatencyTag);
- * Double meanLatency = e.getValue().get(meanLatencyTag);
- * .... Do Something ....
- * c.output(...some T...);
- * }
- * }));
- * }</pre>
- */
- public static ComposeKeyedCombineFnBuilder composeKeyed() {
- return new ComposeKeyedCombineFnBuilder();
- }
-
- /**
* Returns a {@link ComposeCombineFnBuilder} to construct a composed
* {@link GlobalCombineFn}.
*
@@ -142,67 +96,6 @@ public class CombineFns {
/////////////////////////////////////////////////////////////////////////////
/**
- * A builder class to construct a composed {@link PerKeyCombineFn}.
- */
- public static class ComposeKeyedCombineFnBuilder {
- /**
- * Returns a {@link ComposedKeyedCombineFn} that can take additional
- * {@link PerKeyCombineFn PerKeyCombineFns} and apply them as a single combine function.
- *
- * <p>The {@link ComposedKeyedCombineFn} extracts inputs from {@code DataT} with
- * the {@code extractInputFn} and combines them with the {@code keyedCombineFn},
- * and then it outputs each combined value with a {@link TupleTag} to a
- * {@link CoCombineResult}.
- */
- public <K, DataT, InputT, OutputT> ComposedKeyedCombineFn<DataT, K> with(
- SimpleFunction<DataT, InputT> extractInputFn,
- KeyedCombineFn<K, InputT, ?, OutputT> keyedCombineFn,
- TupleTag<OutputT> outputTag) {
- return new ComposedKeyedCombineFn<DataT, K>()
- .with(extractInputFn, keyedCombineFn, outputTag);
- }
-
- /**
- * Returns a {@link ComposedKeyedCombineFnWithContext} that can take additional
- * {@link PerKeyCombineFn PerKeyCombineFns} and apply them as a single combine function.
- *
- * <p>The {@link ComposedKeyedCombineFnWithContext} extracts inputs from {@code DataT} with
- * the {@code extractInputFn} and combines them with the {@code keyedCombineFnWithContext},
- * and then it outputs each combined value with a {@link TupleTag} to a
- * {@link CoCombineResult}.
- */
- public <K, DataT, InputT, OutputT> ComposedKeyedCombineFnWithContext<DataT, K> with(
- SimpleFunction<DataT, InputT> extractInputFn,
- KeyedCombineFnWithContext<K, InputT, ?, OutputT> keyedCombineFnWithContext,
- TupleTag<OutputT> outputTag) {
- return new ComposedKeyedCombineFnWithContext<DataT, K>()
- .with(extractInputFn, keyedCombineFnWithContext, outputTag);
- }
-
- /**
- * Returns a {@link ComposedKeyedCombineFn} that can take additional
- * {@link PerKeyCombineFn PerKeyCombineFns} and apply them as a single combine function.
- */
- public <K, DataT, InputT, OutputT> ComposedKeyedCombineFn<DataT, K> with(
- SimpleFunction<DataT, InputT> extractInputFn,
- CombineFn<InputT, ?, OutputT> combineFn,
- TupleTag<OutputT> outputTag) {
- return with(extractInputFn, combineFn.<K>asKeyedFn(), outputTag);
- }
-
- /**
- * Returns a {@link ComposedKeyedCombineFnWithContext} that can take additional
- * {@link PerKeyCombineFn PerKeyCombineFns} and apply them as a single combine function.
- */
- public <K, DataT, InputT, OutputT> ComposedKeyedCombineFnWithContext<DataT, K> with(
- SimpleFunction<DataT, InputT> extractInputFn,
- CombineFnWithContext<InputT, ?, OutputT> combineFnWithContext,
- TupleTag<OutputT> outputTag) {
- return with(extractInputFn, combineFnWithContext.<K>asKeyedFn(), outputTag);
- }
- }
-
- /**
* A builder class to construct a composed {@link GlobalCombineFn}.
*/
public static class ComposeCombineFnBuilder {
@@ -246,7 +139,7 @@ public class CombineFns {
/**
* A tuple of outputs produced by a composed combine functions.
*
- * <p>See {@link #compose()} or {@link #composeKeyed()}) for details.
+ * <p>See {@link #compose()} for details.
*/
public static class CoCombineResult implements Serializable {
@@ -598,345 +491,6 @@ public class CombineFns {
}
}
- /**
- * A composed {@link KeyedCombineFn} that applies multiple {@link KeyedCombineFn KeyedCombineFns}.
- *
- * <p>For each {@link KeyedCombineFn} it extracts inputs from {@code DataT} with
- * the {@code extractInputFn} and combines them,
- * and then it outputs each combined value with a {@link TupleTag} to a
- * {@link CoCombineResult}.
- */
- public static class ComposedKeyedCombineFn<DataT, K>
- extends KeyedCombineFn<K, DataT, Object[], CoCombineResult> {
-
- private final List<SerializableFunction<DataT, Object>> extractInputFns;
- private final List<KeyedCombineFn<K, Object, Object, Object>> keyedCombineFns;
- private final List<TupleTag<?>> outputTags;
- private final int combineFnCount;
-
- private ComposedKeyedCombineFn() {
- this.extractInputFns = ImmutableList.of();
- this.keyedCombineFns = ImmutableList.of();
- this.outputTags = ImmutableList.of();
- this.combineFnCount = 0;
- }
-
- private ComposedKeyedCombineFn(
- ImmutableList<SerializableFunction<DataT, ?>> extractInputFns,
- ImmutableList<KeyedCombineFn<K, ?, ?, ?>> keyedCombineFns,
- ImmutableList<TupleTag<?>> outputTags) {
- @SuppressWarnings({"unchecked", "rawtypes"})
- List<SerializableFunction<DataT, Object>> castedExtractInputFns = (List) extractInputFns;
- this.extractInputFns = castedExtractInputFns;
-
- @SuppressWarnings({"unchecked", "rawtypes"})
- List<KeyedCombineFn<K, Object, Object, Object>> castedKeyedCombineFns =
- (List) keyedCombineFns;
- this.keyedCombineFns = castedKeyedCombineFns;
- this.outputTags = outputTags;
- this.combineFnCount = this.keyedCombineFns.size();
- }
-
- /**
- * Returns a {@link ComposedKeyedCombineFn} with an additional {@link KeyedCombineFn}.
- */
- public <InputT, OutputT> ComposedKeyedCombineFn<DataT, K> with(
- SimpleFunction<DataT, InputT> extractInputFn,
- KeyedCombineFn<K, InputT, ?, OutputT> keyedCombineFn,
- TupleTag<OutputT> outputTag) {
- checkUniqueness(outputTags, outputTag);
- return new ComposedKeyedCombineFn<>(
- ImmutableList.<SerializableFunction<DataT, ?>>builder()
- .addAll(extractInputFns)
- .add(extractInputFn)
- .build(),
- ImmutableList.<KeyedCombineFn<K, ?, ?, ?>>builder()
- .addAll(keyedCombineFns)
- .add(keyedCombineFn)
- .build(),
- ImmutableList.<TupleTag<?>>builder()
- .addAll(outputTags)
- .add(outputTag)
- .build());
- }
-
- /**
- * Returns a {@link ComposedKeyedCombineFnWithContext} with an additional
- * {@link KeyedCombineFnWithContext}.
- */
- public <InputT, OutputT> ComposedKeyedCombineFnWithContext<DataT, K> with(
- SimpleFunction<DataT, InputT> extractInputFn,
- KeyedCombineFnWithContext<K, InputT, ?, OutputT> keyedCombineFn,
- TupleTag<OutputT> outputTag) {
- checkUniqueness(outputTags, outputTag);
- List<KeyedCombineFnWithContext<K, Object, Object, Object>> fnsWithContext =
- Lists.newArrayList();
- for (KeyedCombineFn<K, Object, Object, Object> fn : keyedCombineFns) {
- fnsWithContext.add(CombineFnUtil.toFnWithContext(fn));
- }
- return new ComposedKeyedCombineFnWithContext<>(
- ImmutableList.<SerializableFunction<DataT, ?>>builder()
- .addAll(extractInputFns)
- .add(extractInputFn)
- .build(),
- ImmutableList.<KeyedCombineFnWithContext<K, ?, ?, ?>>builder()
- .addAll(fnsWithContext)
- .add(keyedCombineFn)
- .build(),
- ImmutableList.<TupleTag<?>>builder()
- .addAll(outputTags)
- .add(outputTag)
- .build());
- }
-
- /**
- * Returns a {@link ComposedKeyedCombineFn} with an additional {@link CombineFn}.
- */
- public <InputT, OutputT> ComposedKeyedCombineFn<DataT, K> with(
- SimpleFunction<DataT, InputT> extractInputFn,
- CombineFn<InputT, ?, OutputT> keyedCombineFn,
- TupleTag<OutputT> outputTag) {
- return with(extractInputFn, keyedCombineFn.<K>asKeyedFn(), outputTag);
- }
-
- /**
- * Returns a {@link ComposedKeyedCombineFnWithContext} with an additional
- * {@link CombineFnWithContext}.
- */
- public <InputT, OutputT> ComposedKeyedCombineFnWithContext<DataT, K> with(
- SimpleFunction<DataT, InputT> extractInputFn,
- CombineFnWithContext<InputT, ?, OutputT> keyedCombineFn,
- TupleTag<OutputT> outputTag) {
- return with(extractInputFn, keyedCombineFn.<K>asKeyedFn(), outputTag);
- }
-
- @Override
- public Object[] createAccumulator(K key) {
- Object[] accumsArray = new Object[combineFnCount];
- for (int i = 0; i < combineFnCount; ++i) {
- accumsArray[i] = keyedCombineFns.get(i).createAccumulator(key);
- }
- return accumsArray;
- }
-
- @Override
- public Object[] addInput(K key, Object[] accumulator, DataT value) {
- for (int i = 0; i < combineFnCount; ++i) {
- Object input = extractInputFns.get(i).apply(value);
- accumulator[i] = keyedCombineFns.get(i).addInput(key, accumulator[i], input);
- }
- return accumulator;
- }
-
- @Override
- public Object[] mergeAccumulators(K key, final Iterable<Object[]> accumulators) {
- Iterator<Object[]> iter = accumulators.iterator();
- if (!iter.hasNext()) {
- return createAccumulator(key);
- } else {
- // Reuses the first accumulator, and overwrites its values.
- // It is safe because {@code accum[i]} only depends on
- // the i-th component of each accumulator.
- Object[] accum = iter.next();
- for (int i = 0; i < combineFnCount; ++i) {
- accum[i] = keyedCombineFns.get(i).mergeAccumulators(
- key, new ProjectionIterable(accumulators, i));
- }
- return accum;
- }
- }
-
- @Override
- public CoCombineResult extractOutput(K key, Object[] accumulator) {
- Map<TupleTag<?>, Object> valuesMap = Maps.newHashMap();
- for (int i = 0; i < combineFnCount; ++i) {
- valuesMap.put(
- outputTags.get(i),
- keyedCombineFns.get(i).extractOutput(key, accumulator[i]));
- }
- return new CoCombineResult(valuesMap);
- }
-
- @Override
- public Object[] compact(K key, Object[] accumulator) {
- for (int i = 0; i < combineFnCount; ++i) {
- accumulator[i] = keyedCombineFns.get(i).compact(key, accumulator[i]);
- }
- return accumulator;
- }
-
- @Override
- public Coder<Object[]> getAccumulatorCoder(
- CoderRegistry registry, Coder<K> keyCoder, Coder<DataT> dataCoder)
- throws CannotProvideCoderException {
- List<Coder<Object>> coders = Lists.newArrayList();
- for (int i = 0; i < combineFnCount; ++i) {
- Coder<Object> inputCoder =
- registry.getDefaultOutputCoder(extractInputFns.get(i), dataCoder);
- coders.add(keyedCombineFns.get(i).getAccumulatorCoder(registry, keyCoder, inputCoder));
- }
- return new ComposedAccumulatorCoder(coders);
- }
-
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {
- super.populateDisplayData(builder);
- CombineFns.populateDisplayData(builder, keyedCombineFns);
- }
- }
-
- /**
- * A composed {@link KeyedCombineFnWithContext} that applies multiple
- * {@link KeyedCombineFnWithContext KeyedCombineFnWithContexts}.
- *
- * <p>For each {@link KeyedCombineFnWithContext} it extracts inputs from {@code DataT} with
- * the {@code extractInputFn} and combines them,
- * and then it outputs each combined value with a {@link TupleTag} to a
- * {@link CoCombineResult}.
- */
- public static class ComposedKeyedCombineFnWithContext<DataT, K>
- extends KeyedCombineFnWithContext<K, DataT, Object[], CoCombineResult> {
-
- private final List<SerializableFunction<DataT, Object>> extractInputFns;
- private final List<KeyedCombineFnWithContext<K, Object, Object, Object>> keyedCombineFns;
- private final List<TupleTag<?>> outputTags;
- private final int combineFnCount;
-
- private ComposedKeyedCombineFnWithContext() {
- this.extractInputFns = ImmutableList.of();
- this.keyedCombineFns = ImmutableList.of();
- this.outputTags = ImmutableList.of();
- this.combineFnCount = 0;
- }
-
- private ComposedKeyedCombineFnWithContext(
- ImmutableList<SerializableFunction<DataT, ?>> extractInputFns,
- ImmutableList<KeyedCombineFnWithContext<K, ?, ?, ?>> keyedCombineFns,
- ImmutableList<TupleTag<?>> outputTags) {
- @SuppressWarnings({"unchecked", "rawtypes"})
- List<SerializableFunction<DataT, Object>> castedExtractInputFns =
- (List) extractInputFns;
- this.extractInputFns = castedExtractInputFns;
-
- @SuppressWarnings({"unchecked", "rawtypes"})
- List<KeyedCombineFnWithContext<K, Object, Object, Object>> castedKeyedCombineFns =
- (List) keyedCombineFns;
- this.keyedCombineFns = castedKeyedCombineFns;
- this.outputTags = outputTags;
- this.combineFnCount = this.keyedCombineFns.size();
- }
-
- /**
- * Returns a {@link ComposedKeyedCombineFnWithContext} with an additional
- * {@link PerKeyCombineFn}.
- */
- public <InputT, OutputT> ComposedKeyedCombineFnWithContext<DataT, K> with(
- SimpleFunction<DataT, InputT> extractInputFn,
- PerKeyCombineFn<K, InputT, ?, OutputT> perKeyCombineFn,
- TupleTag<OutputT> outputTag) {
- checkUniqueness(outputTags, outputTag);
- return new ComposedKeyedCombineFnWithContext<>(
- ImmutableList.<SerializableFunction<DataT, ?>>builder()
- .addAll(extractInputFns)
- .add(extractInputFn)
- .build(),
- ImmutableList.<KeyedCombineFnWithContext<K, ?, ?, ?>>builder()
- .addAll(keyedCombineFns)
- .add(CombineFnUtil.toFnWithContext(perKeyCombineFn))
- .build(),
- ImmutableList.<TupleTag<?>>builder()
- .addAll(outputTags)
- .add(outputTag)
- .build());
- }
-
- /**
- * Returns a {@link ComposedKeyedCombineFnWithContext} with an additional
- * {@link GlobalCombineFn}.
- */
- public <InputT, OutputT> ComposedKeyedCombineFnWithContext<DataT, K> with(
- SimpleFunction<DataT, InputT> extractInputFn,
- GlobalCombineFn<InputT, ?, OutputT> perKeyCombineFn,
- TupleTag<OutputT> outputTag) {
- return with(extractInputFn, perKeyCombineFn.<K>asKeyedFn(), outputTag);
- }
-
- @Override
- public Object[] createAccumulator(K key, Context c) {
- Object[] accumsArray = new Object[combineFnCount];
- for (int i = 0; i < combineFnCount; ++i) {
- accumsArray[i] = keyedCombineFns.get(i).createAccumulator(key, c);
- }
- return accumsArray;
- }
-
- @Override
- public Object[] addInput(K key, Object[] accumulator, DataT value, Context c) {
- for (int i = 0; i < combineFnCount; ++i) {
- Object input = extractInputFns.get(i).apply(value);
- accumulator[i] = keyedCombineFns.get(i).addInput(key, accumulator[i], input, c);
- }
- return accumulator;
- }
-
- @Override
- public Object[] mergeAccumulators(K key, Iterable<Object[]> accumulators, Context c) {
- Iterator<Object[]> iter = accumulators.iterator();
- if (!iter.hasNext()) {
- return createAccumulator(key, c);
- } else {
- // Reuses the first accumulator, and overwrites its values.
- // It is safe because {@code accum[i]} only depends on
- // the i-th component of each accumulator.
- Object[] accum = iter.next();
- for (int i = 0; i < combineFnCount; ++i) {
- accum[i] = keyedCombineFns.get(i).mergeAccumulators(
- key, new ProjectionIterable(accumulators, i), c);
- }
- return accum;
- }
- }
-
- @Override
- public CoCombineResult extractOutput(K key, Object[] accumulator, Context c) {
- Map<TupleTag<?>, Object> valuesMap = Maps.newHashMap();
- for (int i = 0; i < combineFnCount; ++i) {
- valuesMap.put(
- outputTags.get(i),
- keyedCombineFns.get(i).extractOutput(key, accumulator[i], c));
- }
- return new CoCombineResult(valuesMap);
- }
-
- @Override
- public Object[] compact(K key, Object[] accumulator, Context c) {
- for (int i = 0; i < combineFnCount; ++i) {
- accumulator[i] = keyedCombineFns.get(i).compact(key, accumulator[i], c);
- }
- return accumulator;
- }
-
- @Override
- public Coder<Object[]> getAccumulatorCoder(
- CoderRegistry registry, Coder<K> keyCoder, Coder<DataT> dataCoder)
- throws CannotProvideCoderException {
- List<Coder<Object>> coders = Lists.newArrayList();
- for (int i = 0; i < combineFnCount; ++i) {
- Coder<Object> inputCoder =
- registry.getDefaultOutputCoder(extractInputFns.get(i), dataCoder);
- coders.add(keyedCombineFns.get(i).getAccumulatorCoder(
- registry, keyCoder, inputCoder));
- }
- return new ComposedAccumulatorCoder(coders);
- }
-
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {
- super.populateDisplayData(builder);
- CombineFns.populateDisplayData(builder, keyedCombineFns);
- }
- }
-
/////////////////////////////////////////////////////////////////////////////
private static class ProjectionIterable implements Iterable<Object> {
http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineWithContext.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineWithContext.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineWithContext.java
index cd0600a..9ae19f8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineWithContext.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineWithContext.java
@@ -17,20 +17,15 @@
*/
package org.apache.beam.sdk.transforms;
-import org.apache.beam.sdk.coders.CannotProvideCoderException;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
-import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.values.PCollectionView;
/**
* This class contains combine functions that have access to {@code PipelineOptions} and side inputs
* through {@code CombineWithContext.Context}.
*
- * <p>{@link CombineFnWithContext} and {@link KeyedCombineFnWithContext} are for users to extend.
+ * <p>{@link CombineFnWithContext} is for users to extend.
*/
public class CombineWithContext {
@@ -116,170 +111,23 @@ public class CombineWithContext {
return accumulator;
}
- @Override
- public OutputT defaultValue() {
- throw new UnsupportedOperationException(
- "Override this function to provide the default value.");
- }
-
- @SuppressWarnings({"unchecked", "rawtypes"})
- @Override
- public <K> KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> asKeyedFn() {
- // The key, an object, is never even looked at.
- return new KeyedCombineFnWithContext<K, InputT, AccumT, OutputT>() {
- @Override
- public AccumT createAccumulator(K key, Context c) {
- return CombineFnWithContext.this.createAccumulator(c);
- }
-
- @Override
- public AccumT addInput(K key, AccumT accumulator, InputT input, Context c) {
- return CombineFnWithContext.this.addInput(accumulator, input, c);
- }
-
- @Override
- public AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators, Context c) {
- return CombineFnWithContext.this.mergeAccumulators(accumulators, c);
- }
-
- @Override
- public OutputT extractOutput(K key, AccumT accumulator, Context c) {
- return CombineFnWithContext.this.extractOutput(accumulator, c);
- }
-
- @Override
- public AccumT compact(K key, AccumT accumulator, Context c) {
- return CombineFnWithContext.this.compact(accumulator, c);
- }
-
- @Override
- public Coder<AccumT> getAccumulatorCoder(CoderRegistry registry, Coder<K> keyCoder,
- Coder<InputT> inputCoder) throws CannotProvideCoderException {
- return CombineFnWithContext.this.getAccumulatorCoder(registry, inputCoder);
- }
-
- @Override
- public Coder<OutputT> getDefaultOutputCoder(CoderRegistry registry, Coder<K> keyCoder,
- Coder<InputT> inputCoder) throws CannotProvideCoderException {
- return CombineFnWithContext.this.getDefaultOutputCoder(registry, inputCoder);
- }
-
- @Override
- public CombineFnWithContext<InputT, AccumT, OutputT> forKey(K key, Coder<K> keyCoder) {
- return CombineFnWithContext.this;
- }
-
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {
- builder.delegate(CombineFnWithContext.this);
- }
- };
- }
- }
-
- /**
- * A keyed combine function that has access to {@code PipelineOptions} and side inputs through
- * {@code CombineWithContext.Context}.
- *
- * <p>See the equivalent {@link KeyedCombineFn} for details about keyed combine functions.
- */
- public abstract static class KeyedCombineFnWithContext<K, InputT, AccumT, OutputT>
- extends CombineFnBase.AbstractPerKeyCombineFn<K, InputT, AccumT, OutputT>
- implements RequiresContextInternal {
- /**
- * Returns a new, mutable accumulator value representing the accumulation of zero input values.
- *
- * <p>It is equivalent to {@link KeyedCombineFn#createAccumulator},
- * but it has additional access to {@code CombineWithContext.Context}.
- */
- public abstract AccumT createAccumulator(K key, Context c);
-
/**
- * Adds the given input value to the given accumulator, returning the new accumulator value.
- *
- * <p>It is equivalent to {@link KeyedCombineFn#addInput}, but it has additional access to
- * {@code CombineWithContext.Context}.
+ * Applies this {@code CombineFnWithContext} to a collection of input values to produce a
+ * combined output value.
*/
- public abstract AccumT addInput(K key, AccumT accumulator, InputT value, Context c);
-
- /**
- * Returns an accumulator representing the accumulation of all the
- * input values accumulated in the merging accumulators.
- *
- * <p>It is equivalent to {@link KeyedCombineFn#mergeAccumulators},
- * but it has additional access to {@code CombineWithContext.Context}..
- */
- public abstract AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators, Context c);
-
- /**
- * Returns the output value that is the result of combining all
- * the input values represented by the given accumulator.
- *
- * <p>It is equivalent to {@link KeyedCombineFn#extractOutput}, but it has additional access to
- * {@code CombineWithContext.Context}.
- */
- public abstract OutputT extractOutput(K key, AccumT accumulator, Context c);
-
- /**
- * Returns an accumulator that represents the same logical value as the
- * input accumulator, but may have a more compact representation.
- *
- * <p>It is equivalent to {@link KeyedCombineFn#compact}, but it has additional access to
- * {@code CombineWithContext.Context}.
- */
- public AccumT compact(K key, AccumT accumulator, Context c) {
- return accumulator;
- }
-
- /**
- * Applies this {@code KeyedCombineFnWithContext} to a key and a collection
- * of input values to produce a combined output value.
- */
- public OutputT apply(K key, Iterable<? extends InputT> inputs, Context c) {
- AccumT accum = createAccumulator(key, c);
+ public OutputT apply(Iterable<? extends InputT> inputs, Context c) {
+ AccumT accum = createAccumulator(c);
for (InputT input : inputs) {
- accum = addInput(key, accum, input, c);
+ accum = addInput(accum, input, c);
}
- return extractOutput(key, accum, c);
+ return extractOutput(accum, c);
}
@Override
- public CombineFnWithContext<InputT, AccumT, OutputT> forKey(
- final K key, final Coder<K> keyCoder) {
- return new CombineFnWithContext<InputT, AccumT, OutputT>() {
- @Override
- public AccumT createAccumulator(Context c) {
- return KeyedCombineFnWithContext.this.createAccumulator(key, c);
- }
-
- @Override
- public AccumT addInput(AccumT accumulator, InputT input, Context c) {
- return KeyedCombineFnWithContext.this.addInput(key, accumulator, input, c);
- }
-
- @Override
- public AccumT mergeAccumulators(Iterable<AccumT> accumulators, Context c) {
- return KeyedCombineFnWithContext.this.mergeAccumulators(key, accumulators, c);
- }
-
- @Override
- public OutputT extractOutput(AccumT accumulator, Context c) {
- return KeyedCombineFnWithContext.this.extractOutput(key, accumulator, c);
- }
-
- @Override
- public Coder<AccumT> getAccumulatorCoder(CoderRegistry registry, Coder<InputT> inputCoder)
- throws CannotProvideCoderException {
- return KeyedCombineFnWithContext.this.getAccumulatorCoder(registry, keyCoder, inputCoder);
- }
-
- @Override
- public Coder<OutputT> getDefaultOutputCoder(
- CoderRegistry registry, Coder<InputT> inputCoder) throws CannotProvideCoderException {
- return KeyedCombineFnWithContext.this.getDefaultOutputCoder(
- registry, keyCoder, inputCoder);
- }
- };
+ public OutputT defaultValue() {
+ throw new UnsupportedOperationException(
+ "Override this function to provide the default value.");
}
+
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java
index 47be9b9..e42c0b2 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java
@@ -234,7 +234,7 @@ public class Top {
public static <K, V, ComparatorT extends Comparator<V> & Serializable>
PTransform<PCollection<KV<K, V>>, PCollection<KV<K, List<V>>>>
perKey(int count, ComparatorT compareFn) {
- return Combine.perKey(new TopCombineFn<>(count, compareFn).<K>asKeyedFn());
+ return Combine.perKey(new TopCombineFn<>(count, compareFn));
}
/**
@@ -280,7 +280,7 @@ public class Top {
public static <K, V extends Comparable<V>>
PTransform<PCollection<KV<K, V>>, PCollection<KV<K, List<V>>>>
smallestPerKey(int count) {
- return Combine.perKey(new TopCombineFn<>(count, new Smallest<V>()).<K>asKeyedFn());
+ return Combine.perKey(new TopCombineFn<>(count, new Smallest<V>()));
}
/**
@@ -326,7 +326,7 @@ public class Top {
public static <K, V extends Comparable<V>>
PerKey<K, V, List<V>>
largestPerKey(int count) {
- return Combine.perKey(new TopCombineFn<>(count, new Largest<V>()).<K>asKeyedFn());
+ return Combine.perKey(new TopCombineFn<>(count, new Largest<V>()));
}
/**
http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
index 0495ad6..b3b8918 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
@@ -205,7 +205,7 @@ public class View {
* PCollection<KV<K, V>> input = ...
* CombineFn<V, OutputT> yourCombineFn = ...
* PCollectionView<Map<K, OutputT>> output = input
- * .apply(Combine.perKey(yourCombineFn.<K>asKeyedFn()))
+ * .apply(Combine.perKey(yourCombineFn))
* .apply(View.<K, OutputT>asMap());
* }</pre>
*