You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/03/24 03:47:48 UTC
[24/67] [partial] incubator-beam git commit: Directory reorganization
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/CombineFnBase.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/CombineFnBase.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/CombineFnBase.java
deleted file mode 100644
index a0b06cf..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/CombineFnBase.java
+++ /dev/null
@@ -1,283 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.transforms;
-
-import com.google.cloud.dataflow.sdk.coders.CannotProvideCoderException;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.CoderRegistry;
-import com.google.cloud.dataflow.sdk.transforms.Combine.CombineFn;
-import com.google.cloud.dataflow.sdk.transforms.Combine.KeyedCombineFn;
-import com.google.cloud.dataflow.sdk.transforms.CombineWithContext.CombineFnWithContext;
-import com.google.cloud.dataflow.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
-import com.google.cloud.dataflow.sdk.values.TypeDescriptor;
-import com.google.common.collect.ImmutableMap;
-
-import java.io.Serializable;
-import java.lang.reflect.Type;
-import java.lang.reflect.TypeVariable;
-
-/**
- * This class contains the shared interfaces and abstract classes for different types of combine
- * functions.
- *
- * <p>Users should not implement or extend them directly.
- */
-public class CombineFnBase {
- /**
- * A {@code GloballyCombineFn<InputT, AccumT, OutputT>} specifies how to combine a
- * collection of input values of type {@code InputT} into a single
- * output value of type {@code OutputT}. It does this via one or more
- * intermediate mutable accumulator values of type {@code AccumT}.
- *
- * <p>Do not implement this interface directly.
- * Extends {@link CombineFn} and {@link CombineFnWithContext} instead.
- *
- * @param <InputT> type of input values
- * @param <AccumT> type of mutable accumulator values
- * @param <OutputT> type of output values
- */
- public interface GlobalCombineFn<InputT, AccumT, OutputT> extends Serializable {
-
- /**
- * Returns the {@code Coder} to use for accumulator {@code AccumT}
- * values, or null if it is not able to be inferred.
- *
- * <p>By default, uses the knowledge of the {@code Coder} being used
- * for {@code InputT} values and the enclosing {@code Pipeline}'s
- * {@code CoderRegistry} to try to infer the Coder for {@code AccumT}
- * values.
- *
- * <p>This is the Coder used to send data through a communication-intensive
- * shuffle step, so a compact and efficient representation may have
- * significant performance benefits.
- */
- public Coder<AccumT> getAccumulatorCoder(CoderRegistry registry, Coder<InputT> inputCoder)
- throws CannotProvideCoderException;
-
- /**
- * Returns the {@code Coder} to use by default for output
- * {@code OutputT} values, or null if it is not able to be inferred.
- *
- * <p>By default, uses the knowledge of the {@code Coder} being
- * used for input {@code InputT} values and the enclosing
- * {@code Pipeline}'s {@code CoderRegistry} to try to infer the
- * Coder for {@code OutputT} values.
- */
- public Coder<OutputT> getDefaultOutputCoder(CoderRegistry registry, Coder<InputT> inputCoder)
- throws CannotProvideCoderException;
-
- /**
- * Returns the error message for not supported default values in Combine.globally().
- */
- public String getIncompatibleGlobalWindowErrorMessage();
-
- /**
- * Returns the default value when there are no values added to the accumulator.
- */
- public OutputT defaultValue();
-
- /**
- * Converts this {@code GloballyCombineFn} into an equivalent
- * {@link PerKeyCombineFn} that ignores the keys passed to it and
- * combines the values according to this {@code GloballyCombineFn}.
- *
- * @param <K> the type of the (ignored) keys
- */
- public <K> PerKeyCombineFn<K, InputT, AccumT, OutputT> asKeyedFn();
- }
-
- /**
- * A {@code PerKeyCombineFn<K, InputT, AccumT, OutputT>} specifies how to combine
- * a collection of input values of type {@code InputT}, associated with
- * a key of type {@code K}, into a single output value of type
- * {@code OutputT}. It does this via one or more intermediate mutable
- * accumulator values of type {@code AccumT}.
- *
- * <p>Do not implement this interface directly.
- * Extends {@link KeyedCombineFn} and {@link KeyedCombineFnWithContext} instead.
- *
- * @param <K> type of keys
- * @param <InputT> type of input values
- * @param <AccumT> type of mutable accumulator values
- * @param <OutputT> type of output values
- */
- public interface PerKeyCombineFn<K, InputT, AccumT, OutputT> extends Serializable {
- /**
- * Returns the {@code Coder} to use for accumulator {@code AccumT}
- * values, or null if it is not able to be inferred.
- *
- * <p>By default, uses the knowledge of the {@code Coder} being
- * used for {@code K} keys and input {@code InputT} values and the
- * enclosing {@code Pipeline}'s {@code CoderRegistry} to try to
- * infer the Coder for {@code AccumT} values.
- *
- * <p>This is the Coder used to send data through a communication-intensive
- * shuffle step, so a compact and efficient representation may have
- * significant performance benefits.
- */
- public Coder<AccumT> getAccumulatorCoder(CoderRegistry registry, Coder<K> keyCoder,
- Coder<InputT> inputCoder) throws CannotProvideCoderException;
-
- /**
- * Returns the {@code Coder} to use by default for output
- * {@code OutputT} values, or null if it is not able to be inferred.
- *
- * <p>By default, uses the knowledge of the {@code Coder} being
- * used for {@code K} keys and input {@code InputT} values and the
- * enclosing {@code Pipeline}'s {@code CoderRegistry} to try to
- * infer the Coder for {@code OutputT} values.
- */
- public Coder<OutputT> getDefaultOutputCoder(CoderRegistry registry, Coder<K> keyCoder,
- Coder<InputT> inputCoder) throws CannotProvideCoderException;
-
- /**
- * Returns the a regular {@link GlobalCombineFn} that operates on a specific key.
- */
- public abstract GlobalCombineFn<InputT, AccumT, OutputT> forKey(
- final K key, final Coder<K> keyCoder);
- }
-
- /**
- * An abstract {@link GlobalCombineFn} base class shared by
- * {@link CombineFn} and {@link CombineFnWithContext}.
- *
- * <p>Do not extend this class directly.
- * Extends {@link CombineFn} and {@link CombineFnWithContext} instead.
- *
- * @param <InputT> type of input values
- * @param <AccumT> type of mutable accumulator values
- * @param <OutputT> type of output values
- */
- abstract static class AbstractGlobalCombineFn<InputT, AccumT, OutputT>
- implements GlobalCombineFn<InputT, AccumT, OutputT>, Serializable {
- private static final String INCOMPATIBLE_GLOBAL_WINDOW_ERROR_MESSAGE =
- "Default values are not supported in Combine.globally() if the output "
- + "PCollection is not windowed by GlobalWindows. Instead, use "
- + "Combine.globally().withoutDefaults() to output an empty PCollection if the input "
- + "PCollection is empty, or Combine.globally().asSingletonView() to get the default "
- + "output of the CombineFn if the input PCollection is empty.";
-
- @Override
- public Coder<AccumT> getAccumulatorCoder(CoderRegistry registry, Coder<InputT> inputCoder)
- throws CannotProvideCoderException {
- return registry.getDefaultCoder(getClass(), AbstractGlobalCombineFn.class,
- ImmutableMap.<Type, Coder<?>>of(getInputTVariable(), inputCoder), getAccumTVariable());
- }
-
- @Override
- public Coder<OutputT> getDefaultOutputCoder(CoderRegistry registry, Coder<InputT> inputCoder)
- throws CannotProvideCoderException {
- return registry.getDefaultCoder(getClass(), AbstractGlobalCombineFn.class,
- ImmutableMap.<Type, Coder<?>>of(getInputTVariable(), inputCoder, getAccumTVariable(),
- this.getAccumulatorCoder(registry, inputCoder)),
- getOutputTVariable());
- }
-
- @Override
- public String getIncompatibleGlobalWindowErrorMessage() {
- return INCOMPATIBLE_GLOBAL_WINDOW_ERROR_MESSAGE;
- }
-
- /**
- * Returns the {@link TypeVariable} of {@code InputT}.
- */
- public TypeVariable<?> getInputTVariable() {
- return (TypeVariable<?>)
- new TypeDescriptor<InputT>(AbstractGlobalCombineFn.class) {}.getType();
- }
-
- /**
- * Returns the {@link TypeVariable} of {@code AccumT}.
- */
- public TypeVariable<?> getAccumTVariable() {
- return (TypeVariable<?>)
- new TypeDescriptor<AccumT>(AbstractGlobalCombineFn.class) {}.getType();
- }
-
- /**
- * Returns the {@link TypeVariable} of {@code OutputT}.
- */
- public TypeVariable<?> getOutputTVariable() {
- return (TypeVariable<?>)
- new TypeDescriptor<OutputT>(AbstractGlobalCombineFn.class) {}.getType();
- }
- }
-
- /**
- * An abstract {@link PerKeyCombineFn} base class shared by
- * {@link KeyedCombineFn} and {@link KeyedCombineFnWithContext}.
- *
- * <p>Do not extends this class directly.
- * Extends {@link KeyedCombineFn} and {@link KeyedCombineFnWithContext} instead.
- *
- * @param <K> type of keys
- * @param <InputT> type of input values
- * @param <AccumT> type of mutable accumulator values
- * @param <OutputT> type of output values
- */
- abstract static class AbstractPerKeyCombineFn<K, InputT, AccumT, OutputT>
- implements PerKeyCombineFn<K, InputT, AccumT, OutputT> {
- @Override
- public Coder<AccumT> getAccumulatorCoder(CoderRegistry registry, Coder<K> keyCoder,
- Coder<InputT> inputCoder) throws CannotProvideCoderException {
- return registry.getDefaultCoder(getClass(), AbstractPerKeyCombineFn.class,
- ImmutableMap.<Type, Coder<?>>of(
- getKTypeVariable(), keyCoder, getInputTVariable(), inputCoder),
- getAccumTVariable());
- }
-
- @Override
- public Coder<OutputT> getDefaultOutputCoder(CoderRegistry registry, Coder<K> keyCoder,
- Coder<InputT> inputCoder) throws CannotProvideCoderException {
- return registry.getDefaultCoder(getClass(), AbstractPerKeyCombineFn.class,
- ImmutableMap.<Type, Coder<?>>of(getKTypeVariable(), keyCoder, getInputTVariable(),
- inputCoder, getAccumTVariable(),
- this.getAccumulatorCoder(registry, keyCoder, inputCoder)),
- getOutputTVariable());
- }
-
- /**
- * Returns the {@link TypeVariable} of {@code K}.
- */
- public TypeVariable<?> getKTypeVariable() {
- return (TypeVariable<?>) new TypeDescriptor<K>(AbstractPerKeyCombineFn.class) {}.getType();
- }
-
- /**
- * Returns the {@link TypeVariable} of {@code InputT}.
- */
- public TypeVariable<?> getInputTVariable() {
- return (TypeVariable<?>)
- new TypeDescriptor<InputT>(AbstractPerKeyCombineFn.class) {}.getType();
- }
-
- /**
- * Returns the {@link TypeVariable} of {@code AccumT}.
- */
- public TypeVariable<?> getAccumTVariable() {
- return (TypeVariable<?>)
- new TypeDescriptor<AccumT>(AbstractPerKeyCombineFn.class) {}.getType();
- }
-
- /**
- * Returns the {@link TypeVariable} of {@code OutputT}.
- */
- public TypeVariable<?> getOutputTVariable() {
- return (TypeVariable<?>)
- new TypeDescriptor<OutputT>(AbstractPerKeyCombineFn.class) {}.getType();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/CombineFns.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/CombineFns.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/CombineFns.java
deleted file mode 100644
index 656c010..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/CombineFns.java
+++ /dev/null
@@ -1,1100 +0,0 @@
-/*
- * Copyright (C) 2016 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.transforms;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.google.cloud.dataflow.sdk.coders.CannotProvideCoderException;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.CoderException;
-import com.google.cloud.dataflow.sdk.coders.CoderRegistry;
-import com.google.cloud.dataflow.sdk.coders.StandardCoder;
-import com.google.cloud.dataflow.sdk.transforms.Combine.CombineFn;
-import com.google.cloud.dataflow.sdk.transforms.Combine.KeyedCombineFn;
-import com.google.cloud.dataflow.sdk.transforms.CombineFnBase.GlobalCombineFn;
-import com.google.cloud.dataflow.sdk.transforms.CombineFnBase.PerKeyCombineFn;
-import com.google.cloud.dataflow.sdk.transforms.CombineWithContext.CombineFnWithContext;
-import com.google.cloud.dataflow.sdk.transforms.CombineWithContext.Context;
-import com.google.cloud.dataflow.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
-import com.google.cloud.dataflow.sdk.util.PropertyNames;
-import com.google.cloud.dataflow.sdk.values.TupleTag;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.Serializable;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-
-/**
- * Static utility methods that create combine function instances.
- */
-public class CombineFns {
-
- /**
- * Returns a {@link ComposeKeyedCombineFnBuilder} to construct a composed
- * {@link PerKeyCombineFn}.
- *
- * <p>The same {@link TupleTag} cannot be used in a composition multiple times.
- *
- * <p>Example:
- * <pre>{ @code
- * PCollection<KV<K, Integer>> latencies = ...;
- *
- * TupleTag<Integer> maxLatencyTag = new TupleTag<Integer>();
- * TupleTag<Double> meanLatencyTag = new TupleTag<Double>();
- *
- * SimpleFunction<Integer, Integer> identityFn =
- * new SimpleFunction<Integer, Integer>() {
- * @Override
- * public Integer apply(Integer input) {
- * return input;
- * }};
- * PCollection<KV<K, CoCombineResult>> maxAndMean = latencies.apply(
- * Combine.perKey(
- * CombineFns.composeKeyed()
- * .with(identityFn, new MaxIntegerFn(), maxLatencyTag)
- * .with(identityFn, new MeanFn<Integer>(), meanLatencyTag)));
- *
- * PCollection<T> finalResultCollection = maxAndMean
- * .apply(ParDo.of(
- * new DoFn<KV<K, CoCombineResult>, T>() {
- * @Override
- * public void processElement(ProcessContext c) throws Exception {
- * KV<K, CoCombineResult> e = c.element();
- * Integer maxLatency = e.getValue().get(maxLatencyTag);
- * Double meanLatency = e.getValue().get(meanLatencyTag);
- * .... Do Something ....
- * c.output(...some T...);
- * }
- * }));
- * } </pre>
- */
- public static ComposeKeyedCombineFnBuilder composeKeyed() {
- return new ComposeKeyedCombineFnBuilder();
- }
-
- /**
- * Returns a {@link ComposeCombineFnBuilder} to construct a composed
- * {@link GlobalCombineFn}.
- *
- * <p>The same {@link TupleTag} cannot be used in a composition multiple times.
- *
- * <p>Example:
- * <pre>{ @code
- * PCollection<Integer> globalLatencies = ...;
- *
- * TupleTag<Integer> maxLatencyTag = new TupleTag<Integer>();
- * TupleTag<Double> meanLatencyTag = new TupleTag<Double>();
- *
- * SimpleFunction<Integer, Integer> identityFn =
- * new SimpleFunction<Integer, Integer>() {
- * @Override
- * public Integer apply(Integer input) {
- * return input;
- * }};
- * PCollection<CoCombineResult> maxAndMean = globalLatencies.apply(
- * Combine.globally(
- * CombineFns.compose()
- * .with(identityFn, new MaxIntegerFn(), maxLatencyTag)
- * .with(identityFn, new MeanFn<Integer>(), meanLatencyTag)));
- *
- * PCollection<T> finalResultCollection = maxAndMean
- * .apply(ParDo.of(
- * new DoFn<CoCombineResult, T>() {
- * @Override
- * public void processElement(ProcessContext c) throws Exception {
- * CoCombineResult e = c.element();
- * Integer maxLatency = e.get(maxLatencyTag);
- * Double meanLatency = e.get(meanLatencyTag);
- * .... Do Something ....
- * c.output(...some T...);
- * }
- * }));
- * } </pre>
- */
- public static ComposeCombineFnBuilder compose() {
- return new ComposeCombineFnBuilder();
- }
-
- /////////////////////////////////////////////////////////////////////////////
-
- /**
- * A builder class to construct a composed {@link PerKeyCombineFn}.
- */
- public static class ComposeKeyedCombineFnBuilder {
- /**
- * Returns a {@link ComposedKeyedCombineFn} that can take additional
- * {@link PerKeyCombineFn PerKeyCombineFns} and apply them as a single combine function.
- *
- * <p>The {@link ComposedKeyedCombineFn} extracts inputs from {@code DataT} with
- * the {@code extractInputFn} and combines them with the {@code keyedCombineFn},
- * and then it outputs each combined value with a {@link TupleTag} to a
- * {@link CoCombineResult}.
- */
- public <K, DataT, InputT, OutputT> ComposedKeyedCombineFn<DataT, K> with(
- SimpleFunction<DataT, InputT> extractInputFn,
- KeyedCombineFn<K, InputT, ?, OutputT> keyedCombineFn,
- TupleTag<OutputT> outputTag) {
- return new ComposedKeyedCombineFn<DataT, K>()
- .with(extractInputFn, keyedCombineFn, outputTag);
- }
-
- /**
- * Returns a {@link ComposedKeyedCombineFnWithContext} that can take additional
- * {@link PerKeyCombineFn PerKeyCombineFns} and apply them as a single combine function.
- *
- * <p>The {@link ComposedKeyedCombineFnWithContext} extracts inputs from {@code DataT} with
- * the {@code extractInputFn} and combines them with the {@code keyedCombineFnWithContext},
- * and then it outputs each combined value with a {@link TupleTag} to a
- * {@link CoCombineResult}.
- */
- public <K, DataT, InputT, OutputT> ComposedKeyedCombineFnWithContext<DataT, K> with(
- SimpleFunction<DataT, InputT> extractInputFn,
- KeyedCombineFnWithContext<K, InputT, ?, OutputT> keyedCombineFnWithContext,
- TupleTag<OutputT> outputTag) {
- return new ComposedKeyedCombineFnWithContext<DataT, K>()
- .with(extractInputFn, keyedCombineFnWithContext, outputTag);
- }
-
- /**
- * Returns a {@link ComposedKeyedCombineFn} that can take additional
- * {@link PerKeyCombineFn PerKeyCombineFns} and apply them as a single combine function.
- */
- public <K, DataT, InputT, OutputT> ComposedKeyedCombineFn<DataT, K> with(
- SimpleFunction<DataT, InputT> extractInputFn,
- CombineFn<InputT, ?, OutputT> combineFn,
- TupleTag<OutputT> outputTag) {
- return with(extractInputFn, combineFn.<K>asKeyedFn(), outputTag);
- }
-
- /**
- * Returns a {@link ComposedKeyedCombineFnWithContext} that can take additional
- * {@link PerKeyCombineFn PerKeyCombineFns} and apply them as a single combine function.
- */
- public <K, DataT, InputT, OutputT> ComposedKeyedCombineFnWithContext<DataT, K> with(
- SimpleFunction<DataT, InputT> extractInputFn,
- CombineFnWithContext<InputT, ?, OutputT> combineFnWithContext,
- TupleTag<OutputT> outputTag) {
- return with(extractInputFn, combineFnWithContext.<K>asKeyedFn(), outputTag);
- }
- }
-
- /**
- * A builder class to construct a composed {@link GlobalCombineFn}.
- */
- public static class ComposeCombineFnBuilder {
- /**
- * Returns a {@link ComposedCombineFn} that can take additional
- * {@link GlobalCombineFn GlobalCombineFns} and apply them as a single combine function.
- *
- * <p>The {@link ComposedCombineFn} extracts inputs from {@code DataT} with
- * the {@code extractInputFn} and combines them with the {@code combineFn},
- * and then it outputs each combined value with a {@link TupleTag} to a
- * {@link CoCombineResult}.
- */
- public <DataT, InputT, OutputT> ComposedCombineFn<DataT> with(
- SimpleFunction<DataT, InputT> extractInputFn,
- CombineFn<InputT, ?, OutputT> combineFn,
- TupleTag<OutputT> outputTag) {
- return new ComposedCombineFn<DataT>()
- .with(extractInputFn, combineFn, outputTag);
- }
-
- /**
- * Returns a {@link ComposedCombineFnWithContext} that can take additional
- * {@link GlobalCombineFn GlobalCombineFns} and apply them as a single combine function.
- *
- * <p>The {@link ComposedCombineFnWithContext} extracts inputs from {@code DataT} with
- * the {@code extractInputFn} and combines them with the {@code combineFnWithContext},
- * and then it outputs each combined value with a {@link TupleTag} to a
- * {@link CoCombineResult}.
- */
- public <DataT, InputT, OutputT> ComposedCombineFnWithContext<DataT> with(
- SimpleFunction<DataT, InputT> extractInputFn,
- CombineFnWithContext<InputT, ?, OutputT> combineFnWithContext,
- TupleTag<OutputT> outputTag) {
- return new ComposedCombineFnWithContext<DataT>()
- .with(extractInputFn, combineFnWithContext, outputTag);
- }
- }
-
- /////////////////////////////////////////////////////////////////////////////
-
- /**
- * A tuple of outputs produced by a composed combine functions.
- *
- * <p>See {@link #compose()} or {@link #composeKeyed()}) for details.
- */
- public static class CoCombineResult implements Serializable {
-
- private enum NullValue {
- INSTANCE;
- }
-
- private final Map<TupleTag<?>, Object> valuesMap;
-
- /**
- * The constructor of {@link CoCombineResult}.
- *
- * <p>Null values should have been filtered out from the {@code valuesMap}.
- * {@link TupleTag TupleTags} that associate with null values doesn't exist in the key set of
- * {@code valuesMap}.
- *
- * @throws NullPointerException if any key or value in {@code valuesMap} is null
- */
- CoCombineResult(Map<TupleTag<?>, Object> valuesMap) {
- ImmutableMap.Builder<TupleTag<?>, Object> builder = ImmutableMap.builder();
- for (Entry<TupleTag<?>, Object> entry : valuesMap.entrySet()) {
- if (entry.getValue() != null) {
- builder.put(entry);
- } else {
- builder.put(entry.getKey(), NullValue.INSTANCE);
- }
- }
- this.valuesMap = builder.build();
- }
-
- /**
- * Returns the value represented by the given {@link TupleTag}.
- *
- * <p>It is an error to request a non-exist tuple tag from the {@link CoCombineResult}.
- */
- @SuppressWarnings("unchecked")
- public <V> V get(TupleTag<V> tag) {
- checkArgument(
- valuesMap.keySet().contains(tag), "TupleTag " + tag + " is not in the CoCombineResult");
- Object value = valuesMap.get(tag);
- if (value == NullValue.INSTANCE) {
- return null;
- } else {
- return (V) value;
- }
- }
- }
-
- /////////////////////////////////////////////////////////////////////////////
-
- /**
- * A composed {@link CombineFn} that applies multiple {@link CombineFn CombineFns}.
- *
- * <p>For each {@link CombineFn} it extracts inputs from {@code DataT} with
- * the {@code extractInputFn} and combines them,
- * and then it outputs each combined value with a {@link TupleTag} to a
- * {@link CoCombineResult}.
- */
- public static class ComposedCombineFn<DataT> extends CombineFn<DataT, Object[], CoCombineResult> {
-
- private final List<CombineFn<Object, Object, Object>> combineFns;
- private final List<SerializableFunction<DataT, Object>> extractInputFns;
- private final List<TupleTag<?>> outputTags;
- private final int combineFnCount;
-
- private ComposedCombineFn() {
- this.extractInputFns = ImmutableList.of();
- this.combineFns = ImmutableList.of();
- this.outputTags = ImmutableList.of();
- this.combineFnCount = 0;
- }
-
- private ComposedCombineFn(
- ImmutableList<SerializableFunction<DataT, ?>> extractInputFns,
- ImmutableList<CombineFn<?, ?, ?>> combineFns,
- ImmutableList<TupleTag<?>> outputTags) {
- @SuppressWarnings({"unchecked", "rawtypes"})
- List<SerializableFunction<DataT, Object>> castedExtractInputFns = (List) extractInputFns;
- this.extractInputFns = castedExtractInputFns;
-
- @SuppressWarnings({"unchecked", "rawtypes"})
- List<CombineFn<Object, Object, Object>> castedCombineFns = (List) combineFns;
- this.combineFns = castedCombineFns;
-
- this.outputTags = outputTags;
- this.combineFnCount = this.combineFns.size();
- }
-
- /**
- * Returns a {@link ComposedCombineFn} with an additional {@link CombineFn}.
- */
- public <InputT, OutputT> ComposedCombineFn<DataT> with(
- SimpleFunction<DataT, InputT> extractInputFn,
- CombineFn<InputT, ?, OutputT> combineFn,
- TupleTag<OutputT> outputTag) {
- checkUniqueness(outputTags, outputTag);
- return new ComposedCombineFn<>(
- ImmutableList.<SerializableFunction<DataT, ?>>builder()
- .addAll(extractInputFns)
- .add(extractInputFn)
- .build(),
- ImmutableList.<CombineFn<?, ?, ?>>builder()
- .addAll(combineFns)
- .add(combineFn)
- .build(),
- ImmutableList.<TupleTag<?>>builder()
- .addAll(outputTags)
- .add(outputTag)
- .build());
- }
-
- /**
- * Returns a {@link ComposedCombineFnWithContext} with an additional
- * {@link CombineFnWithContext}.
- */
- public <InputT, OutputT> ComposedCombineFnWithContext<DataT> with(
- SimpleFunction<DataT, InputT> extractInputFn,
- CombineFnWithContext<InputT, ?, OutputT> combineFn,
- TupleTag<OutputT> outputTag) {
- checkUniqueness(outputTags, outputTag);
- List<CombineFnWithContext<Object, Object, Object>> fnsWithContext = Lists.newArrayList();
- for (CombineFn<Object, Object, Object> fn : combineFns) {
- fnsWithContext.add(toFnWithContext(fn));
- }
- return new ComposedCombineFnWithContext<>(
- ImmutableList.<SerializableFunction<DataT, ?>>builder()
- .addAll(extractInputFns)
- .add(extractInputFn)
- .build(),
- ImmutableList.<CombineFnWithContext<?, ?, ?>>builder()
- .addAll(fnsWithContext)
- .add(combineFn)
- .build(),
- ImmutableList.<TupleTag<?>>builder()
- .addAll(outputTags)
- .add(outputTag)
- .build());
- }
-
- @Override
- public Object[] createAccumulator() {
- Object[] accumsArray = new Object[combineFnCount];
- for (int i = 0; i < combineFnCount; ++i) {
- accumsArray[i] = combineFns.get(i).createAccumulator();
- }
- return accumsArray;
- }
-
- @Override
- public Object[] addInput(Object[] accumulator, DataT value) {
- for (int i = 0; i < combineFnCount; ++i) {
- Object input = extractInputFns.get(i).apply(value);
- accumulator[i] = combineFns.get(i).addInput(accumulator[i], input);
- }
- return accumulator;
- }
-
- @Override
- public Object[] mergeAccumulators(Iterable<Object[]> accumulators) {
- Iterator<Object[]> iter = accumulators.iterator();
- if (!iter.hasNext()) {
- return createAccumulator();
- } else {
- // Reuses the first accumulator, and overwrites its values.
- // It is safe because {@code accum[i]} only depends on
- // the i-th component of each accumulator.
- Object[] accum = iter.next();
- for (int i = 0; i < combineFnCount; ++i) {
- accum[i] = combineFns.get(i).mergeAccumulators(new ProjectionIterable(accumulators, i));
- }
- return accum;
- }
- }
-
- @Override
- public CoCombineResult extractOutput(Object[] accumulator) {
- Map<TupleTag<?>, Object> valuesMap = Maps.newHashMap();
- for (int i = 0; i < combineFnCount; ++i) {
- valuesMap.put(
- outputTags.get(i),
- combineFns.get(i).extractOutput(accumulator[i]));
- }
- return new CoCombineResult(valuesMap);
- }
-
- @Override
- public Object[] compact(Object[] accumulator) {
- for (int i = 0; i < combineFnCount; ++i) {
- accumulator[i] = combineFns.get(i).compact(accumulator[i]);
- }
- return accumulator;
- }
-
- @Override
- public Coder<Object[]> getAccumulatorCoder(CoderRegistry registry, Coder<DataT> dataCoder)
- throws CannotProvideCoderException {
- List<Coder<Object>> coders = Lists.newArrayList();
- for (int i = 0; i < combineFnCount; ++i) {
- Coder<Object> inputCoder =
- registry.getDefaultOutputCoder(extractInputFns.get(i), dataCoder);
- coders.add(combineFns.get(i).getAccumulatorCoder(registry, inputCoder));
- }
- return new ComposedAccumulatorCoder(coders);
- }
- }
-
- /**
- * A composed {@link CombineFnWithContext} that applies multiple
- * {@link CombineFnWithContext CombineFnWithContexts}.
- *
- * <p>For each {@link CombineFnWithContext} it extracts inputs from {@code DataT} with
- * the {@code extractInputFn} and combines them,
- * and then it outputs each combined value with a {@link TupleTag} to a
- * {@link CoCombineResult}.
- */
- public static class ComposedCombineFnWithContext<DataT>
- extends CombineFnWithContext<DataT, Object[], CoCombineResult> {
-
- private final List<SerializableFunction<DataT, Object>> extractInputFns;
- private final List<CombineFnWithContext<Object, Object, Object>> combineFnWithContexts;
- private final List<TupleTag<?>> outputTags;
- private final int combineFnCount;
-
- private ComposedCombineFnWithContext() {
- this.extractInputFns = ImmutableList.of();
- this.combineFnWithContexts = ImmutableList.of();
- this.outputTags = ImmutableList.of();
- this.combineFnCount = 0;
- }
-
- private ComposedCombineFnWithContext(
- ImmutableList<SerializableFunction<DataT, ?>> extractInputFns,
- ImmutableList<CombineFnWithContext<?, ?, ?>> combineFnWithContexts,
- ImmutableList<TupleTag<?>> outputTags) {
- @SuppressWarnings({"unchecked", "rawtypes"})
- List<SerializableFunction<DataT, Object>> castedExtractInputFns =
- (List) extractInputFns;
- this.extractInputFns = castedExtractInputFns;
-
- @SuppressWarnings({"rawtypes", "unchecked"})
- List<CombineFnWithContext<Object, Object, Object>> castedCombineFnWithContexts
- = (List) combineFnWithContexts;
- this.combineFnWithContexts = castedCombineFnWithContexts;
-
- this.outputTags = outputTags;
- this.combineFnCount = this.combineFnWithContexts.size();
- }
-
- /**
- * Returns a {@link ComposedCombineFnWithContext} with an additional {@link GlobalCombineFn}.
- */
- public <InputT, OutputT> ComposedCombineFnWithContext<DataT> with(
- SimpleFunction<DataT, InputT> extractInputFn,
- GlobalCombineFn<InputT, ?, OutputT> globalCombineFn,
- TupleTag<OutputT> outputTag) {
- checkUniqueness(outputTags, outputTag);
- return new ComposedCombineFnWithContext<>(
- ImmutableList.<SerializableFunction<DataT, ?>>builder()
- .addAll(extractInputFns)
- .add(extractInputFn)
- .build(),
- ImmutableList.<CombineFnWithContext<?, ?, ?>>builder()
- .addAll(combineFnWithContexts)
- .add(toFnWithContext(globalCombineFn))
- .build(),
- ImmutableList.<TupleTag<?>>builder()
- .addAll(outputTags)
- .add(outputTag)
- .build());
- }
-
- @Override
- public Object[] createAccumulator(Context c) {
- Object[] accumsArray = new Object[combineFnCount];
- for (int i = 0; i < combineFnCount; ++i) {
- accumsArray[i] = combineFnWithContexts.get(i).createAccumulator(c);
- }
- return accumsArray;
- }
-
- @Override
- public Object[] addInput(Object[] accumulator, DataT value, Context c) {
- for (int i = 0; i < combineFnCount; ++i) {
- Object input = extractInputFns.get(i).apply(value);
- accumulator[i] = combineFnWithContexts.get(i).addInput(accumulator[i], input, c);
- }
- return accumulator;
- }
-
- @Override
- public Object[] mergeAccumulators(Iterable<Object[]> accumulators, Context c) {
- Iterator<Object[]> iter = accumulators.iterator();
- if (!iter.hasNext()) {
- return createAccumulator(c);
- } else {
- // Reuses the first accumulator, and overwrites its values.
- // It is safe because {@code accum[i]} only depends on
- // the i-th component of each accumulator.
- Object[] accum = iter.next();
- for (int i = 0; i < combineFnCount; ++i) {
- accum[i] = combineFnWithContexts.get(i).mergeAccumulators(
- new ProjectionIterable(accumulators, i), c);
- }
- return accum;
- }
- }
-
- @Override
- public CoCombineResult extractOutput(Object[] accumulator, Context c) {
- Map<TupleTag<?>, Object> valuesMap = Maps.newHashMap();
- for (int i = 0; i < combineFnCount; ++i) {
- valuesMap.put(
- outputTags.get(i),
- combineFnWithContexts.get(i).extractOutput(accumulator[i], c));
- }
- return new CoCombineResult(valuesMap);
- }
-
- @Override
- public Object[] compact(Object[] accumulator, Context c) {
- for (int i = 0; i < combineFnCount; ++i) {
- accumulator[i] = combineFnWithContexts.get(i).compact(accumulator[i], c);
- }
- return accumulator;
- }
-
- @Override
- public Coder<Object[]> getAccumulatorCoder(CoderRegistry registry, Coder<DataT> dataCoder)
- throws CannotProvideCoderException {
- List<Coder<Object>> coders = Lists.newArrayList();
- for (int i = 0; i < combineFnCount; ++i) {
- Coder<Object> inputCoder =
- registry.getDefaultOutputCoder(extractInputFns.get(i), dataCoder);
- coders.add(combineFnWithContexts.get(i).getAccumulatorCoder(registry, inputCoder));
- }
- return new ComposedAccumulatorCoder(coders);
- }
- }
-
- /**
- * A composed {@link KeyedCombineFn} that applies multiple {@link KeyedCombineFn KeyedCombineFns}.
- *
- * <p>For each {@link KeyedCombineFn} it extracts inputs from {@code DataT} with
- * the {@code extractInputFn} and combines them,
- * and then it outputs each combined value with a {@link TupleTag} to a
- * {@link CoCombineResult}.
- */
- public static class ComposedKeyedCombineFn<DataT, K>
- extends KeyedCombineFn<K, DataT, Object[], CoCombineResult> {
-
- private final List<SerializableFunction<DataT, Object>> extractInputFns;
- private final List<KeyedCombineFn<K, Object, Object, Object>> keyedCombineFns;
- private final List<TupleTag<?>> outputTags;
- private final int combineFnCount;
-
- private ComposedKeyedCombineFn() {
- this.extractInputFns = ImmutableList.of();
- this.keyedCombineFns = ImmutableList.of();
- this.outputTags = ImmutableList.of();
- this.combineFnCount = 0;
- }
-
- private ComposedKeyedCombineFn(
- ImmutableList<SerializableFunction<DataT, ?>> extractInputFns,
- ImmutableList<KeyedCombineFn<K, ?, ?, ?>> keyedCombineFns,
- ImmutableList<TupleTag<?>> outputTags) {
- @SuppressWarnings({"unchecked", "rawtypes"})
- List<SerializableFunction<DataT, Object>> castedExtractInputFns = (List) extractInputFns;
- this.extractInputFns = castedExtractInputFns;
-
- @SuppressWarnings({"unchecked", "rawtypes"})
- List<KeyedCombineFn<K, Object, Object, Object>> castedKeyedCombineFns =
- (List) keyedCombineFns;
- this.keyedCombineFns = castedKeyedCombineFns;
- this.outputTags = outputTags;
- this.combineFnCount = this.keyedCombineFns.size();
- }
-
- /**
- * Returns a {@link ComposedKeyedCombineFn} with an additional {@link KeyedCombineFn}.
- */
- public <InputT, OutputT> ComposedKeyedCombineFn<DataT, K> with(
- SimpleFunction<DataT, InputT> extractInputFn,
- KeyedCombineFn<K, InputT, ?, OutputT> keyedCombineFn,
- TupleTag<OutputT> outputTag) {
- checkUniqueness(outputTags, outputTag);
- return new ComposedKeyedCombineFn<>(
- ImmutableList.<SerializableFunction<DataT, ?>>builder()
- .addAll(extractInputFns)
- .add(extractInputFn)
- .build(),
- ImmutableList.<KeyedCombineFn<K, ?, ?, ?>>builder()
- .addAll(keyedCombineFns)
- .add(keyedCombineFn)
- .build(),
- ImmutableList.<TupleTag<?>>builder()
- .addAll(outputTags)
- .add(outputTag)
- .build());
- }
-
- /**
- * Returns a {@link ComposedKeyedCombineFnWithContext} with an additional
- * {@link KeyedCombineFnWithContext}.
- */
- public <InputT, OutputT> ComposedKeyedCombineFnWithContext<DataT, K> with(
- SimpleFunction<DataT, InputT> extractInputFn,
- KeyedCombineFnWithContext<K, InputT, ?, OutputT> keyedCombineFn,
- TupleTag<OutputT> outputTag) {
- checkUniqueness(outputTags, outputTag);
- List<KeyedCombineFnWithContext<K, Object, Object, Object>> fnsWithContext =
- Lists.newArrayList();
- for (KeyedCombineFn<K, Object, Object, Object> fn : keyedCombineFns) {
- fnsWithContext.add(toFnWithContext(fn));
- }
- return new ComposedKeyedCombineFnWithContext<>(
- ImmutableList.<SerializableFunction<DataT, ?>>builder()
- .addAll(extractInputFns)
- .add(extractInputFn)
- .build(),
- ImmutableList.<KeyedCombineFnWithContext<K, ?, ?, ?>>builder()
- .addAll(fnsWithContext)
- .add(keyedCombineFn)
- .build(),
- ImmutableList.<TupleTag<?>>builder()
- .addAll(outputTags)
- .add(outputTag)
- .build());
- }
-
- /**
- * Returns a {@link ComposedKeyedCombineFn} with an additional {@link CombineFn}.
- */
- public <InputT, OutputT> ComposedKeyedCombineFn<DataT, K> with(
- SimpleFunction<DataT, InputT> extractInputFn,
- CombineFn<InputT, ?, OutputT> keyedCombineFn,
- TupleTag<OutputT> outputTag) {
- return with(extractInputFn, keyedCombineFn.<K>asKeyedFn(), outputTag);
- }
-
- /**
- * Returns a {@link ComposedKeyedCombineFnWithContext} with an additional
- * {@link CombineFnWithContext}.
- */
- public <InputT, OutputT> ComposedKeyedCombineFnWithContext<DataT, K> with(
- SimpleFunction<DataT, InputT> extractInputFn,
- CombineFnWithContext<InputT, ?, OutputT> keyedCombineFn,
- TupleTag<OutputT> outputTag) {
- return with(extractInputFn, keyedCombineFn.<K>asKeyedFn(), outputTag);
- }
-
- @Override
- public Object[] createAccumulator(K key) {
- Object[] accumsArray = new Object[combineFnCount];
- for (int i = 0; i < combineFnCount; ++i) {
- accumsArray[i] = keyedCombineFns.get(i).createAccumulator(key);
- }
- return accumsArray;
- }
-
- @Override
- public Object[] addInput(K key, Object[] accumulator, DataT value) {
- for (int i = 0; i < combineFnCount; ++i) {
- Object input = extractInputFns.get(i).apply(value);
- accumulator[i] = keyedCombineFns.get(i).addInput(key, accumulator[i], input);
- }
- return accumulator;
- }
-
- @Override
- public Object[] mergeAccumulators(K key, final Iterable<Object[]> accumulators) {
- Iterator<Object[]> iter = accumulators.iterator();
- if (!iter.hasNext()) {
- return createAccumulator(key);
- } else {
- // Reuses the first accumulator, and overwrites its values.
- // It is safe because {@code accum[i]} only depends on
- // the i-th component of each accumulator.
- Object[] accum = iter.next();
- for (int i = 0; i < combineFnCount; ++i) {
- accum[i] = keyedCombineFns.get(i).mergeAccumulators(
- key, new ProjectionIterable(accumulators, i));
- }
- return accum;
- }
- }
-
- @Override
- public CoCombineResult extractOutput(K key, Object[] accumulator) {
- Map<TupleTag<?>, Object> valuesMap = Maps.newHashMap();
- for (int i = 0; i < combineFnCount; ++i) {
- valuesMap.put(
- outputTags.get(i),
- keyedCombineFns.get(i).extractOutput(key, accumulator[i]));
- }
- return new CoCombineResult(valuesMap);
- }
-
- @Override
- public Object[] compact(K key, Object[] accumulator) {
- for (int i = 0; i < combineFnCount; ++i) {
- accumulator[i] = keyedCombineFns.get(i).compact(key, accumulator[i]);
- }
- return accumulator;
- }
-
- @Override
- public Coder<Object[]> getAccumulatorCoder(
- CoderRegistry registry, Coder<K> keyCoder, Coder<DataT> dataCoder)
- throws CannotProvideCoderException {
- List<Coder<Object>> coders = Lists.newArrayList();
- for (int i = 0; i < combineFnCount; ++i) {
- Coder<Object> inputCoder =
- registry.getDefaultOutputCoder(extractInputFns.get(i), dataCoder);
- coders.add(keyedCombineFns.get(i).getAccumulatorCoder(registry, keyCoder, inputCoder));
- }
- return new ComposedAccumulatorCoder(coders);
- }
- }
-
- /**
- * A composed {@link KeyedCombineFnWithContext} that applies multiple
- * {@link KeyedCombineFnWithContext KeyedCombineFnWithContexts}.
- *
- * <p>For each {@link KeyedCombineFnWithContext} it extracts inputs from {@code DataT} with
- * the {@code extractInputFn} and combines them,
- * and then it outputs each combined value with a {@link TupleTag} to a
- * {@link CoCombineResult}.
- */
- public static class ComposedKeyedCombineFnWithContext<DataT, K>
- extends KeyedCombineFnWithContext<K, DataT, Object[], CoCombineResult> {
-
- private final List<SerializableFunction<DataT, Object>> extractInputFns;
- private final List<KeyedCombineFnWithContext<K, Object, Object, Object>> keyedCombineFns;
- private final List<TupleTag<?>> outputTags;
- private final int combineFnCount;
-
- private ComposedKeyedCombineFnWithContext() {
- this.extractInputFns = ImmutableList.of();
- this.keyedCombineFns = ImmutableList.of();
- this.outputTags = ImmutableList.of();
- this.combineFnCount = 0;
- }
-
- private ComposedKeyedCombineFnWithContext(
- ImmutableList<SerializableFunction<DataT, ?>> extractInputFns,
- ImmutableList<KeyedCombineFnWithContext<K, ?, ?, ?>> keyedCombineFns,
- ImmutableList<TupleTag<?>> outputTags) {
- @SuppressWarnings({"unchecked", "rawtypes"})
- List<SerializableFunction<DataT, Object>> castedExtractInputFns =
- (List) extractInputFns;
- this.extractInputFns = castedExtractInputFns;
-
- @SuppressWarnings({"unchecked", "rawtypes"})
- List<KeyedCombineFnWithContext<K, Object, Object, Object>> castedKeyedCombineFns =
- (List) keyedCombineFns;
- this.keyedCombineFns = castedKeyedCombineFns;
- this.outputTags = outputTags;
- this.combineFnCount = this.keyedCombineFns.size();
- }
-
- /**
- * Returns a {@link ComposedKeyedCombineFnWithContext} with an additional
- * {@link PerKeyCombineFn}.
- */
- public <InputT, OutputT> ComposedKeyedCombineFnWithContext<DataT, K> with(
- SimpleFunction<DataT, InputT> extractInputFn,
- PerKeyCombineFn<K, InputT, ?, OutputT> perKeyCombineFn,
- TupleTag<OutputT> outputTag) {
- checkUniqueness(outputTags, outputTag);
- return new ComposedKeyedCombineFnWithContext<>(
- ImmutableList.<SerializableFunction<DataT, ?>>builder()
- .addAll(extractInputFns)
- .add(extractInputFn)
- .build(),
- ImmutableList.<KeyedCombineFnWithContext<K, ?, ?, ?>>builder()
- .addAll(keyedCombineFns)
- .add(toFnWithContext(perKeyCombineFn))
- .build(),
- ImmutableList.<TupleTag<?>>builder()
- .addAll(outputTags)
- .add(outputTag)
- .build());
- }
-
- /**
- * Returns a {@link ComposedKeyedCombineFnWithContext} with an additional
- * {@link GlobalCombineFn}.
- */
- public <InputT, OutputT> ComposedKeyedCombineFnWithContext<DataT, K> with(
- SimpleFunction<DataT, InputT> extractInputFn,
- GlobalCombineFn<InputT, ?, OutputT> perKeyCombineFn,
- TupleTag<OutputT> outputTag) {
- return with(extractInputFn, perKeyCombineFn.<K>asKeyedFn(), outputTag);
- }
-
- @Override
- public Object[] createAccumulator(K key, Context c) {
- Object[] accumsArray = new Object[combineFnCount];
- for (int i = 0; i < combineFnCount; ++i) {
- accumsArray[i] = keyedCombineFns.get(i).createAccumulator(key, c);
- }
- return accumsArray;
- }
-
- @Override
- public Object[] addInput(K key, Object[] accumulator, DataT value, Context c) {
- for (int i = 0; i < combineFnCount; ++i) {
- Object input = extractInputFns.get(i).apply(value);
- accumulator[i] = keyedCombineFns.get(i).addInput(key, accumulator[i], input, c);
- }
- return accumulator;
- }
-
- @Override
- public Object[] mergeAccumulators(K key, Iterable<Object[]> accumulators, Context c) {
- Iterator<Object[]> iter = accumulators.iterator();
- if (!iter.hasNext()) {
- return createAccumulator(key, c);
- } else {
- // Reuses the first accumulator, and overwrites its values.
- // It is safe because {@code accum[i]} only depends on
- // the i-th component of each accumulator.
- Object[] accum = iter.next();
- for (int i = 0; i < combineFnCount; ++i) {
- accum[i] = keyedCombineFns.get(i).mergeAccumulators(
- key, new ProjectionIterable(accumulators, i), c);
- }
- return accum;
- }
- }
-
- @Override
- public CoCombineResult extractOutput(K key, Object[] accumulator, Context c) {
- Map<TupleTag<?>, Object> valuesMap = Maps.newHashMap();
- for (int i = 0; i < combineFnCount; ++i) {
- valuesMap.put(
- outputTags.get(i),
- keyedCombineFns.get(i).extractOutput(key, accumulator[i], c));
- }
- return new CoCombineResult(valuesMap);
- }
-
- @Override
- public Object[] compact(K key, Object[] accumulator, Context c) {
- for (int i = 0; i < combineFnCount; ++i) {
- accumulator[i] = keyedCombineFns.get(i).compact(key, accumulator[i], c);
- }
- return accumulator;
- }
-
- @Override
- public Coder<Object[]> getAccumulatorCoder(
- CoderRegistry registry, Coder<K> keyCoder, Coder<DataT> dataCoder)
- throws CannotProvideCoderException {
- List<Coder<Object>> coders = Lists.newArrayList();
- for (int i = 0; i < combineFnCount; ++i) {
- Coder<Object> inputCoder =
- registry.getDefaultOutputCoder(extractInputFns.get(i), dataCoder);
- coders.add(keyedCombineFns.get(i).getAccumulatorCoder(
- registry, keyCoder, inputCoder));
- }
- return new ComposedAccumulatorCoder(coders);
- }
- }
-
- /////////////////////////////////////////////////////////////////////////////
-
- private static class ProjectionIterable implements Iterable<Object> {
- private final Iterable<Object[]> iterable;
- private final int column;
-
- private ProjectionIterable(Iterable<Object[]> iterable, int column) {
- this.iterable = iterable;
- this.column = column;
- }
-
- @Override
- public Iterator<Object> iterator() {
- final Iterator<Object[]> iter = iterable.iterator();
- return new Iterator<Object>() {
- @Override
- public boolean hasNext() {
- return iter.hasNext();
- }
-
- @Override
- public Object next() {
- return iter.next()[column];
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
- };
- }
- }
-
- private static class ComposedAccumulatorCoder extends StandardCoder<Object[]> {
- private List<Coder<Object>> coders;
- private int codersCount;
-
- public ComposedAccumulatorCoder(List<Coder<Object>> coders) {
- this.coders = ImmutableList.copyOf(coders);
- this.codersCount = coders.size();
- }
-
- @SuppressWarnings({"rawtypes", "unchecked"})
- @JsonCreator
- public static ComposedAccumulatorCoder of(
- @JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
- List<Coder<?>> components) {
- return new ComposedAccumulatorCoder((List) components);
- }
-
- @Override
- public void encode(Object[] value, OutputStream outStream, Context context)
- throws CoderException, IOException {
- checkArgument(value.length == codersCount);
- Context nestedContext = context.nested();
- for (int i = 0; i < codersCount; ++i) {
- coders.get(i).encode(value[i], outStream, nestedContext);
- }
- }
-
- @Override
- public Object[] decode(InputStream inStream, Context context)
- throws CoderException, IOException {
- Object[] ret = new Object[codersCount];
- Context nestedContext = context.nested();
- for (int i = 0; i < codersCount; ++i) {
- ret[i] = coders.get(i).decode(inStream, nestedContext);
- }
- return ret;
- }
-
- @Override
- public List<? extends Coder<?>> getCoderArguments() {
- return coders;
- }
-
- @Override
- public void verifyDeterministic() throws NonDeterministicException {
- for (int i = 0; i < codersCount; ++i) {
- coders.get(i).verifyDeterministic();
- }
- }
- }
-
- @SuppressWarnings("unchecked")
- private static <InputT, AccumT, OutputT> CombineFnWithContext<InputT, AccumT, OutputT>
- toFnWithContext(GlobalCombineFn<InputT, AccumT, OutputT> globalCombineFn) {
- if (globalCombineFn instanceof CombineFnWithContext) {
- return (CombineFnWithContext<InputT, AccumT, OutputT>) globalCombineFn;
- } else {
- final CombineFn<InputT, AccumT, OutputT> combineFn =
- (CombineFn<InputT, AccumT, OutputT>) globalCombineFn;
- return new CombineFnWithContext<InputT, AccumT, OutputT>() {
- @Override
- public AccumT createAccumulator(Context c) {
- return combineFn.createAccumulator();
- }
- @Override
- public AccumT addInput(AccumT accumulator, InputT input, Context c) {
- return combineFn.addInput(accumulator, input);
- }
- @Override
- public AccumT mergeAccumulators(Iterable<AccumT> accumulators, Context c) {
- return combineFn.mergeAccumulators(accumulators);
- }
- @Override
- public OutputT extractOutput(AccumT accumulator, Context c) {
- return combineFn.extractOutput(accumulator);
- }
- @Override
- public AccumT compact(AccumT accumulator, Context c) {
- return combineFn.compact(accumulator);
- }
- @Override
- public OutputT defaultValue() {
- return combineFn.defaultValue();
- }
- @Override
- public Coder<AccumT> getAccumulatorCoder(CoderRegistry registry, Coder<InputT> inputCoder)
- throws CannotProvideCoderException {
- return combineFn.getAccumulatorCoder(registry, inputCoder);
- }
- @Override
- public Coder<OutputT> getDefaultOutputCoder(
- CoderRegistry registry, Coder<InputT> inputCoder) throws CannotProvideCoderException {
- return combineFn.getDefaultOutputCoder(registry, inputCoder);
- }
- };
- }
- }
-
- private static <K, InputT, AccumT, OutputT> KeyedCombineFnWithContext<K, InputT, AccumT, OutputT>
- toFnWithContext(PerKeyCombineFn<K, InputT, AccumT, OutputT> perKeyCombineFn) {
- if (perKeyCombineFn instanceof KeyedCombineFnWithContext) {
- @SuppressWarnings("unchecked")
- KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> keyedCombineFnWithContext =
- (KeyedCombineFnWithContext<K, InputT, AccumT, OutputT>) perKeyCombineFn;
- return keyedCombineFnWithContext;
- } else {
- @SuppressWarnings("unchecked")
- final KeyedCombineFn<K, InputT, AccumT, OutputT> keyedCombineFn =
- (KeyedCombineFn<K, InputT, AccumT, OutputT>) perKeyCombineFn;
- return new KeyedCombineFnWithContext<K, InputT, AccumT, OutputT>() {
- @Override
- public AccumT createAccumulator(K key, Context c) {
- return keyedCombineFn.createAccumulator(key);
- }
- @Override
- public AccumT addInput(K key, AccumT accumulator, InputT value, Context c) {
- return keyedCombineFn.addInput(key, accumulator, value);
- }
- @Override
- public AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators, Context c) {
- return keyedCombineFn.mergeAccumulators(key, accumulators);
- }
- @Override
- public OutputT extractOutput(K key, AccumT accumulator, Context c) {
- return keyedCombineFn.extractOutput(key, accumulator);
- }
- @Override
- public AccumT compact(K key, AccumT accumulator, Context c) {
- return keyedCombineFn.compact(key, accumulator);
- }
- @Override
- public Coder<AccumT> getAccumulatorCoder(CoderRegistry registry, Coder<K> keyCoder,
- Coder<InputT> inputCoder) throws CannotProvideCoderException {
- return keyedCombineFn.getAccumulatorCoder(registry, keyCoder, inputCoder);
- }
- @Override
- public Coder<OutputT> getDefaultOutputCoder(CoderRegistry registry, Coder<K> keyCoder,
- Coder<InputT> inputCoder) throws CannotProvideCoderException {
- return keyedCombineFn.getDefaultOutputCoder(registry, keyCoder, inputCoder);
- }
- };
- }
- }
-
- private static <OutputT> void checkUniqueness(
- List<TupleTag<?>> registeredTags, TupleTag<OutputT> outputTag) {
- checkArgument(
- !registeredTags.contains(outputTag),
- "Cannot compose with tuple tag %s because it is already present in the composition.",
- outputTag);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/CombineWithContext.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/CombineWithContext.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/CombineWithContext.java
deleted file mode 100644
index fdf56e3..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/CombineWithContext.java
+++ /dev/null
@@ -1,277 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.transforms;
-
-import com.google.cloud.dataflow.sdk.coders.CannotProvideCoderException;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.CoderRegistry;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.transforms.Combine.CombineFn;
-import com.google.cloud.dataflow.sdk.transforms.Combine.KeyedCombineFn;
-import com.google.cloud.dataflow.sdk.values.PCollectionView;
-
-/**
- * This class contains combine functions that have access to {@code PipelineOptions} and side inputs
- * through {@code CombineWithContext.Context}.
- *
- * <p>{@link CombineFnWithContext} and {@link KeyedCombineFnWithContext} are for users to extend.
- */
-public class CombineWithContext {
-
- /**
- * Information accessible to all methods in {@code CombineFnWithContext}
- * and {@code KeyedCombineFnWithContext}.
- */
- public abstract static class Context {
- /**
- * Returns the {@code PipelineOptions} specified with the
- * {@link com.google.cloud.dataflow.sdk.runners.PipelineRunner}
- * invoking this {@code KeyedCombineFn}.
- */
- public abstract PipelineOptions getPipelineOptions();
-
- /**
- * Returns the value of the side input for the window corresponding to the
- * window of the main input element.
- */
- public abstract <T> T sideInput(PCollectionView<T> view);
- }
-
- /**
- * An internal interface for signaling that a {@code GloballyCombineFn}
- * or a {@code PerKeyCombineFn} needs to access {@code CombineWithContext.Context}.
- *
- * <p>For internal use only.
- */
- public interface RequiresContextInternal {}
-
- /**
- * A combine function that has access to {@code PipelineOptions} and side inputs through
- * {@code CombineWithContext.Context}.
- *
- * See the equivalent {@link CombineFn} for details about combine functions.
- */
- public abstract static class CombineFnWithContext<InputT, AccumT, OutputT>
- extends CombineFnBase.AbstractGlobalCombineFn<InputT, AccumT, OutputT>
- implements RequiresContextInternal {
- /**
- * Returns a new, mutable accumulator value, representing the accumulation of zero input values.
- *
- * <p>It is equivalent to {@link CombineFn#createAccumulator}, but it has additional access to
- * {@code CombineWithContext.Context}.
- */
- public abstract AccumT createAccumulator(Context c);
-
- /**
- * Adds the given input value to the given accumulator, returning the
- * new accumulator value.
- *
- * <p>It is equivalent to {@link CombineFn#addInput}, but it has additional access to
- * {@code CombineWithContext.Context}.
- */
- public abstract AccumT addInput(AccumT accumulator, InputT input, Context c);
-
- /**
- * Returns an accumulator representing the accumulation of all the
- * input values accumulated in the merging accumulators.
- *
- * <p>It is equivalent to {@link CombineFn#mergeAccumulators}, but it has additional access to
- * {@code CombineWithContext.Context}.
- */
- public abstract AccumT mergeAccumulators(Iterable<AccumT> accumulators, Context c);
-
- /**
- * Returns the output value that is the result of combining all
- * the input values represented by the given accumulator.
- *
- * <p>It is equivalent to {@link CombineFn#extractOutput}, but it has additional access to
- * {@code CombineWithContext.Context}.
- */
- public abstract OutputT extractOutput(AccumT accumulator, Context c);
-
- /**
- * Returns an accumulator that represents the same logical value as the
- * input accumulator, but may have a more compact representation.
- *
- * <p>It is equivalent to {@link CombineFn#compact}, but it has additional access to
- * {@code CombineWithContext.Context}.
- */
- public AccumT compact(AccumT accumulator, Context c) {
- return accumulator;
- }
-
- @Override
- public OutputT defaultValue() {
- throw new UnsupportedOperationException(
- "Override this function to provide the default value.");
- }
-
- @SuppressWarnings({"unchecked", "rawtypes"})
- @Override
- public <K> KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> asKeyedFn() {
- // The key, an object, is never even looked at.
- return new KeyedCombineFnWithContext<K, InputT, AccumT, OutputT>() {
- @Override
- public AccumT createAccumulator(K key, Context c) {
- return CombineFnWithContext.this.createAccumulator(c);
- }
-
- @Override
- public AccumT addInput(K key, AccumT accumulator, InputT input, Context c) {
- return CombineFnWithContext.this.addInput(accumulator, input, c);
- }
-
- @Override
- public AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators, Context c) {
- return CombineFnWithContext.this.mergeAccumulators(accumulators, c);
- }
-
- @Override
- public OutputT extractOutput(K key, AccumT accumulator, Context c) {
- return CombineFnWithContext.this.extractOutput(accumulator, c);
- }
-
- @Override
- public AccumT compact(K key, AccumT accumulator, Context c) {
- return CombineFnWithContext.this.compact(accumulator, c);
- }
-
- @Override
- public Coder<AccumT> getAccumulatorCoder(CoderRegistry registry, Coder<K> keyCoder,
- Coder<InputT> inputCoder) throws CannotProvideCoderException {
- return CombineFnWithContext.this.getAccumulatorCoder(registry, inputCoder);
- }
-
- @Override
- public Coder<OutputT> getDefaultOutputCoder(CoderRegistry registry, Coder<K> keyCoder,
- Coder<InputT> inputCoder) throws CannotProvideCoderException {
- return CombineFnWithContext.this.getDefaultOutputCoder(registry, inputCoder);
- }
-
- @Override
- public CombineFnWithContext<InputT, AccumT, OutputT> forKey(K key, Coder<K> keyCoder) {
- return CombineFnWithContext.this;
- }
- };
- }
- }
-
- /**
- * A keyed combine function that has access to {@code PipelineOptions} and side inputs through
- * {@code CombineWithContext.Context}.
- *
- * See the equivalent {@link KeyedCombineFn} for details about keyed combine functions.
- */
- public abstract static class KeyedCombineFnWithContext<K, InputT, AccumT, OutputT>
- extends CombineFnBase.AbstractPerKeyCombineFn<K, InputT, AccumT, OutputT>
- implements RequiresContextInternal {
- /**
- * Returns a new, mutable accumulator value representing the accumulation of zero input values.
- *
- * <p>It is equivalent to {@link KeyedCombineFn#createAccumulator},
- * but it has additional access to {@code CombineWithContext.Context}.
- */
- public abstract AccumT createAccumulator(K key, Context c);
-
- /**
- * Adds the given input value to the given accumulator, returning the new accumulator value.
- *
- * <p>It is equivalent to {@link KeyedCombineFn#addInput}, but it has additional access to
- * {@code CombineWithContext.Context}.
- */
- public abstract AccumT addInput(K key, AccumT accumulator, InputT value, Context c);
-
- /**
- * Returns an accumulator representing the accumulation of all the
- * input values accumulated in the merging accumulators.
- *
- * <p>It is equivalent to {@link KeyedCombineFn#mergeAccumulators},
- * but it has additional access to {@code CombineWithContext.Context}..
- */
- public abstract AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators, Context c);
-
- /**
- * Returns the output value that is the result of combining all
- * the input values represented by the given accumulator.
- *
- * <p>It is equivalent to {@link KeyedCombineFn#extractOutput}, but it has additional access to
- * {@code CombineWithContext.Context}.
- */
- public abstract OutputT extractOutput(K key, AccumT accumulator, Context c);
-
- /**
- * Returns an accumulator that represents the same logical value as the
- * input accumulator, but may have a more compact representation.
- *
- * <p>It is equivalent to {@link KeyedCombineFn#compact}, but it has additional access to
- * {@code CombineWithContext.Context}.
- */
- public AccumT compact(K key, AccumT accumulator, Context c) {
- return accumulator;
- }
-
- /**
- * Applies this {@code KeyedCombineFnWithContext} to a key and a collection
- * of input values to produce a combined output value.
- */
- public OutputT apply(K key, Iterable<? extends InputT> inputs, Context c) {
- AccumT accum = createAccumulator(key, c);
- for (InputT input : inputs) {
- accum = addInput(key, accum, input, c);
- }
- return extractOutput(key, accum, c);
- }
-
- @Override
- public CombineFnWithContext<InputT, AccumT, OutputT> forKey(
- final K key, final Coder<K> keyCoder) {
- return new CombineFnWithContext<InputT, AccumT, OutputT>() {
- @Override
- public AccumT createAccumulator(Context c) {
- return KeyedCombineFnWithContext.this.createAccumulator(key, c);
- }
-
- @Override
- public AccumT addInput(AccumT accumulator, InputT input, Context c) {
- return KeyedCombineFnWithContext.this.addInput(key, accumulator, input, c);
- }
-
- @Override
- public AccumT mergeAccumulators(Iterable<AccumT> accumulators, Context c) {
- return KeyedCombineFnWithContext.this.mergeAccumulators(key, accumulators, c);
- }
-
- @Override
- public OutputT extractOutput(AccumT accumulator, Context c) {
- return KeyedCombineFnWithContext.this.extractOutput(key, accumulator, c);
- }
-
- @Override
- public Coder<AccumT> getAccumulatorCoder(CoderRegistry registry, Coder<InputT> inputCoder)
- throws CannotProvideCoderException {
- return KeyedCombineFnWithContext.this.getAccumulatorCoder(registry, keyCoder, inputCoder);
- }
-
- @Override
- public Coder<OutputT> getDefaultOutputCoder(
- CoderRegistry registry, Coder<InputT> inputCoder) throws CannotProvideCoderException {
- return KeyedCombineFnWithContext.this.getDefaultOutputCoder(
- registry, keyCoder, inputCoder);
- }
- };
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Count.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Count.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Count.java
deleted file mode 100644
index ffa11d1..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Count.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.transforms;
-
-import com.google.cloud.dataflow.sdk.transforms.Combine.CombineFn;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-
-/**
- * {@code PTransorm}s to count the elements in a {@link PCollection}.
- *
- * <p>{@link Count#perElement()} can be used to count the number of occurrences of each
- * distinct element in the PCollection, {@link Count#perKey()} can be used to count the
- * number of values per key, and {@link Count#globally()} can be used to count the total
- * number of elements in a PCollection.
- */
-public class Count {
- private Count() {
- // do not instantiate
- }
-
- /**
- * Returns a {@link Combine.Globally} {@link PTransform} that counts the number of elements in
- * its input {@link PCollection}.
- */
- public static <T> Combine.Globally<T, Long> globally() {
- return Combine.globally(new CountFn<T>()).named("Count.Globally");
- }
-
- /**
- * Returns a {@link Combine.PerKey} {@link PTransform} that counts the number of elements
- * associated with each key of its input {@link PCollection}.
- */
- public static <K, V> Combine.PerKey<K, V, Long> perKey() {
- return Combine.<K, V, Long>perKey(new CountFn<V>()).named("Count.PerKey");
- }
-
- /**
- * Returns a {@link PerElement Count.PerElement} {@link PTransform} that counts the number of
- * occurrences of each element in its input {@link PCollection}.
- *
- * <p>See {@link PerElement Count.PerElement} for more details.
- */
- public static <T> PerElement<T> perElement() {
- return new PerElement<>();
- }
-
- /**
- * {@code Count.PerElement<T>} takes a {@code PCollection<T>} and returns a
- * {@code PCollection<KV<T, Long>>} representing a map from each distinct element of the input
- * {@code PCollection} to the number of times that element occurs in the input. Each key in the
- * output {@code PCollection} is unique.
- *
- * <p>This transform compares two values of type {@code T} by first encoding each element using
- * the input {@code PCollection}'s {@code Coder}, then comparing the encoded bytes. Because of
- * this, the input coder must be deterministic.
- * (See {@link com.google.cloud.dataflow.sdk.coders.Coder#verifyDeterministic()} for more detail).
- * Performing the comparison in this manner admits efficient parallel evaluation.
- *
- * <p>By default, the {@code Coder} of the keys of the output {@code PCollection} is the same as
- * the {@code Coder} of the elements of the input {@code PCollection}.
- *
- * <p>Example of use:
- * <pre> {@code
- * PCollection<String> words = ...;
- * PCollection<KV<String, Long>> wordCounts =
- * words.apply(Count.<String>perElement());
- * } </pre>
- *
- * @param <T> the type of the elements of the input {@code PCollection}, and the type of the keys
- * of the output {@code PCollection}
- */
- public static class PerElement<T>
- extends PTransform<PCollection<T>, PCollection<KV<T, Long>>> {
-
- public PerElement() { }
-
- @Override
- public PCollection<KV<T, Long>> apply(PCollection<T> input) {
- return
- input
- .apply(ParDo.named("Init").of(new DoFn<T, KV<T, Void>>() {
- @Override
- public void processElement(ProcessContext c) {
- c.output(KV.of(c.element(), (Void) null));
- }
- }))
- .apply(Count.<T, Void>perKey());
- }
- }
-
- /**
- * A {@link CombineFn} that counts elements.
- */
- private static class CountFn<T> extends CombineFn<T, Long, Long> {
-
- @Override
- public Long createAccumulator() {
- return 0L;
- }
-
- @Override
- public Long addInput(Long accumulator, T input) {
- return accumulator + 1;
- }
-
- @Override
- public Long mergeAccumulators(Iterable<Long> accumulators) {
- long result = 0L;
- for (Long accum : accumulators) {
- result += accum;
- }
- return result;
- }
-
- @Override
- public Long extractOutput(Long accumulator) {
- return accumulator;
- }
- }
-}