You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/03/24 03:47:48 UTC

[24/67] [partial] incubator-beam git commit: Directory reorganization

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/CombineFnBase.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/CombineFnBase.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/CombineFnBase.java
deleted file mode 100644
index a0b06cf..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/CombineFnBase.java
+++ /dev/null
@@ -1,283 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.transforms;
-
-import com.google.cloud.dataflow.sdk.coders.CannotProvideCoderException;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.CoderRegistry;
-import com.google.cloud.dataflow.sdk.transforms.Combine.CombineFn;
-import com.google.cloud.dataflow.sdk.transforms.Combine.KeyedCombineFn;
-import com.google.cloud.dataflow.sdk.transforms.CombineWithContext.CombineFnWithContext;
-import com.google.cloud.dataflow.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
-import com.google.cloud.dataflow.sdk.values.TypeDescriptor;
-import com.google.common.collect.ImmutableMap;
-
-import java.io.Serializable;
-import java.lang.reflect.Type;
-import java.lang.reflect.TypeVariable;
-
-/**
- * This class contains the shared interfaces and abstract classes for different types of combine
- * functions.
- *
- * <p>Users should not implement or extend them directly.
- */
-public class CombineFnBase {
-  /**
-   * A {@code GloballyCombineFn<InputT, AccumT, OutputT>} specifies how to combine a
-   * collection of input values of type {@code InputT} into a single
-   * output value of type {@code OutputT}.  It does this via one or more
-   * intermediate mutable accumulator values of type {@code AccumT}.
-   *
-   * <p>Do not implement this interface directly.
-   * Extends {@link CombineFn} and {@link CombineFnWithContext} instead.
-   *
-   * @param <InputT> type of input values
-   * @param <AccumT> type of mutable accumulator values
-   * @param <OutputT> type of output values
-   */
-  public interface GlobalCombineFn<InputT, AccumT, OutputT> extends Serializable {
-
-    /**
-     * Returns the {@code Coder} to use for accumulator {@code AccumT}
-     * values, or null if it is not able to be inferred.
-     *
-     * <p>By default, uses the knowledge of the {@code Coder} being used
-     * for {@code InputT} values and the enclosing {@code Pipeline}'s
-     * {@code CoderRegistry} to try to infer the Coder for {@code AccumT}
-     * values.
-     *
-     * <p>This is the Coder used to send data through a communication-intensive
-     * shuffle step, so a compact and efficient representation may have
-     * significant performance benefits.
-     */
-    public Coder<AccumT> getAccumulatorCoder(CoderRegistry registry, Coder<InputT> inputCoder)
-        throws CannotProvideCoderException;
-
-    /**
-     * Returns the {@code Coder} to use by default for output
-     * {@code OutputT} values, or null if it is not able to be inferred.
-     *
-     * <p>By default, uses the knowledge of the {@code Coder} being
-     * used for input {@code InputT} values and the enclosing
-     * {@code Pipeline}'s {@code CoderRegistry} to try to infer the
-     * Coder for {@code OutputT} values.
-     */
-    public Coder<OutputT> getDefaultOutputCoder(CoderRegistry registry, Coder<InputT> inputCoder)
-        throws CannotProvideCoderException;
-
-    /**
-     * Returns the error message for not supported default values in Combine.globally().
-     */
-    public String getIncompatibleGlobalWindowErrorMessage();
-
-    /**
-     * Returns the default value when there are no values added to the accumulator.
-     */
-    public OutputT defaultValue();
-
-    /**
-     * Converts this {@code GloballyCombineFn} into an equivalent
-     * {@link PerKeyCombineFn} that ignores the keys passed to it and
-     * combines the values according to this {@code GloballyCombineFn}.
-     *
-     * @param <K> the type of the (ignored) keys
-     */
-    public <K> PerKeyCombineFn<K, InputT, AccumT, OutputT> asKeyedFn();
-  }
-
-  /**
-   * A {@code PerKeyCombineFn<K, InputT, AccumT, OutputT>} specifies how to combine
-   * a collection of input values of type {@code InputT}, associated with
-   * a key of type {@code K}, into a single output value of type
-   * {@code OutputT}.  It does this via one or more intermediate mutable
-   * accumulator values of type {@code AccumT}.
-   *
-   * <p>Do not implement this interface directly.
-   * Extends {@link KeyedCombineFn} and {@link KeyedCombineFnWithContext} instead.
-   *
-   * @param <K> type of keys
-   * @param <InputT> type of input values
-   * @param <AccumT> type of mutable accumulator values
-   * @param <OutputT> type of output values
-   */
-  public interface PerKeyCombineFn<K, InputT, AccumT, OutputT> extends Serializable {
-    /**
-     * Returns the {@code Coder} to use for accumulator {@code AccumT}
-     * values, or null if it is not able to be inferred.
-     *
-     * <p>By default, uses the knowledge of the {@code Coder} being
-     * used for {@code K} keys and input {@code InputT} values and the
-     * enclosing {@code Pipeline}'s {@code CoderRegistry} to try to
-     * infer the Coder for {@code AccumT} values.
-     *
-     * <p>This is the Coder used to send data through a communication-intensive
-     * shuffle step, so a compact and efficient representation may have
-     * significant performance benefits.
-     */
-    public Coder<AccumT> getAccumulatorCoder(CoderRegistry registry, Coder<K> keyCoder,
-        Coder<InputT> inputCoder) throws CannotProvideCoderException;
-
-    /**
-     * Returns the {@code Coder} to use by default for output
-     * {@code OutputT} values, or null if it is not able to be inferred.
-     *
-     * <p>By default, uses the knowledge of the {@code Coder} being
-     * used for {@code K} keys and input {@code InputT} values and the
-     * enclosing {@code Pipeline}'s {@code CoderRegistry} to try to
-     * infer the Coder for {@code OutputT} values.
-     */
-    public Coder<OutputT> getDefaultOutputCoder(CoderRegistry registry, Coder<K> keyCoder,
-        Coder<InputT> inputCoder) throws CannotProvideCoderException;
-
-    /**
-     * Returns the a regular {@link GlobalCombineFn} that operates on a specific key.
-     */
-    public abstract GlobalCombineFn<InputT, AccumT, OutputT> forKey(
-        final K key, final Coder<K> keyCoder);
-  }
-
-  /**
-   * An abstract {@link GlobalCombineFn} base class shared by
-   * {@link CombineFn} and {@link CombineFnWithContext}.
-   *
-   * <p>Do not extend this class directly.
-   * Extends {@link CombineFn} and {@link CombineFnWithContext} instead.
-   *
-   * @param <InputT> type of input values
-   * @param <AccumT> type of mutable accumulator values
-   * @param <OutputT> type of output values
-   */
-  abstract static class AbstractGlobalCombineFn<InputT, AccumT, OutputT>
-      implements GlobalCombineFn<InputT, AccumT, OutputT>, Serializable {
-    private static final String INCOMPATIBLE_GLOBAL_WINDOW_ERROR_MESSAGE =
-        "Default values are not supported in Combine.globally() if the output "
-        + "PCollection is not windowed by GlobalWindows. Instead, use "
-        + "Combine.globally().withoutDefaults() to output an empty PCollection if the input "
-        + "PCollection is empty, or Combine.globally().asSingletonView() to get the default "
-        + "output of the CombineFn if the input PCollection is empty.";
-
-    @Override
-    public Coder<AccumT> getAccumulatorCoder(CoderRegistry registry, Coder<InputT> inputCoder)
-        throws CannotProvideCoderException {
-      return registry.getDefaultCoder(getClass(), AbstractGlobalCombineFn.class,
-          ImmutableMap.<Type, Coder<?>>of(getInputTVariable(), inputCoder), getAccumTVariable());
-    }
-
-    @Override
-    public Coder<OutputT> getDefaultOutputCoder(CoderRegistry registry, Coder<InputT> inputCoder)
-        throws CannotProvideCoderException {
-      return registry.getDefaultCoder(getClass(), AbstractGlobalCombineFn.class,
-          ImmutableMap.<Type, Coder<?>>of(getInputTVariable(), inputCoder, getAccumTVariable(),
-              this.getAccumulatorCoder(registry, inputCoder)),
-          getOutputTVariable());
-    }
-
-    @Override
-    public String getIncompatibleGlobalWindowErrorMessage() {
-      return INCOMPATIBLE_GLOBAL_WINDOW_ERROR_MESSAGE;
-    }
-
-    /**
-     * Returns the {@link TypeVariable} of {@code InputT}.
-     */
-    public TypeVariable<?> getInputTVariable() {
-      return (TypeVariable<?>)
-          new TypeDescriptor<InputT>(AbstractGlobalCombineFn.class) {}.getType();
-    }
-
-    /**
-     * Returns the {@link TypeVariable} of {@code AccumT}.
-     */
-    public TypeVariable<?> getAccumTVariable() {
-      return (TypeVariable<?>)
-          new TypeDescriptor<AccumT>(AbstractGlobalCombineFn.class) {}.getType();
-    }
-
-    /**
-     * Returns the {@link TypeVariable} of {@code OutputT}.
-     */
-    public TypeVariable<?> getOutputTVariable() {
-      return (TypeVariable<?>)
-          new TypeDescriptor<OutputT>(AbstractGlobalCombineFn.class) {}.getType();
-    }
-  }
-
-  /**
-   * An abstract {@link PerKeyCombineFn} base class shared by
-   * {@link KeyedCombineFn} and {@link KeyedCombineFnWithContext}.
-   *
-   * <p>Do not extends this class directly.
-   * Extends {@link KeyedCombineFn} and {@link KeyedCombineFnWithContext} instead.
-   *
-   * @param <K> type of keys
-   * @param <InputT> type of input values
-   * @param <AccumT> type of mutable accumulator values
-   * @param <OutputT> type of output values
-   */
-  abstract static class AbstractPerKeyCombineFn<K, InputT, AccumT, OutputT>
-      implements PerKeyCombineFn<K, InputT, AccumT, OutputT> {
-    @Override
-    public Coder<AccumT> getAccumulatorCoder(CoderRegistry registry, Coder<K> keyCoder,
-        Coder<InputT> inputCoder) throws CannotProvideCoderException {
-      return registry.getDefaultCoder(getClass(), AbstractPerKeyCombineFn.class,
-          ImmutableMap.<Type, Coder<?>>of(
-              getKTypeVariable(), keyCoder, getInputTVariable(), inputCoder),
-          getAccumTVariable());
-    }
-
-    @Override
-    public Coder<OutputT> getDefaultOutputCoder(CoderRegistry registry, Coder<K> keyCoder,
-        Coder<InputT> inputCoder) throws CannotProvideCoderException {
-      return registry.getDefaultCoder(getClass(), AbstractPerKeyCombineFn.class,
-          ImmutableMap.<Type, Coder<?>>of(getKTypeVariable(), keyCoder, getInputTVariable(),
-              inputCoder, getAccumTVariable(),
-              this.getAccumulatorCoder(registry, keyCoder, inputCoder)),
-          getOutputTVariable());
-    }
-
-    /**
-     * Returns the {@link TypeVariable} of {@code K}.
-     */
-    public TypeVariable<?> getKTypeVariable() {
-      return (TypeVariable<?>) new TypeDescriptor<K>(AbstractPerKeyCombineFn.class) {}.getType();
-    }
-
-    /**
-     * Returns the {@link TypeVariable} of {@code InputT}.
-     */
-    public TypeVariable<?> getInputTVariable() {
-      return (TypeVariable<?>)
-          new TypeDescriptor<InputT>(AbstractPerKeyCombineFn.class) {}.getType();
-    }
-
-    /**
-     * Returns the {@link TypeVariable} of {@code AccumT}.
-     */
-    public TypeVariable<?> getAccumTVariable() {
-      return (TypeVariable<?>)
-          new TypeDescriptor<AccumT>(AbstractPerKeyCombineFn.class) {}.getType();
-    }
-
-    /**
-     * Returns the {@link TypeVariable} of {@code OutputT}.
-     */
-    public TypeVariable<?> getOutputTVariable() {
-      return (TypeVariable<?>)
-          new TypeDescriptor<OutputT>(AbstractPerKeyCombineFn.class) {}.getType();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/CombineFns.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/CombineFns.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/CombineFns.java
deleted file mode 100644
index 656c010..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/CombineFns.java
+++ /dev/null
@@ -1,1100 +0,0 @@
-/*
- * Copyright (C) 2016 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.transforms;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.google.cloud.dataflow.sdk.coders.CannotProvideCoderException;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.CoderException;
-import com.google.cloud.dataflow.sdk.coders.CoderRegistry;
-import com.google.cloud.dataflow.sdk.coders.StandardCoder;
-import com.google.cloud.dataflow.sdk.transforms.Combine.CombineFn;
-import com.google.cloud.dataflow.sdk.transforms.Combine.KeyedCombineFn;
-import com.google.cloud.dataflow.sdk.transforms.CombineFnBase.GlobalCombineFn;
-import com.google.cloud.dataflow.sdk.transforms.CombineFnBase.PerKeyCombineFn;
-import com.google.cloud.dataflow.sdk.transforms.CombineWithContext.CombineFnWithContext;
-import com.google.cloud.dataflow.sdk.transforms.CombineWithContext.Context;
-import com.google.cloud.dataflow.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
-import com.google.cloud.dataflow.sdk.util.PropertyNames;
-import com.google.cloud.dataflow.sdk.values.TupleTag;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.Serializable;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-
-/**
- * Static utility methods that create combine function instances.
- */
-public class CombineFns {
-
-  /**
-   * Returns a {@link ComposeKeyedCombineFnBuilder} to construct a composed
-   * {@link PerKeyCombineFn}.
-   *
-   * <p>The same {@link TupleTag} cannot be used in a composition multiple times.
-   *
-   * <p>Example:
-   * <pre>{ @code
-   * PCollection<KV<K, Integer>> latencies = ...;
-   *
-   * TupleTag<Integer> maxLatencyTag = new TupleTag<Integer>();
-   * TupleTag<Double> meanLatencyTag = new TupleTag<Double>();
-   *
-   * SimpleFunction<Integer, Integer> identityFn =
-   *     new SimpleFunction<Integer, Integer>() {
-   *       @Override
-   *       public Integer apply(Integer input) {
-   *           return input;
-   *       }};
-   * PCollection<KV<K, CoCombineResult>> maxAndMean = latencies.apply(
-   *     Combine.perKey(
-   *         CombineFns.composeKeyed()
-   *            .with(identityFn, new MaxIntegerFn(), maxLatencyTag)
-   *            .with(identityFn, new MeanFn<Integer>(), meanLatencyTag)));
-   *
-   * PCollection<T> finalResultCollection = maxAndMean
-   *     .apply(ParDo.of(
-   *         new DoFn<KV<K, CoCombineResult>, T>() {
-   *           @Override
-   *           public void processElement(ProcessContext c) throws Exception {
-   *             KV<K, CoCombineResult> e = c.element();
-   *             Integer maxLatency = e.getValue().get(maxLatencyTag);
-   *             Double meanLatency = e.getValue().get(meanLatencyTag);
-   *             .... Do Something ....
-   *             c.output(...some T...);
-   *           }
-   *         }));
-   * } </pre>
-   */
-  public static ComposeKeyedCombineFnBuilder composeKeyed() {
-    return new ComposeKeyedCombineFnBuilder();
-  }
-
-  /**
-   * Returns a {@link ComposeCombineFnBuilder} to construct a composed
-   * {@link GlobalCombineFn}.
-   *
-   * <p>The same {@link TupleTag} cannot be used in a composition multiple times.
-   *
-   * <p>Example:
-   * <pre>{ @code
-   * PCollection<Integer> globalLatencies = ...;
-   *
-   * TupleTag<Integer> maxLatencyTag = new TupleTag<Integer>();
-   * TupleTag<Double> meanLatencyTag = new TupleTag<Double>();
-   *
-   * SimpleFunction<Integer, Integer> identityFn =
-   *     new SimpleFunction<Integer, Integer>() {
-   *       @Override
-   *       public Integer apply(Integer input) {
-   *           return input;
-   *       }};
-   * PCollection<CoCombineResult> maxAndMean = globalLatencies.apply(
-   *     Combine.globally(
-   *         CombineFns.compose()
-   *            .with(identityFn, new MaxIntegerFn(), maxLatencyTag)
-   *            .with(identityFn, new MeanFn<Integer>(), meanLatencyTag)));
-   *
-   * PCollection<T> finalResultCollection = maxAndMean
-   *     .apply(ParDo.of(
-   *         new DoFn<CoCombineResult, T>() {
-   *           @Override
-   *           public void processElement(ProcessContext c) throws Exception {
-   *             CoCombineResult e = c.element();
-   *             Integer maxLatency = e.get(maxLatencyTag);
-   *             Double meanLatency = e.get(meanLatencyTag);
-   *             .... Do Something ....
-   *             c.output(...some T...);
-   *           }
-   *         }));
-   * } </pre>
-   */
-  public static ComposeCombineFnBuilder compose() {
-    return new ComposeCombineFnBuilder();
-  }
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  /**
-   * A builder class to construct a composed {@link PerKeyCombineFn}.
-   */
-  public static class ComposeKeyedCombineFnBuilder {
-    /**
-     * Returns a {@link ComposedKeyedCombineFn} that can take additional
-     * {@link PerKeyCombineFn PerKeyCombineFns} and apply them as a single combine function.
-     *
-     * <p>The {@link ComposedKeyedCombineFn} extracts inputs from {@code DataT} with
-     * the {@code extractInputFn} and combines them with the {@code keyedCombineFn},
-     * and then it outputs each combined value with a {@link TupleTag} to a
-     * {@link CoCombineResult}.
-     */
-    public <K, DataT, InputT, OutputT> ComposedKeyedCombineFn<DataT, K> with(
-        SimpleFunction<DataT, InputT> extractInputFn,
-        KeyedCombineFn<K, InputT, ?, OutputT> keyedCombineFn,
-        TupleTag<OutputT> outputTag) {
-      return new ComposedKeyedCombineFn<DataT, K>()
-          .with(extractInputFn, keyedCombineFn, outputTag);
-    }
-
-    /**
-     * Returns a {@link ComposedKeyedCombineFnWithContext} that can take additional
-     * {@link PerKeyCombineFn PerKeyCombineFns} and apply them as a single combine function.
-     *
-     * <p>The {@link ComposedKeyedCombineFnWithContext} extracts inputs from {@code DataT} with
-     * the {@code extractInputFn} and combines them with the {@code keyedCombineFnWithContext},
-     * and then it outputs each combined value with a {@link TupleTag} to a
-     * {@link CoCombineResult}.
-     */
-    public <K, DataT, InputT, OutputT> ComposedKeyedCombineFnWithContext<DataT, K> with(
-        SimpleFunction<DataT, InputT> extractInputFn,
-        KeyedCombineFnWithContext<K, InputT, ?, OutputT> keyedCombineFnWithContext,
-        TupleTag<OutputT> outputTag) {
-      return new ComposedKeyedCombineFnWithContext<DataT, K>()
-          .with(extractInputFn, keyedCombineFnWithContext, outputTag);
-    }
-
-    /**
-     * Returns a {@link ComposedKeyedCombineFn} that can take additional
-     * {@link PerKeyCombineFn PerKeyCombineFns} and apply them as a single combine function.
-     */
-    public <K, DataT, InputT, OutputT> ComposedKeyedCombineFn<DataT, K> with(
-        SimpleFunction<DataT, InputT> extractInputFn,
-        CombineFn<InputT, ?, OutputT> combineFn,
-        TupleTag<OutputT> outputTag) {
-      return with(extractInputFn, combineFn.<K>asKeyedFn(), outputTag);
-    }
-
-    /**
-     * Returns a {@link ComposedKeyedCombineFnWithContext} that can take additional
-     * {@link PerKeyCombineFn PerKeyCombineFns} and apply them as a single combine function.
-     */
-    public <K, DataT, InputT, OutputT> ComposedKeyedCombineFnWithContext<DataT, K> with(
-        SimpleFunction<DataT, InputT> extractInputFn,
-        CombineFnWithContext<InputT, ?, OutputT> combineFnWithContext,
-        TupleTag<OutputT> outputTag) {
-      return with(extractInputFn, combineFnWithContext.<K>asKeyedFn(), outputTag);
-    }
-  }
-
-  /**
-   * A builder class to construct a composed {@link GlobalCombineFn}.
-   */
-  public static class ComposeCombineFnBuilder {
-    /**
-     * Returns a {@link ComposedCombineFn} that can take additional
-     * {@link GlobalCombineFn GlobalCombineFns} and apply them as a single combine function.
-     *
-     * <p>The {@link ComposedCombineFn} extracts inputs from {@code DataT} with
-     * the {@code extractInputFn} and combines them with the {@code combineFn},
-     * and then it outputs each combined value with a {@link TupleTag} to a
-     * {@link CoCombineResult}.
-     */
-    public <DataT, InputT, OutputT> ComposedCombineFn<DataT> with(
-        SimpleFunction<DataT, InputT> extractInputFn,
-        CombineFn<InputT, ?, OutputT> combineFn,
-        TupleTag<OutputT> outputTag) {
-      return new ComposedCombineFn<DataT>()
-          .with(extractInputFn, combineFn, outputTag);
-    }
-
-    /**
-     * Returns a {@link ComposedCombineFnWithContext} that can take additional
-     * {@link GlobalCombineFn GlobalCombineFns} and apply them as a single combine function.
-     *
-     * <p>The {@link ComposedCombineFnWithContext} extracts inputs from {@code DataT} with
-     * the {@code extractInputFn} and combines them with the {@code combineFnWithContext},
-     * and then it outputs each combined value with a {@link TupleTag} to a
-     * {@link CoCombineResult}.
-     */
-    public <DataT, InputT, OutputT> ComposedCombineFnWithContext<DataT> with(
-        SimpleFunction<DataT, InputT> extractInputFn,
-        CombineFnWithContext<InputT, ?, OutputT> combineFnWithContext,
-        TupleTag<OutputT> outputTag) {
-      return new ComposedCombineFnWithContext<DataT>()
-          .with(extractInputFn, combineFnWithContext, outputTag);
-    }
-  }
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  /**
-   * A tuple of outputs produced by a composed combine functions.
-   *
-   * <p>See {@link #compose()} or {@link #composeKeyed()}) for details.
-   */
-  public static class CoCombineResult implements Serializable {
-
-    private enum NullValue {
-      INSTANCE;
-    }
-
-    private final Map<TupleTag<?>, Object> valuesMap;
-
-    /**
-     * The constructor of {@link CoCombineResult}.
-     *
-     * <p>Null values should have been filtered out from the {@code valuesMap}.
-     * {@link TupleTag TupleTags} that associate with null values doesn't exist in the key set of
-     * {@code valuesMap}.
-     *
-     * @throws NullPointerException if any key or value in {@code valuesMap} is null
-     */
-    CoCombineResult(Map<TupleTag<?>, Object> valuesMap) {
-      ImmutableMap.Builder<TupleTag<?>, Object> builder = ImmutableMap.builder();
-      for (Entry<TupleTag<?>, Object> entry : valuesMap.entrySet()) {
-        if (entry.getValue() != null) {
-          builder.put(entry);
-        } else {
-          builder.put(entry.getKey(), NullValue.INSTANCE);
-        }
-      }
-      this.valuesMap = builder.build();
-    }
-
-    /**
-     * Returns the value represented by the given {@link TupleTag}.
-     *
-     * <p>It is an error to request a non-exist tuple tag from the {@link CoCombineResult}.
-     */
-    @SuppressWarnings("unchecked")
-    public <V> V get(TupleTag<V> tag) {
-      checkArgument(
-          valuesMap.keySet().contains(tag), "TupleTag " + tag + " is not in the CoCombineResult");
-      Object value = valuesMap.get(tag);
-      if (value == NullValue.INSTANCE) {
-        return null;
-      } else {
-        return (V) value;
-      }
-    }
-  }
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  /**
-   * A composed {@link CombineFn} that applies multiple {@link CombineFn CombineFns}.
-   *
-   * <p>For each {@link CombineFn} it extracts inputs from {@code DataT} with
-   * the {@code extractInputFn} and combines them,
-   * and then it outputs each combined value with a {@link TupleTag} to a
-   * {@link CoCombineResult}.
-   */
-  public static class ComposedCombineFn<DataT> extends CombineFn<DataT, Object[], CoCombineResult> {
-
-    private final List<CombineFn<Object, Object, Object>> combineFns;
-    private final List<SerializableFunction<DataT, Object>> extractInputFns;
-    private final List<TupleTag<?>> outputTags;
-    private final int combineFnCount;
-
-    private ComposedCombineFn() {
-      this.extractInputFns = ImmutableList.of();
-      this.combineFns = ImmutableList.of();
-      this.outputTags = ImmutableList.of();
-      this.combineFnCount = 0;
-    }
-
-    private ComposedCombineFn(
-        ImmutableList<SerializableFunction<DataT, ?>> extractInputFns,
-        ImmutableList<CombineFn<?, ?, ?>> combineFns,
-        ImmutableList<TupleTag<?>> outputTags) {
-      @SuppressWarnings({"unchecked", "rawtypes"})
-      List<SerializableFunction<DataT, Object>> castedExtractInputFns = (List) extractInputFns;
-      this.extractInputFns = castedExtractInputFns;
-
-      @SuppressWarnings({"unchecked", "rawtypes"})
-      List<CombineFn<Object, Object, Object>> castedCombineFns = (List) combineFns;
-      this.combineFns = castedCombineFns;
-
-      this.outputTags = outputTags;
-      this.combineFnCount = this.combineFns.size();
-    }
-
-    /**
-     * Returns a {@link ComposedCombineFn} with an additional {@link CombineFn}.
-     */
-    public <InputT, OutputT> ComposedCombineFn<DataT> with(
-        SimpleFunction<DataT, InputT> extractInputFn,
-        CombineFn<InputT, ?, OutputT> combineFn,
-        TupleTag<OutputT> outputTag) {
-      checkUniqueness(outputTags, outputTag);
-      return new ComposedCombineFn<>(
-          ImmutableList.<SerializableFunction<DataT, ?>>builder()
-              .addAll(extractInputFns)
-              .add(extractInputFn)
-              .build(),
-          ImmutableList.<CombineFn<?, ?, ?>>builder()
-              .addAll(combineFns)
-              .add(combineFn)
-              .build(),
-          ImmutableList.<TupleTag<?>>builder()
-              .addAll(outputTags)
-              .add(outputTag)
-              .build());
-    }
-
-    /**
-     * Returns a {@link ComposedCombineFnWithContext} with an additional
-     * {@link CombineFnWithContext}.
-     */
-    public <InputT, OutputT> ComposedCombineFnWithContext<DataT> with(
-        SimpleFunction<DataT, InputT> extractInputFn,
-        CombineFnWithContext<InputT, ?, OutputT> combineFn,
-        TupleTag<OutputT> outputTag) {
-      checkUniqueness(outputTags, outputTag);
-      List<CombineFnWithContext<Object, Object, Object>> fnsWithContext = Lists.newArrayList();
-      for (CombineFn<Object, Object, Object> fn : combineFns) {
-        fnsWithContext.add(toFnWithContext(fn));
-      }
-      return new ComposedCombineFnWithContext<>(
-          ImmutableList.<SerializableFunction<DataT, ?>>builder()
-              .addAll(extractInputFns)
-              .add(extractInputFn)
-              .build(),
-          ImmutableList.<CombineFnWithContext<?, ?, ?>>builder()
-              .addAll(fnsWithContext)
-              .add(combineFn)
-              .build(),
-          ImmutableList.<TupleTag<?>>builder()
-              .addAll(outputTags)
-              .add(outputTag)
-              .build());
-    }
-
-    @Override
-    public Object[] createAccumulator() {
-      Object[] accumsArray = new Object[combineFnCount];
-      for (int i = 0; i < combineFnCount; ++i) {
-        accumsArray[i] = combineFns.get(i).createAccumulator();
-      }
-      return accumsArray;
-    }
-
-    @Override
-    public Object[] addInput(Object[] accumulator, DataT value) {
-      for (int i = 0; i < combineFnCount; ++i) {
-        Object input = extractInputFns.get(i).apply(value);
-        accumulator[i] = combineFns.get(i).addInput(accumulator[i], input);
-      }
-      return accumulator;
-    }
-
-    @Override
-    public Object[] mergeAccumulators(Iterable<Object[]> accumulators) {
-      Iterator<Object[]> iter = accumulators.iterator();
-      if (!iter.hasNext()) {
-        return createAccumulator();
-      } else {
-        // Reuses the first accumulator, and overwrites its values.
-        // It is safe because {@code accum[i]} only depends on
-        // the i-th component of each accumulator.
-        Object[] accum = iter.next();
-        for (int i = 0; i < combineFnCount; ++i) {
-          accum[i] = combineFns.get(i).mergeAccumulators(new ProjectionIterable(accumulators, i));
-        }
-        return accum;
-      }
-    }
-
-    @Override
-    public CoCombineResult extractOutput(Object[] accumulator) {
-      Map<TupleTag<?>, Object> valuesMap = Maps.newHashMap();
-      for (int i = 0; i < combineFnCount; ++i) {
-        valuesMap.put(
-            outputTags.get(i),
-            combineFns.get(i).extractOutput(accumulator[i]));
-      }
-      return new CoCombineResult(valuesMap);
-    }
-
-    @Override
-    public Object[] compact(Object[] accumulator) {
-      for (int i = 0; i < combineFnCount; ++i) {
-        accumulator[i] = combineFns.get(i).compact(accumulator[i]);
-      }
-      return accumulator;
-    }
-
-    @Override
-    public Coder<Object[]> getAccumulatorCoder(CoderRegistry registry, Coder<DataT> dataCoder)
-        throws CannotProvideCoderException {
-      List<Coder<Object>> coders = Lists.newArrayList();
-      for (int i = 0; i < combineFnCount; ++i) {
-        Coder<Object> inputCoder =
-            registry.getDefaultOutputCoder(extractInputFns.get(i), dataCoder);
-        coders.add(combineFns.get(i).getAccumulatorCoder(registry, inputCoder));
-      }
-      return new ComposedAccumulatorCoder(coders);
-    }
-  }
-
-  /**
-   * A composed {@link CombineFnWithContext} that applies multiple
-   * {@link CombineFnWithContext CombineFnWithContexts}.
-   *
-   * <p>For each {@link CombineFnWithContext} it extracts inputs from {@code DataT} with
-   * the {@code extractInputFn} and combines them,
-   * and then it outputs each combined value with a {@link TupleTag} to a
-   * {@link CoCombineResult}.
-   */
-  public static class ComposedCombineFnWithContext<DataT>
-      extends CombineFnWithContext<DataT, Object[], CoCombineResult> {
-
-    private final List<SerializableFunction<DataT, Object>> extractInputFns;
-    private final List<CombineFnWithContext<Object, Object, Object>> combineFnWithContexts;
-    private final List<TupleTag<?>> outputTags;
-    private final int combineFnCount;
-
-    private ComposedCombineFnWithContext() {
-      this.extractInputFns = ImmutableList.of();
-      this.combineFnWithContexts = ImmutableList.of();
-      this.outputTags = ImmutableList.of();
-      this.combineFnCount = 0;
-    }
-
-    private ComposedCombineFnWithContext(
-        ImmutableList<SerializableFunction<DataT, ?>> extractInputFns,
-        ImmutableList<CombineFnWithContext<?, ?, ?>> combineFnWithContexts,
-        ImmutableList<TupleTag<?>> outputTags) {
-      @SuppressWarnings({"unchecked", "rawtypes"})
-      List<SerializableFunction<DataT, Object>> castedExtractInputFns =
-          (List) extractInputFns;
-      this.extractInputFns = castedExtractInputFns;
-
-      @SuppressWarnings({"rawtypes", "unchecked"})
-      List<CombineFnWithContext<Object, Object, Object>> castedCombineFnWithContexts
-          = (List) combineFnWithContexts;
-      this.combineFnWithContexts = castedCombineFnWithContexts;
-
-      this.outputTags = outputTags;
-      this.combineFnCount = this.combineFnWithContexts.size();
-    }
-
-    /**
-     * Returns a {@link ComposedCombineFnWithContext} with an additional {@link GlobalCombineFn}.
-     */
-    public <InputT, OutputT> ComposedCombineFnWithContext<DataT> with(
-        SimpleFunction<DataT, InputT> extractInputFn,
-        GlobalCombineFn<InputT, ?, OutputT> globalCombineFn,
-        TupleTag<OutputT> outputTag) {
-      checkUniqueness(outputTags, outputTag);
-      return new ComposedCombineFnWithContext<>(
-          ImmutableList.<SerializableFunction<DataT, ?>>builder()
-              .addAll(extractInputFns)
-              .add(extractInputFn)
-              .build(),
-          ImmutableList.<CombineFnWithContext<?, ?, ?>>builder()
-              .addAll(combineFnWithContexts)
-              .add(toFnWithContext(globalCombineFn))
-              .build(),
-          ImmutableList.<TupleTag<?>>builder()
-              .addAll(outputTags)
-              .add(outputTag)
-              .build());
-    }
-
-    @Override
-    public Object[] createAccumulator(Context c) {
-      Object[] accumsArray = new Object[combineFnCount];
-      for (int i = 0; i < combineFnCount; ++i) {
-        accumsArray[i] = combineFnWithContexts.get(i).createAccumulator(c);
-      }
-      return accumsArray;
-    }
-
-    @Override
-    public Object[] addInput(Object[] accumulator, DataT value, Context c) {
-      for (int i = 0; i < combineFnCount; ++i) {
-        Object input = extractInputFns.get(i).apply(value);
-        accumulator[i] = combineFnWithContexts.get(i).addInput(accumulator[i], input, c);
-      }
-      return accumulator;
-    }
-
-    @Override
-    public Object[] mergeAccumulators(Iterable<Object[]> accumulators, Context c) {
-      Iterator<Object[]> iter = accumulators.iterator();
-      if (!iter.hasNext()) {
-        return createAccumulator(c);
-      } else {
-        // Reuses the first accumulator, and overwrites its values.
-        // It is safe because {@code accum[i]} only depends on
-        // the i-th component of each accumulator.
-        Object[] accum = iter.next();
-        for (int i = 0; i < combineFnCount; ++i) {
-          accum[i] = combineFnWithContexts.get(i).mergeAccumulators(
-              new ProjectionIterable(accumulators, i), c);
-        }
-        return accum;
-      }
-    }
-
-    @Override
-    public CoCombineResult extractOutput(Object[] accumulator, Context c) {
-      Map<TupleTag<?>, Object> valuesMap = Maps.newHashMap();
-      for (int i = 0; i < combineFnCount; ++i) {
-        valuesMap.put(
-            outputTags.get(i),
-            combineFnWithContexts.get(i).extractOutput(accumulator[i], c));
-      }
-      return new CoCombineResult(valuesMap);
-    }
-
-    @Override
-    public Object[] compact(Object[] accumulator, Context c) {
-      for (int i = 0; i < combineFnCount; ++i) {
-        accumulator[i] = combineFnWithContexts.get(i).compact(accumulator[i], c);
-      }
-      return accumulator;
-    }
-
-    @Override
-    public Coder<Object[]> getAccumulatorCoder(CoderRegistry registry, Coder<DataT> dataCoder)
-        throws CannotProvideCoderException {
-      List<Coder<Object>> coders = Lists.newArrayList();
-      for (int i = 0; i < combineFnCount; ++i) {
-        Coder<Object> inputCoder =
-            registry.getDefaultOutputCoder(extractInputFns.get(i), dataCoder);
-        coders.add(combineFnWithContexts.get(i).getAccumulatorCoder(registry, inputCoder));
-      }
-      return new ComposedAccumulatorCoder(coders);
-    }
-  }
-
-  /**
-   * A composed {@link KeyedCombineFn} that applies multiple {@link KeyedCombineFn KeyedCombineFns}.
-   *
-   * <p>For each {@link KeyedCombineFn} it extracts inputs from {@code DataT} with
-   * the {@code extractInputFn} and combines them,
-   * and then it outputs each combined value with a {@link TupleTag} to a
-   * {@link CoCombineResult}.
-   */
-  public static class ComposedKeyedCombineFn<DataT, K>
-      extends KeyedCombineFn<K, DataT, Object[], CoCombineResult> {
-
-    private final List<SerializableFunction<DataT, Object>> extractInputFns;
-    private final List<KeyedCombineFn<K, Object, Object, Object>> keyedCombineFns;
-    private final List<TupleTag<?>> outputTags;
-    private final int combineFnCount;
-
-    private ComposedKeyedCombineFn() {
-      this.extractInputFns = ImmutableList.of();
-      this.keyedCombineFns = ImmutableList.of();
-      this.outputTags = ImmutableList.of();
-      this.combineFnCount = 0;
-    }
-
-    private ComposedKeyedCombineFn(
-        ImmutableList<SerializableFunction<DataT, ?>> extractInputFns,
-        ImmutableList<KeyedCombineFn<K, ?, ?, ?>> keyedCombineFns,
-        ImmutableList<TupleTag<?>> outputTags) {
-      @SuppressWarnings({"unchecked", "rawtypes"})
-      List<SerializableFunction<DataT, Object>> castedExtractInputFns = (List) extractInputFns;
-      this.extractInputFns = castedExtractInputFns;
-
-      @SuppressWarnings({"unchecked", "rawtypes"})
-      List<KeyedCombineFn<K, Object, Object, Object>> castedKeyedCombineFns =
-          (List) keyedCombineFns;
-      this.keyedCombineFns = castedKeyedCombineFns;
-      this.outputTags = outputTags;
-      this.combineFnCount = this.keyedCombineFns.size();
-    }
-
-    /**
-     * Returns a {@link ComposedKeyedCombineFn} with an additional {@link KeyedCombineFn}.
-     */
-    public <InputT, OutputT> ComposedKeyedCombineFn<DataT, K> with(
-        SimpleFunction<DataT, InputT> extractInputFn,
-        KeyedCombineFn<K, InputT, ?, OutputT> keyedCombineFn,
-        TupleTag<OutputT> outputTag) {
-      checkUniqueness(outputTags, outputTag);
-      return new ComposedKeyedCombineFn<>(
-          ImmutableList.<SerializableFunction<DataT, ?>>builder()
-          .addAll(extractInputFns)
-          .add(extractInputFn)
-          .build(),
-      ImmutableList.<KeyedCombineFn<K, ?, ?, ?>>builder()
-          .addAll(keyedCombineFns)
-          .add(keyedCombineFn)
-          .build(),
-      ImmutableList.<TupleTag<?>>builder()
-          .addAll(outputTags)
-          .add(outputTag)
-          .build());
-    }
-
-    /**
-     * Returns a {@link ComposedKeyedCombineFnWithContext} with an additional
-     * {@link KeyedCombineFnWithContext}.
-     */
-    public <InputT, OutputT> ComposedKeyedCombineFnWithContext<DataT, K> with(
-        SimpleFunction<DataT, InputT> extractInputFn,
-        KeyedCombineFnWithContext<K, InputT, ?, OutputT> keyedCombineFn,
-        TupleTag<OutputT> outputTag) {
-      checkUniqueness(outputTags, outputTag);
-      List<KeyedCombineFnWithContext<K, Object, Object, Object>> fnsWithContext =
-          Lists.newArrayList();
-      for (KeyedCombineFn<K, Object, Object, Object> fn : keyedCombineFns) {
-        fnsWithContext.add(toFnWithContext(fn));
-      }
-      return new ComposedKeyedCombineFnWithContext<>(
-          ImmutableList.<SerializableFunction<DataT, ?>>builder()
-          .addAll(extractInputFns)
-          .add(extractInputFn)
-          .build(),
-      ImmutableList.<KeyedCombineFnWithContext<K, ?, ?, ?>>builder()
-          .addAll(fnsWithContext)
-          .add(keyedCombineFn)
-          .build(),
-      ImmutableList.<TupleTag<?>>builder()
-          .addAll(outputTags)
-          .add(outputTag)
-          .build());
-    }
-
-    /**
-     * Returns a {@link ComposedKeyedCombineFn} with an additional {@link CombineFn}.
-     */
-    public <InputT, OutputT> ComposedKeyedCombineFn<DataT, K> with(
-        SimpleFunction<DataT, InputT> extractInputFn,
-        CombineFn<InputT, ?, OutputT> keyedCombineFn,
-        TupleTag<OutputT> outputTag) {
-      return with(extractInputFn, keyedCombineFn.<K>asKeyedFn(), outputTag);
-    }
-
-    /**
-     * Returns a {@link ComposedKeyedCombineFnWithContext} with an additional
-     * {@link CombineFnWithContext}.
-     */
-    public <InputT, OutputT> ComposedKeyedCombineFnWithContext<DataT, K> with(
-        SimpleFunction<DataT, InputT> extractInputFn,
-        CombineFnWithContext<InputT, ?, OutputT> keyedCombineFn,
-        TupleTag<OutputT> outputTag) {
-      return with(extractInputFn, keyedCombineFn.<K>asKeyedFn(), outputTag);
-    }
-
-    @Override
-    public Object[] createAccumulator(K key) {
-      Object[] accumsArray = new Object[combineFnCount];
-      for (int i = 0; i < combineFnCount; ++i) {
-        accumsArray[i] = keyedCombineFns.get(i).createAccumulator(key);
-      }
-      return accumsArray;
-    }
-
-    @Override
-    public Object[] addInput(K key, Object[] accumulator, DataT value) {
-      for (int i = 0; i < combineFnCount; ++i) {
-        Object input = extractInputFns.get(i).apply(value);
-        accumulator[i] = keyedCombineFns.get(i).addInput(key, accumulator[i], input);
-      }
-      return accumulator;
-    }
-
-    @Override
-    public Object[] mergeAccumulators(K key, final Iterable<Object[]> accumulators) {
-      Iterator<Object[]> iter = accumulators.iterator();
-      if (!iter.hasNext()) {
-        return createAccumulator(key);
-      } else {
-        // Reuses the first accumulator, and overwrites its values.
-        // It is safe because {@code accum[i]} only depends on
-        // the i-th component of each accumulator.
-        Object[] accum = iter.next();
-        for (int i = 0; i < combineFnCount; ++i) {
-          accum[i] = keyedCombineFns.get(i).mergeAccumulators(
-              key, new ProjectionIterable(accumulators, i));
-        }
-        return accum;
-      }
-    }
-
-    @Override
-    public CoCombineResult extractOutput(K key, Object[] accumulator) {
-      Map<TupleTag<?>, Object> valuesMap = Maps.newHashMap();
-      for (int i = 0; i < combineFnCount; ++i) {
-        valuesMap.put(
-            outputTags.get(i),
-            keyedCombineFns.get(i).extractOutput(key, accumulator[i]));
-      }
-      return new CoCombineResult(valuesMap);
-    }
-
-    @Override
-    public Object[] compact(K key, Object[] accumulator) {
-      for (int i = 0; i < combineFnCount; ++i) {
-        accumulator[i] = keyedCombineFns.get(i).compact(key, accumulator[i]);
-      }
-      return accumulator;
-    }
-
-    @Override
-    public Coder<Object[]> getAccumulatorCoder(
-        CoderRegistry registry, Coder<K> keyCoder, Coder<DataT> dataCoder)
-        throws CannotProvideCoderException {
-      List<Coder<Object>> coders = Lists.newArrayList();
-      for (int i = 0; i < combineFnCount; ++i) {
-        Coder<Object> inputCoder =
-            registry.getDefaultOutputCoder(extractInputFns.get(i), dataCoder);
-        coders.add(keyedCombineFns.get(i).getAccumulatorCoder(registry, keyCoder, inputCoder));
-      }
-      return new ComposedAccumulatorCoder(coders);
-    }
-  }
-
-  /**
-   * A composed {@link KeyedCombineFnWithContext} that applies multiple
-   * {@link KeyedCombineFnWithContext KeyedCombineFnWithContexts}.
-   *
-   * <p>For each {@link KeyedCombineFnWithContext} it extracts inputs from {@code DataT} with
-   * the {@code extractInputFn} and combines them,
-   * and then it outputs each combined value with a {@link TupleTag} to a
-   * {@link CoCombineResult}.
-   */
-  public static class ComposedKeyedCombineFnWithContext<DataT, K>
-      extends KeyedCombineFnWithContext<K, DataT, Object[], CoCombineResult> {
-
-    private final List<SerializableFunction<DataT, Object>> extractInputFns;
-    private final List<KeyedCombineFnWithContext<K, Object, Object, Object>> keyedCombineFns;
-    private final List<TupleTag<?>> outputTags;
-    private final int combineFnCount;
-
-    private ComposedKeyedCombineFnWithContext() {
-      this.extractInputFns = ImmutableList.of();
-      this.keyedCombineFns = ImmutableList.of();
-      this.outputTags = ImmutableList.of();
-      this.combineFnCount = 0;
-    }
-
-    private ComposedKeyedCombineFnWithContext(
-        ImmutableList<SerializableFunction<DataT, ?>> extractInputFns,
-        ImmutableList<KeyedCombineFnWithContext<K, ?, ?, ?>> keyedCombineFns,
-        ImmutableList<TupleTag<?>> outputTags) {
-      @SuppressWarnings({"unchecked", "rawtypes"})
-      List<SerializableFunction<DataT, Object>> castedExtractInputFns =
-          (List) extractInputFns;
-      this.extractInputFns = castedExtractInputFns;
-
-      @SuppressWarnings({"unchecked", "rawtypes"})
-      List<KeyedCombineFnWithContext<K, Object, Object, Object>> castedKeyedCombineFns =
-          (List) keyedCombineFns;
-      this.keyedCombineFns = castedKeyedCombineFns;
-      this.outputTags = outputTags;
-      this.combineFnCount = this.keyedCombineFns.size();
-    }
-
-    /**
-     * Returns a {@link ComposedKeyedCombineFnWithContext} with an additional
-     * {@link PerKeyCombineFn}.
-     */
-    public <InputT, OutputT> ComposedKeyedCombineFnWithContext<DataT, K> with(
-        SimpleFunction<DataT, InputT> extractInputFn,
-        PerKeyCombineFn<K, InputT, ?, OutputT> perKeyCombineFn,
-        TupleTag<OutputT> outputTag) {
-      checkUniqueness(outputTags, outputTag);
-      return new ComposedKeyedCombineFnWithContext<>(
-          ImmutableList.<SerializableFunction<DataT, ?>>builder()
-              .addAll(extractInputFns)
-              .add(extractInputFn)
-              .build(),
-          ImmutableList.<KeyedCombineFnWithContext<K, ?, ?, ?>>builder()
-              .addAll(keyedCombineFns)
-              .add(toFnWithContext(perKeyCombineFn))
-              .build(),
-          ImmutableList.<TupleTag<?>>builder()
-              .addAll(outputTags)
-              .add(outputTag)
-              .build());
-    }
-
-    /**
-     * Returns a {@link ComposedKeyedCombineFnWithContext} with an additional
-     * {@link GlobalCombineFn}.
-     */
-    public <InputT, OutputT> ComposedKeyedCombineFnWithContext<DataT, K> with(
-        SimpleFunction<DataT, InputT> extractInputFn,
-        GlobalCombineFn<InputT, ?, OutputT> perKeyCombineFn,
-        TupleTag<OutputT> outputTag) {
-      return with(extractInputFn, perKeyCombineFn.<K>asKeyedFn(), outputTag);
-    }
-
-    @Override
-    public Object[] createAccumulator(K key, Context c) {
-      Object[] accumsArray = new Object[combineFnCount];
-      for (int i = 0; i < combineFnCount; ++i) {
-        accumsArray[i] = keyedCombineFns.get(i).createAccumulator(key, c);
-      }
-      return accumsArray;
-    }
-
-    @Override
-    public Object[] addInput(K key, Object[] accumulator, DataT value, Context c) {
-      for (int i = 0; i < combineFnCount; ++i) {
-        Object input = extractInputFns.get(i).apply(value);
-        accumulator[i] = keyedCombineFns.get(i).addInput(key, accumulator[i], input, c);
-      }
-      return accumulator;
-    }
-
-    @Override
-    public Object[] mergeAccumulators(K key, Iterable<Object[]> accumulators, Context c) {
-      Iterator<Object[]> iter = accumulators.iterator();
-      if (!iter.hasNext()) {
-        return createAccumulator(key, c);
-      } else {
-        // Reuses the first accumulator, and overwrites its values.
-        // It is safe because {@code accum[i]} only depends on
-        // the i-th component of each accumulator.
-        Object[] accum = iter.next();
-        for (int i = 0; i < combineFnCount; ++i) {
-          accum[i] = keyedCombineFns.get(i).mergeAccumulators(
-              key, new ProjectionIterable(accumulators, i), c);
-        }
-        return accum;
-      }
-    }
-
-    @Override
-    public CoCombineResult extractOutput(K key, Object[] accumulator, Context c) {
-      Map<TupleTag<?>, Object> valuesMap = Maps.newHashMap();
-      for (int i = 0; i < combineFnCount; ++i) {
-        valuesMap.put(
-            outputTags.get(i),
-            keyedCombineFns.get(i).extractOutput(key, accumulator[i], c));
-      }
-      return new CoCombineResult(valuesMap);
-    }
-
-    @Override
-    public Object[] compact(K key, Object[] accumulator, Context c) {
-      for (int i = 0; i < combineFnCount; ++i) {
-        accumulator[i] = keyedCombineFns.get(i).compact(key, accumulator[i], c);
-      }
-      return accumulator;
-    }
-
-    @Override
-    public Coder<Object[]> getAccumulatorCoder(
-        CoderRegistry registry, Coder<K> keyCoder, Coder<DataT> dataCoder)
-        throws CannotProvideCoderException {
-      List<Coder<Object>> coders = Lists.newArrayList();
-      for (int i = 0; i < combineFnCount; ++i) {
-        Coder<Object> inputCoder =
-            registry.getDefaultOutputCoder(extractInputFns.get(i), dataCoder);
-        coders.add(keyedCombineFns.get(i).getAccumulatorCoder(
-            registry, keyCoder, inputCoder));
-      }
-      return new ComposedAccumulatorCoder(coders);
-    }
-  }
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  private static class ProjectionIterable implements Iterable<Object> {
-    private final Iterable<Object[]> iterable;
-    private final int column;
-
-    private ProjectionIterable(Iterable<Object[]> iterable, int column) {
-      this.iterable = iterable;
-      this.column = column;
-    }
-
-    @Override
-    public Iterator<Object> iterator() {
-      final Iterator<Object[]> iter = iterable.iterator();
-      return new Iterator<Object>() {
-        @Override
-        public boolean hasNext() {
-          return iter.hasNext();
-        }
-
-        @Override
-        public Object next() {
-          return iter.next()[column];
-        }
-
-        @Override
-        public void remove() {
-            throw new UnsupportedOperationException();
-        }
-      };
-    }
-  }
-
-  private static class ComposedAccumulatorCoder extends StandardCoder<Object[]> {
-    private List<Coder<Object>> coders;
-    private int codersCount;
-
-    public ComposedAccumulatorCoder(List<Coder<Object>> coders) {
-      this.coders = ImmutableList.copyOf(coders);
-      this.codersCount  = coders.size();
-    }
-
-    @SuppressWarnings({"rawtypes", "unchecked"})
-    @JsonCreator
-    public static ComposedAccumulatorCoder of(
-        @JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
-        List<Coder<?>> components) {
-      return new ComposedAccumulatorCoder((List) components);
-    }
-
-    @Override
-    public void encode(Object[] value, OutputStream outStream, Context context)
-        throws CoderException, IOException {
-      checkArgument(value.length == codersCount);
-      Context nestedContext = context.nested();
-      for (int i = 0; i < codersCount; ++i) {
-        coders.get(i).encode(value[i], outStream, nestedContext);
-      }
-    }
-
-    @Override
-    public Object[] decode(InputStream inStream, Context context)
-        throws CoderException, IOException {
-      Object[] ret = new Object[codersCount];
-      Context nestedContext = context.nested();
-      for (int i = 0; i < codersCount; ++i) {
-        ret[i] = coders.get(i).decode(inStream, nestedContext);
-      }
-      return ret;
-    }
-
-    @Override
-    public List<? extends Coder<?>> getCoderArguments() {
-      return coders;
-    }
-
-    @Override
-    public void verifyDeterministic() throws NonDeterministicException {
-      for (int i = 0; i < codersCount; ++i) {
-        coders.get(i).verifyDeterministic();
-      }
-    }
-  }
-
-  @SuppressWarnings("unchecked")
-  private static <InputT, AccumT, OutputT> CombineFnWithContext<InputT, AccumT, OutputT>
-  toFnWithContext(GlobalCombineFn<InputT, AccumT, OutputT> globalCombineFn) {
-    if (globalCombineFn instanceof CombineFnWithContext) {
-      return (CombineFnWithContext<InputT, AccumT, OutputT>) globalCombineFn;
-    } else {
-      final CombineFn<InputT, AccumT, OutputT> combineFn =
-          (CombineFn<InputT, AccumT, OutputT>) globalCombineFn;
-      return new CombineFnWithContext<InputT, AccumT, OutputT>() {
-        @Override
-        public AccumT createAccumulator(Context c) {
-          return combineFn.createAccumulator();
-        }
-        @Override
-        public AccumT addInput(AccumT accumulator, InputT input, Context c) {
-          return combineFn.addInput(accumulator, input);
-        }
-        @Override
-        public AccumT mergeAccumulators(Iterable<AccumT> accumulators, Context c) {
-          return combineFn.mergeAccumulators(accumulators);
-        }
-        @Override
-        public OutputT extractOutput(AccumT accumulator, Context c) {
-          return combineFn.extractOutput(accumulator);
-        }
-        @Override
-        public AccumT compact(AccumT accumulator, Context c) {
-          return combineFn.compact(accumulator);
-        }
-        @Override
-        public OutputT defaultValue() {
-          return combineFn.defaultValue();
-        }
-        @Override
-        public Coder<AccumT> getAccumulatorCoder(CoderRegistry registry, Coder<InputT> inputCoder)
-            throws CannotProvideCoderException {
-          return combineFn.getAccumulatorCoder(registry, inputCoder);
-        }
-        @Override
-        public Coder<OutputT> getDefaultOutputCoder(
-            CoderRegistry registry, Coder<InputT> inputCoder) throws CannotProvideCoderException {
-          return combineFn.getDefaultOutputCoder(registry, inputCoder);
-        }
-      };
-    }
-  }
-
-  private static <K, InputT, AccumT, OutputT> KeyedCombineFnWithContext<K, InputT, AccumT, OutputT>
-  toFnWithContext(PerKeyCombineFn<K, InputT, AccumT, OutputT> perKeyCombineFn) {
-    if (perKeyCombineFn instanceof KeyedCombineFnWithContext) {
-      @SuppressWarnings("unchecked")
-      KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> keyedCombineFnWithContext =
-          (KeyedCombineFnWithContext<K, InputT, AccumT, OutputT>) perKeyCombineFn;
-      return keyedCombineFnWithContext;
-    } else {
-      @SuppressWarnings("unchecked")
-      final KeyedCombineFn<K, InputT, AccumT, OutputT> keyedCombineFn =
-          (KeyedCombineFn<K, InputT, AccumT, OutputT>) perKeyCombineFn;
-      return new KeyedCombineFnWithContext<K, InputT, AccumT, OutputT>() {
-        @Override
-        public AccumT createAccumulator(K key, Context c) {
-          return keyedCombineFn.createAccumulator(key);
-        }
-        @Override
-        public AccumT addInput(K key, AccumT accumulator, InputT value, Context c) {
-          return keyedCombineFn.addInput(key, accumulator, value);
-        }
-        @Override
-        public AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators, Context c) {
-          return keyedCombineFn.mergeAccumulators(key, accumulators);
-        }
-        @Override
-        public OutputT extractOutput(K key, AccumT accumulator, Context c) {
-          return keyedCombineFn.extractOutput(key, accumulator);
-        }
-        @Override
-        public AccumT compact(K key, AccumT accumulator, Context c) {
-          return keyedCombineFn.compact(key, accumulator);
-        }
-        @Override
-        public Coder<AccumT> getAccumulatorCoder(CoderRegistry registry, Coder<K> keyCoder,
-            Coder<InputT> inputCoder) throws CannotProvideCoderException {
-          return keyedCombineFn.getAccumulatorCoder(registry, keyCoder, inputCoder);
-        }
-        @Override
-        public Coder<OutputT> getDefaultOutputCoder(CoderRegistry registry, Coder<K> keyCoder,
-            Coder<InputT> inputCoder) throws CannotProvideCoderException {
-          return keyedCombineFn.getDefaultOutputCoder(registry, keyCoder, inputCoder);
-        }
-      };
-    }
-  }
-
-  private static <OutputT> void checkUniqueness(
-      List<TupleTag<?>> registeredTags, TupleTag<OutputT> outputTag) {
-    checkArgument(
-        !registeredTags.contains(outputTag),
-        "Cannot compose with tuple tag %s because it is already present in the composition.",
-        outputTag);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/CombineWithContext.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/CombineWithContext.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/CombineWithContext.java
deleted file mode 100644
index fdf56e3..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/CombineWithContext.java
+++ /dev/null
@@ -1,277 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.transforms;
-
-import com.google.cloud.dataflow.sdk.coders.CannotProvideCoderException;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.CoderRegistry;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.transforms.Combine.CombineFn;
-import com.google.cloud.dataflow.sdk.transforms.Combine.KeyedCombineFn;
-import com.google.cloud.dataflow.sdk.values.PCollectionView;
-
-/**
- * This class contains combine functions that have access to {@code PipelineOptions} and side inputs
- * through {@code CombineWithContext.Context}.
- *
- * <p>{@link CombineFnWithContext} and {@link KeyedCombineFnWithContext} are for users to extend.
- */
-public class CombineWithContext {
-
-  /**
-   * Information accessible to all methods in {@code CombineFnWithContext}
-   * and {@code KeyedCombineFnWithContext}.
-   */
-  public abstract static class Context {
-    /**
-     * Returns the {@code PipelineOptions} specified with the
-     * {@link com.google.cloud.dataflow.sdk.runners.PipelineRunner}
-     * invoking this {@code KeyedCombineFn}.
-     */
-    public abstract PipelineOptions getPipelineOptions();
-
-    /**
-     * Returns the value of the side input for the window corresponding to the
-     * window of the main input element.
-     */
-    public abstract <T> T sideInput(PCollectionView<T> view);
-  }
-
-  /**
-   * An internal interface for signaling that a {@code GloballyCombineFn}
-   * or a {@code PerKeyCombineFn} needs to access {@code CombineWithContext.Context}.
-   *
-   * <p>For internal use only.
-   */
-  public interface RequiresContextInternal {}
-
-  /**
-   * A combine function that has access to {@code PipelineOptions} and side inputs through
-   * {@code CombineWithContext.Context}.
-   *
-   * See the equivalent {@link CombineFn} for details about combine functions.
-   */
-  public abstract static class CombineFnWithContext<InputT, AccumT, OutputT>
-      extends CombineFnBase.AbstractGlobalCombineFn<InputT, AccumT, OutputT>
-      implements RequiresContextInternal {
-    /**
-     * Returns a new, mutable accumulator value, representing the accumulation of zero input values.
-     *
-     * <p>It is equivalent to {@link CombineFn#createAccumulator}, but it has additional access to
-     * {@code CombineWithContext.Context}.
-     */
-    public abstract AccumT createAccumulator(Context c);
-
-    /**
-     * Adds the given input value to the given accumulator, returning the
-     * new accumulator value.
-     *
-     * <p>It is equivalent to {@link CombineFn#addInput}, but it has additional access to
-     * {@code CombineWithContext.Context}.
-     */
-    public abstract AccumT addInput(AccumT accumulator, InputT input, Context c);
-
-    /**
-     * Returns an accumulator representing the accumulation of all the
-     * input values accumulated in the merging accumulators.
-     *
-     * <p>It is equivalent to {@link CombineFn#mergeAccumulators}, but it has additional access to
-     * {@code CombineWithContext.Context}.
-     */
-    public abstract AccumT mergeAccumulators(Iterable<AccumT> accumulators, Context c);
-
-    /**
-     * Returns the output value that is the result of combining all
-     * the input values represented by the given accumulator.
-     *
-     * <p>It is equivalent to {@link CombineFn#extractOutput}, but it has additional access to
-     * {@code CombineWithContext.Context}.
-     */
-    public abstract OutputT extractOutput(AccumT accumulator, Context c);
-
-    /**
-     * Returns an accumulator that represents the same logical value as the
-     * input accumulator, but may have a more compact representation.
-     *
-     * <p>It is equivalent to {@link CombineFn#compact}, but it has additional access to
-     * {@code CombineWithContext.Context}.
-     */
-    public AccumT compact(AccumT accumulator, Context c) {
-      return accumulator;
-    }
-
-    @Override
-    public OutputT defaultValue() {
-      throw new UnsupportedOperationException(
-          "Override this function to provide the default value.");
-    }
-
-    @SuppressWarnings({"unchecked", "rawtypes"})
-    @Override
-    public <K> KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> asKeyedFn() {
-      // The key, an object, is never even looked at.
-      return new KeyedCombineFnWithContext<K, InputT, AccumT, OutputT>() {
-        @Override
-        public AccumT createAccumulator(K key, Context c) {
-          return CombineFnWithContext.this.createAccumulator(c);
-        }
-
-        @Override
-        public AccumT addInput(K key, AccumT accumulator, InputT input, Context c) {
-          return CombineFnWithContext.this.addInput(accumulator, input, c);
-        }
-
-        @Override
-        public AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators, Context c) {
-          return CombineFnWithContext.this.mergeAccumulators(accumulators, c);
-        }
-
-        @Override
-        public OutputT extractOutput(K key, AccumT accumulator, Context c) {
-          return CombineFnWithContext.this.extractOutput(accumulator, c);
-        }
-
-        @Override
-        public AccumT compact(K key, AccumT accumulator, Context c) {
-          return CombineFnWithContext.this.compact(accumulator, c);
-        }
-
-        @Override
-        public Coder<AccumT> getAccumulatorCoder(CoderRegistry registry, Coder<K> keyCoder,
-            Coder<InputT> inputCoder) throws CannotProvideCoderException {
-          return CombineFnWithContext.this.getAccumulatorCoder(registry, inputCoder);
-        }
-
-        @Override
-        public Coder<OutputT> getDefaultOutputCoder(CoderRegistry registry, Coder<K> keyCoder,
-            Coder<InputT> inputCoder) throws CannotProvideCoderException {
-          return CombineFnWithContext.this.getDefaultOutputCoder(registry, inputCoder);
-        }
-
-        @Override
-        public CombineFnWithContext<InputT, AccumT, OutputT> forKey(K key, Coder<K> keyCoder) {
-          return CombineFnWithContext.this;
-        }
-      };
-    }
-  }
-
-  /**
-   * A keyed combine function that has access to {@code PipelineOptions} and side inputs through
-   * {@code CombineWithContext.Context}.
-   *
-   * See the equivalent {@link KeyedCombineFn} for details about keyed combine functions.
-   */
-  public abstract static class KeyedCombineFnWithContext<K, InputT, AccumT, OutputT>
-      extends CombineFnBase.AbstractPerKeyCombineFn<K, InputT, AccumT, OutputT>
-      implements RequiresContextInternal {
-    /**
-     * Returns a new, mutable accumulator value representing the accumulation of zero input values.
-     *
-     * <p>It is equivalent to {@link KeyedCombineFn#createAccumulator},
-     * but it has additional access to {@code CombineWithContext.Context}.
-     */
-    public abstract AccumT createAccumulator(K key, Context c);
-
-    /**
-     * Adds the given input value to the given accumulator, returning the new accumulator value.
-     *
-     * <p>It is equivalent to {@link KeyedCombineFn#addInput}, but it has additional access to
-     * {@code CombineWithContext.Context}.
-     */
-    public abstract AccumT addInput(K key, AccumT accumulator, InputT value, Context c);
-
-    /**
-     * Returns an accumulator representing the accumulation of all the
-     * input values accumulated in the merging accumulators.
-     *
-     * <p>It is equivalent to {@link KeyedCombineFn#mergeAccumulators},
-     * but it has additional access to {@code CombineWithContext.Context}..
-     */
-    public abstract AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators, Context c);
-
-    /**
-     * Returns the output value that is the result of combining all
-     * the input values represented by the given accumulator.
-     *
-     * <p>It is equivalent to {@link KeyedCombineFn#extractOutput}, but it has additional access to
-     * {@code CombineWithContext.Context}.
-     */
-    public abstract OutputT extractOutput(K key, AccumT accumulator, Context c);
-
-    /**
-     * Returns an accumulator that represents the same logical value as the
-     * input accumulator, but may have a more compact representation.
-     *
-     * <p>It is equivalent to {@link KeyedCombineFn#compact}, but it has additional access to
-     * {@code CombineWithContext.Context}.
-     */
-    public AccumT compact(K key, AccumT accumulator, Context c) {
-      return accumulator;
-    }
-
-    /**
-     * Applies this {@code KeyedCombineFnWithContext} to a key and a collection
-     * of input values to produce a combined output value.
-     */
-    public OutputT apply(K key, Iterable<? extends InputT> inputs, Context c) {
-      AccumT accum = createAccumulator(key, c);
-      for (InputT input : inputs) {
-        accum = addInput(key, accum, input, c);
-      }
-      return extractOutput(key, accum, c);
-    }
-
-    @Override
-    public CombineFnWithContext<InputT, AccumT, OutputT> forKey(
-        final K key, final Coder<K> keyCoder) {
-      return new CombineFnWithContext<InputT, AccumT, OutputT>() {
-        @Override
-        public AccumT createAccumulator(Context c) {
-          return KeyedCombineFnWithContext.this.createAccumulator(key, c);
-        }
-
-        @Override
-        public AccumT addInput(AccumT accumulator, InputT input, Context c) {
-          return KeyedCombineFnWithContext.this.addInput(key, accumulator, input, c);
-        }
-
-        @Override
-        public AccumT mergeAccumulators(Iterable<AccumT> accumulators, Context c) {
-          return KeyedCombineFnWithContext.this.mergeAccumulators(key, accumulators, c);
-        }
-
-        @Override
-        public OutputT extractOutput(AccumT accumulator, Context c) {
-          return KeyedCombineFnWithContext.this.extractOutput(key, accumulator, c);
-        }
-
-        @Override
-        public Coder<AccumT> getAccumulatorCoder(CoderRegistry registry, Coder<InputT> inputCoder)
-            throws CannotProvideCoderException {
-          return KeyedCombineFnWithContext.this.getAccumulatorCoder(registry, keyCoder, inputCoder);
-        }
-
-        @Override
-        public Coder<OutputT> getDefaultOutputCoder(
-            CoderRegistry registry, Coder<InputT> inputCoder) throws CannotProvideCoderException {
-          return KeyedCombineFnWithContext.this.getDefaultOutputCoder(
-              registry, keyCoder, inputCoder);
-        }
-      };
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Count.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Count.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Count.java
deleted file mode 100644
index ffa11d1..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Count.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.transforms;
-
-import com.google.cloud.dataflow.sdk.transforms.Combine.CombineFn;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-
-/**
- * {@code PTransorm}s to count the elements in a {@link PCollection}.
- *
- * <p>{@link Count#perElement()} can be used to count the number of occurrences of each
- * distinct element in the PCollection, {@link Count#perKey()} can be used to count the
- * number of values per key, and {@link Count#globally()} can be used to count the total
- * number of elements in a PCollection.
- */
-public class Count {
-  private Count() {
-    // do not instantiate
-  }
-
-  /**
-   * Returns a {@link Combine.Globally} {@link PTransform} that counts the number of elements in
-   * its input {@link PCollection}.
-   */
-  public static <T> Combine.Globally<T, Long> globally() {
-    return Combine.globally(new CountFn<T>()).named("Count.Globally");
-  }
-
-  /**
-   * Returns a {@link Combine.PerKey} {@link PTransform} that counts the number of elements
-   * associated with each key of its input {@link PCollection}.
-   */
-  public static <K, V> Combine.PerKey<K, V, Long> perKey() {
-    return Combine.<K, V, Long>perKey(new CountFn<V>()).named("Count.PerKey");
-  }
-
-  /**
-   * Returns a {@link PerElement Count.PerElement} {@link PTransform} that counts the number of
-   * occurrences of each element in its input {@link PCollection}.
-   *
-   * <p>See {@link PerElement Count.PerElement} for more details.
-   */
-  public static <T> PerElement<T> perElement() {
-    return new PerElement<>();
-  }
-
-  /**
-   * {@code Count.PerElement<T>} takes a {@code PCollection<T>} and returns a
-   * {@code PCollection<KV<T, Long>>} representing a map from each distinct element of the input
-   * {@code PCollection} to the number of times that element occurs in the input. Each key in the
-   * output {@code PCollection} is unique.
-   *
-   * <p>This transform compares two values of type {@code T} by first encoding each element using
-   * the input {@code PCollection}'s {@code Coder}, then comparing the encoded bytes. Because of
-   * this, the input coder must be deterministic.
-   * (See {@link com.google.cloud.dataflow.sdk.coders.Coder#verifyDeterministic()} for more detail).
-   * Performing the comparison in this manner admits efficient parallel evaluation.
-   *
-   * <p>By default, the {@code Coder} of the keys of the output {@code PCollection} is the same as
-   * the {@code Coder} of the elements of the input {@code PCollection}.
-   *
-   * <p>Example of use:
-   * <pre> {@code
-   * PCollection<String> words = ...;
-   * PCollection<KV<String, Long>> wordCounts =
-   *     words.apply(Count.<String>perElement());
-   * } </pre>
-   *
-   * @param <T> the type of the elements of the input {@code PCollection}, and the type of the keys
-   * of the output {@code PCollection}
-   */
-  public static class PerElement<T>
-      extends PTransform<PCollection<T>, PCollection<KV<T, Long>>> {
-
-    public PerElement() { }
-
-    @Override
-    public PCollection<KV<T, Long>> apply(PCollection<T> input) {
-      return
-          input
-          .apply(ParDo.named("Init").of(new DoFn<T, KV<T, Void>>() {
-            @Override
-            public void processElement(ProcessContext c) {
-              c.output(KV.of(c.element(), (Void) null));
-            }
-          }))
-          .apply(Count.<T, Void>perKey());
-    }
-  }
-
-  /**
-   * A {@link CombineFn} that counts elements.
-   */
-  private static class CountFn<T> extends CombineFn<T, Long, Long> {
-
-    @Override
-    public Long createAccumulator() {
-      return 0L;
-    }
-
-    @Override
-    public Long addInput(Long accumulator, T input) {
-      return accumulator + 1;
-    }
-
-    @Override
-    public Long mergeAccumulators(Iterable<Long> accumulators) {
-      long result = 0L;
-      for (Long accum : accumulators) {
-        result += accum;
-      }
-      return result;
-    }
-
-    @Override
-    public Long extractOutput(Long accumulator) {
-      return accumulator;
-    }
-  }
-}