You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/05/01 02:25:39 UTC

[3/6] beam git commit: Remove KeyedCombineFn

http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
index 5ffaef8..0be8517 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
@@ -43,12 +43,9 @@ import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.CombineFnBase.AbstractGlobalCombineFn;
-import org.apache.beam.sdk.transforms.CombineFnBase.AbstractPerKeyCombineFn;
 import org.apache.beam.sdk.transforms.CombineFnBase.GlobalCombineFn;
-import org.apache.beam.sdk.transforms.CombineFnBase.PerKeyCombineFn;
 import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext;
 import org.apache.beam.sdk.transforms.CombineWithContext.Context;
-import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
 import org.apache.beam.sdk.transforms.CombineWithContext.RequiresContextInternal;
 import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
 import org.apache.beam.sdk.transforms.display.DisplayData;
@@ -155,7 +152,7 @@ public class Combine {
    */
   public static <K, V> PerKey<K, V, V> perKey(
       SerializableFunction<Iterable<V>, V> fn) {
-    return perKey(IterableCombineFn.of(fn).<K>asKeyedFn(), displayDataForFn(fn));
+    return perKey(IterableCombineFn.of(fn), displayDataForFn(fn));
   }
 
   /**
@@ -176,32 +173,11 @@ public class Combine {
    */
   public static <K, InputT, OutputT> PerKey<K, InputT, OutputT> perKey(
       GlobalCombineFn<? super InputT, ?, OutputT> fn) {
-    return perKey(fn.<K>asKeyedFn(), displayDataForFn(fn));
-  }
-
-  /**
-   * Returns a {@link PerKey Combine.PerKey} {@code PTransform} that
-   * first groups its input {@code PCollection} of {@code KV}s by keys and
-   * windows, then invokes the given function on each of the key/values-lists
-   * pairs to produce a combined value, and then returns a
-   * {@code PCollection} of {@code KV}s mapping each distinct key to
-   * its combined value for each window.
-   *
-   * <p>Each output element is in the window by which its corresponding input
-   * was grouped, and has the timestamp of the end of that window.  The output
-   * {@code PCollection} has the same
-   * {@link org.apache.beam.sdk.transforms.windowing.WindowFn}
-   * as the input.
-   *
-   * <p>See {@link PerKey Combine.PerKey} for more information.
-   */
-  public static <K, InputT, OutputT> PerKey<K, InputT, OutputT> perKey(
-      PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn) {
     return perKey(fn, displayDataForFn(fn));
   }
 
   private static <K, InputT, OutputT> PerKey<K, InputT, OutputT> perKey(
-          PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn,
+      GlobalCombineFn<? super InputT, ?, OutputT> fn,
       DisplayData.ItemSpec<? extends Class<?>> fnDisplayData) {
     return new PerKey<>(fn, fnDisplayData, false /*fewKeys*/);
   }
@@ -211,7 +187,7 @@ public class Combine {
    * in {@link GroupByKey}.
    */
   private static <K, InputT, OutputT> PerKey<K, InputT, OutputT> fewKeys(
-      PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn,
+      GlobalCombineFn<? super InputT, ?, OutputT> fn,
       DisplayData.ItemSpec<? extends Class<?>> fnDisplayData) {
     return new PerKey<>(fn, fnDisplayData, true /*fewKeys*/);
   }
@@ -239,7 +215,7 @@ public class Combine {
    */
   public static <K, V> GroupedValues<K, V, V> groupedValues(
       SerializableFunction<Iterable<V>, V> fn) {
-    return groupedValues(IterableCombineFn.of(fn).<K>asKeyedFn(), displayDataForFn(fn));
+    return groupedValues(IterableCombineFn.of(fn), displayDataForFn(fn));
   }
 
   /**
@@ -265,37 +241,11 @@ public class Combine {
    */
   public static <K, InputT, OutputT> GroupedValues<K, InputT, OutputT> groupedValues(
       GlobalCombineFn<? super InputT, ?, OutputT> fn) {
-    return groupedValues(fn.<K>asKeyedFn(), displayDataForFn(fn));
-  }
-
-  /**
-   * Returns a {@link GroupedValues Combine.GroupedValues}
-   * {@code PTransform} that takes a {@code PCollection} of
-   * {@code KV}s where a key maps to an {@code Iterable} of values, e.g.,
-   * the result of a {@code GroupByKey}, then uses the given
-   * {@code KeyedCombineFn} to combine all the values associated with
-   * each key.  The combining function is provided the key.  The types
-   * of the input and output values can differ.
-   *
-   * <p>Each output element has the same timestamp and is in the same window
-   * as its corresponding input element, and the output
-   * {@code PCollection} has the same
-   * {@link org.apache.beam.sdk.transforms.windowing.WindowFn}
-   * associated with it as the input.
-   *
-   * <p>See {@link GroupedValues Combine.GroupedValues} for more information.
-   *
-   * <p>Note that {@link #perKey(CombineFnBase.PerKeyCombineFn)} is typically
-   * more convenient to use than {@link GroupByKey} followed by
-   * {@code groupedValues(...)}.
-   */
-  public static <K, InputT, OutputT> GroupedValues<K, InputT, OutputT> groupedValues(
-      PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn) {
     return groupedValues(fn, displayDataForFn(fn));
   }
 
   private static <K, InputT, OutputT> GroupedValues<K, InputT, OutputT> groupedValues(
-      PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn,
+      GlobalCombineFn<? super InputT, ?, OutputT> fn,
       DisplayData.ItemSpec<? extends Class<?>> fnDisplayData) {
     return new GroupedValues<>(fn, fnDisplayData);
   }
@@ -471,81 +421,8 @@ public class Combine {
     public TypeDescriptor<OutputT> getOutputType() {
       return new TypeDescriptor<OutputT>(getClass()) {};
     }
-
-    @SuppressWarnings({"unchecked", "rawtypes"})
-    @Override
-    public <K> KeyedCombineFn<K, InputT, AccumT, OutputT> asKeyedFn() {
-      // The key, an object, is never even looked at.
-      return new KeyIgnoringCombineFn<>(this);
-    }
-
-    private static class KeyIgnoringCombineFn<K, InputT, AccumT, OutputT>
-        extends KeyedCombineFn<K, InputT, AccumT, OutputT>
-        implements NameOverride {
-
-      private final CombineFn<InputT, AccumT, OutputT> fn;
-
-      private KeyIgnoringCombineFn(CombineFn<InputT, AccumT, OutputT> fn) {
-        this.fn = fn;
-      }
-
-      @Override
-      public AccumT createAccumulator(K key) {
-        return fn.createAccumulator();
-      }
-
-      @Override
-      public AccumT addInput(K key, AccumT accumulator, InputT input) {
-        return fn.addInput(accumulator, input);
-      }
-
-      @Override
-      public AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators) {
-        return fn.mergeAccumulators(accumulators);
-      }
-
-      @Override
-      public OutputT extractOutput(K key, AccumT accumulator) {
-        return fn.extractOutput(accumulator);
-      }
-
-      @Override
-      public AccumT compact(K key, AccumT accumulator) {
-        return fn.compact(accumulator);
-      }
-
-      @Override
-      public Coder<AccumT> getAccumulatorCoder(
-          CoderRegistry registry, Coder<K> keyCoder, Coder<InputT> inputCoder)
-          throws CannotProvideCoderException {
-        return fn.getAccumulatorCoder(registry, inputCoder);
-      }
-
-      @Override
-      public Coder<OutputT> getDefaultOutputCoder(
-          CoderRegistry registry, Coder<K> keyCoder, Coder<InputT> inputCoder)
-          throws CannotProvideCoderException {
-        return fn.getDefaultOutputCoder(registry, inputCoder);
-      }
-
-      @Override
-      public CombineFn<InputT, AccumT, OutputT> forKey(K key, Coder<K> keyCoder) {
-        return fn;
-      }
-
-      @Override
-      public void populateDisplayData(Builder builder) {
-        builder.delegate(fn);
-      }
-
-      @Override
-      public String getNameOverride() {
-        return NameUtils.approximateSimpleName(fn);
-      }
-    }
   }
 
-
   /////////////////////////////////////////////////////////////////////////////
 
   /**
@@ -621,7 +498,6 @@ public class Combine {
     public Coder<V> getDefaultOutputCoder(CoderRegistry registry, Coder<V> inputCoder) {
       return inputCoder;
     }
-
   }
 
   /**
@@ -1083,215 +959,6 @@ public class Combine {
 
   /////////////////////////////////////////////////////////////////////////////
 
-
-  /**
-   * A {@code KeyedCombineFn<K, InputT, AccumT, OutputT>} specifies how to combine
-   * a collection of input values of type {@code InputT}, associated with
-   * a key of type {@code K}, into a single output value of type
-   * {@code OutputT}.  It does this via one or more intermediate mutable
-   * accumulator values of type {@code AccumT}.
-   *
-   * <p>The overall process to combine a collection of input
-   * {@code InputT} values associated with an input {@code K} key into a
-   * single output {@code OutputT} value is as follows:
-   *
-   * <ol>
-   *
-   * <li> The input {@code InputT} values are partitioned into one or more
-   * batches.
-   *
-   * <li> For each batch, the {@link #createAccumulator} operation is
-   * invoked to create a fresh mutable accumulator value of type
-   * {@code AccumT}, initialized to represent the combination of zero
-   * values.
-   *
-   * <li> For each input {@code InputT} value in a batch, the
-   * {@link #addInput} operation is invoked to add the value to that
-   * batch's accumulator {@code AccumT} value.  The accumulator may just
-   * record the new value (e.g., if {@code AccumT == List<InputT>}), or may do
-   * work to represent the combination more compactly.
-   *
-   * <li> The {@link #mergeAccumulators} operation is invoked to
-   * combine a collection of accumulator {@code AccumT} values into a
-   * single combined output accumulator {@code AccumT} value, once the
-   * merging accumulators have had all the input values in their
-   * batches added to them.  This operation is invoked repeatedly,
-   * until there is only one accumulator value left.
-   *
-   * <li> The {@link #extractOutput} operation is invoked on the final
-   * accumulator {@code AccumT} value to get the output {@code OutputT} value.
-   *
-   * </ol>
-   *
-   * <p>All of these operations are passed the {@code K} key that the
-   * values being combined are associated with.
-   *
-   * <p>For example:
-   * <pre> {@code
-   * public class ConcatFn
-   *     extends KeyedCombineFn<String, Integer, ConcatFn.Accum, String> {
-   *   public static class Accum {
-   *     String s = "";
-   *   }
-   *   public Accum createAccumulator(String key) {
-   *     return new Accum();
-   *   }
-   *   public Accum addInput(String key, Accum accum, Integer input) {
-   *       accum.s += "+" + input;
-   *       return accum;
-   *   }
-   *   public Accum mergeAccumulators(String key, Iterable<Accum> accums) {
-   *     Accum merged = new Accum();
-   *     for (Accum accum : accums) {
-   *       merged.s += accum.s;
-   *     }
-   *     return merged;
-   *   }
-   *   public String extractOutput(String key, Accum accum) {
-   *     return key + accum.s;
-   *   }
-   * }
-   * PCollection<KV<String, Integer>> pc = ...;
-   * PCollection<KV<String, String>> pc2 = pc.apply(
-   *     Combine.perKey(new ConcatFn()));
-   * } </pre>
-   *
-   * <p>Keyed combining functions used by {@link Combine.PerKey},
-   * {@link Combine.GroupedValues}, and {@code PTransforms} derived
-   * from them should be <i>associative</i> and <i>commutative</i>.
-   * Associativity is required because input values are first broken
-   * up into subgroups before being combined, and their intermediate
-   * results further combined, in an arbitrary tree structure.
-   * Commutativity is required because any order of the input values
-   * is ignored when breaking up input values into groups.
-   *
-   * @param <K> type of keys
-   * @param <InputT> type of input values
-   * @param <AccumT> type of mutable accumulator values
-   * @param <OutputT> type of output values
-   */
-  public abstract static class KeyedCombineFn<K, InputT, AccumT, OutputT>
-      extends AbstractPerKeyCombineFn<K, InputT, AccumT, OutputT> {
-    /**
-     * Returns a new, mutable accumulator value representing the accumulation of zero input values.
-     *
-     * @param key the key that all the accumulated values using the
-     * accumulator are associated with
-     */
-    public abstract AccumT createAccumulator(K key);
-
-    /**
-     * Adds the given input value to the given accumulator, returning the new accumulator value.
-     *
-     * <p>For efficiency, the input accumulator may be modified and returned.
-     *
-     * @param key the key that all the accumulated values using the
-     * accumulator are associated with
-     */
-    public abstract AccumT addInput(K key, AccumT accumulator, InputT value);
-
-    /**
-     * Returns an accumulator representing the accumulation of all the
-     * input values accumulated in the merging accumulators.
-     *
-     * <p>May modify any of the argument accumulators.  May return a
-     * fresh accumulator, or may return one of the (modified) argument
-     * accumulators.
-     *
-     * @param key the key that all the accumulators are associated
-     * with
-     */
-    public abstract AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators);
-
-    /**
-     * Returns the output value that is the result of combining all
-     * the input values represented by the given accumulator.
-     *
-     * @param key the key that all the accumulated values using the
-     * accumulator are associated with
-     */
-    public abstract OutputT extractOutput(K key, AccumT accumulator);
-
-    /**
-     * Returns an accumulator that represents the same logical value as the
-     * input accumulator, but may have a more compact representation.
-     *
-     * <p>For most CombineFns this would be a no-op, but should be overridden
-     * by CombineFns that (for example) buffer up elements and combine
-     * them in batches.
-     *
-     * <p>For efficiency, the input accumulator may be modified and returned.
-     *
-     * <p>By default returns the original accumulator.
-     */
-    public AccumT compact(K key, AccumT accumulator) {
-      return accumulator;
-    }
-
-    @Override
-    public CombineFn<InputT, AccumT, OutputT> forKey(final K key, final Coder<K> keyCoder) {
-      return new CombineFn<InputT, AccumT, OutputT>() {
-
-        @Override
-        public AccumT createAccumulator() {
-          return KeyedCombineFn.this.createAccumulator(key);
-        }
-
-        @Override
-        public AccumT addInput(AccumT accumulator, InputT input) {
-          return KeyedCombineFn.this.addInput(key, accumulator, input);
-        }
-
-        @Override
-        public AccumT mergeAccumulators(Iterable<AccumT> accumulators) {
-          return KeyedCombineFn.this.mergeAccumulators(key, accumulators);
-        }
-
-        @Override
-        public OutputT extractOutput(AccumT accumulator) {
-          return KeyedCombineFn.this.extractOutput(key, accumulator);
-        }
-
-        @Override
-        public AccumT compact(AccumT accumulator) {
-          return KeyedCombineFn.this.compact(key, accumulator);
-        }
-
-        @Override
-        public Coder<AccumT> getAccumulatorCoder(CoderRegistry registry, Coder<InputT> inputCoder)
-            throws CannotProvideCoderException {
-          return KeyedCombineFn.this.getAccumulatorCoder(registry, keyCoder, inputCoder);
-        }
-
-        @Override
-        public Coder<OutputT> getDefaultOutputCoder(
-            CoderRegistry registry, Coder<InputT> inputCoder) throws CannotProvideCoderException {
-          return KeyedCombineFn.this.getDefaultOutputCoder(registry, keyCoder, inputCoder);
-        }
-
-        @Override
-        public void populateDisplayData(DisplayData.Builder builder) {
-          builder.delegate(KeyedCombineFn.this);
-        }
-      };
-    }
-
-    /**
-     * Applies this {@code KeyedCombineFn} to a key and a collection
-     * of input values to produce a combined output value.
-     *
-     * <p>Useful when testing the behavior of a {@code KeyedCombineFn}
-     * separately from a {@code Combine} transform.
-     */
-    public OutputT apply(K key, Iterable<? extends InputT> inputs) {
-      AccumT accum = createAccumulator(key);
-      for (InputT input : inputs) {
-        accum = addInput(key, accum, input);
-      }
-      return extractOutput(key, accum);
-    }
-  }
-
   ////////////////////////////////////////////////////////////////////////////
 
   /**
@@ -1458,8 +1125,7 @@ public class Combine {
           .apply(WithKeys.<Void, InputT>of((Void) null))
           .setCoder(KvCoder.of(VoidCoder.of(), input.getCoder()));
 
-      Combine.PerKey<Void, InputT, OutputT> combine =
-          Combine.fewKeys(fn.asKeyedFn(), fnDisplayData);
+      Combine.PerKey<Void, InputT, OutputT> combine = Combine.fewKeys(fn, fnDisplayData);
       if (!sideInputs.isEmpty()) {
         combine = combine.withSideInputs(sideInputs);
       }
@@ -1788,13 +1454,13 @@ public class Combine {
   public static class PerKey<K, InputT, OutputT>
     extends PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>> {
 
-    private final PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn;
+    private final GlobalCombineFn<? super InputT, ?, OutputT> fn;
     private final DisplayData.ItemSpec<? extends Class<?>> fnDisplayData;
     private final boolean fewKeys;
     private final List<PCollectionView<?>> sideInputs;
 
     private PerKey(
-        PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn,
+        GlobalCombineFn<? super InputT, ?, OutputT> fn,
         DisplayData.ItemSpec<? extends Class<?>> fnDisplayData, boolean fewKeys) {
       this.fn = fn;
       this.fnDisplayData = fnDisplayData;
@@ -1803,7 +1469,7 @@ public class Combine {
     }
 
     private PerKey(
-        PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn,
+        GlobalCombineFn<? super InputT, ?, OutputT> fn,
         DisplayData.ItemSpec<? extends Class<?>> fnDisplayData,
         boolean fewKeys, List<PCollectionView<?>> sideInputs) {
       this.fn = fn;
@@ -1819,7 +1485,7 @@ public class Combine {
 
     /**
      * Returns a {@link PTransform} identical to this, but with the specified side inputs to use
-     * in {@link KeyedCombineFnWithContext}.
+     * in {@link CombineFnWithContext}.
      */
     public PerKey<K, InputT, OutputT> withSideInputs(PCollectionView<?>... sideInputs) {
       return withSideInputs(Arrays.asList(sideInputs));
@@ -1827,7 +1493,7 @@ public class Combine {
 
     /**
      * Returns a {@link PTransform} identical to this, but with the specified side inputs to use
-     * in {@link KeyedCombineFnWithContext}.
+     * in {@link CombineFnWithContext}.
      */
     public PerKey<K, InputT, OutputT> withSideInputs(
         Iterable<? extends PCollectionView<?>> sideInputs) {
@@ -1874,9 +1540,9 @@ public class Combine {
     }
 
     /**
-     * Returns the {@link PerKeyCombineFn} used by this Combine operation.
+     * Returns the {@link GlobalCombineFn} used by this Combine operation.
      */
-    public PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> getFn() {
+    public GlobalCombineFn<? super InputT, ?, OutputT> getFn() {
       return fn;
     }
 
@@ -1924,12 +1590,12 @@ public class Combine {
   public static class PerKeyWithHotKeyFanout<K, InputT, OutputT>
       extends PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>> {
 
-    private final PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn;
+    private final GlobalCombineFn<? super InputT, ?, OutputT> fn;
     private final DisplayData.ItemSpec<? extends Class<?>> fnDisplayData;
     private final SerializableFunction<? super K, Integer> hotKeyFanout;
 
     private PerKeyWithHotKeyFanout(
-        PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn,
+        GlobalCombineFn<? super InputT, ?, OutputT> fn,
         DisplayData.ItemSpec<? extends Class<?>> fnDisplayData,
         SerializableFunction<? super K, Integer> hotKeyFanout) {
       this.fn = fn;
@@ -1951,8 +1617,8 @@ public class Combine {
 
       // Name the accumulator type.
       @SuppressWarnings("unchecked")
-      final PerKeyCombineFn<K, InputT, AccumT, OutputT> typedFn =
-          (PerKeyCombineFn<K, InputT, AccumT, OutputT>) this.fn;
+      final GlobalCombineFn<InputT, AccumT, OutputT> typedFn =
+          (GlobalCombineFn<InputT, AccumT, OutputT>) this.fn;
 
       if (!(input.getCoder() instanceof KvCoder)) {
         throw new IllegalStateException(
@@ -1966,7 +1632,7 @@ public class Combine {
       try {
         accumCoder = typedFn.getAccumulatorCoder(
             input.getPipeline().getCoderRegistry(),
-            inputCoder.getKeyCoder(), inputCoder.getValueCoder());
+            inputCoder.getValueCoder());
       } catch (CannotProvideCoderException e) {
         throw new IllegalStateException("Unable to determine accumulator coder.", e);
       }
@@ -1979,38 +1645,37 @@ public class Combine {
       // set of values, then drop the nonce and do a final combine of the
       // aggregates.  We do this by splitting the original CombineFn into two,
       // one that does addInput + merge and another that does merge + extract.
-      PerKeyCombineFn<KV<K, Integer>, InputT, AccumT, AccumT> hotPreCombine;
-      PerKeyCombineFn<K, InputOrAccum<InputT, AccumT>, AccumT, OutputT> postCombine;
-      if (typedFn instanceof KeyedCombineFn) {
-        final KeyedCombineFn<K, InputT, AccumT, OutputT> keyedFn =
-            (KeyedCombineFn<K, InputT, AccumT, OutputT>) typedFn;
+      GlobalCombineFn<InputT, AccumT, AccumT> hotPreCombine;
+      GlobalCombineFn<InputOrAccum<InputT, AccumT>, AccumT, OutputT> postCombine;
+      if (typedFn instanceof CombineFn) {
+        final CombineFn<InputT, AccumT, OutputT> fn =
+            (CombineFn<InputT, AccumT, OutputT>) typedFn;
         hotPreCombine =
-            new KeyedCombineFn<KV<K, Integer>, InputT, AccumT, AccumT>() {
+            new CombineFn<InputT, AccumT, AccumT>() {
               @Override
-              public AccumT createAccumulator(KV<K, Integer> key) {
-                return keyedFn.createAccumulator(key.getKey());
+              public AccumT createAccumulator() {
+                return fn.createAccumulator();
               }
               @Override
-              public AccumT addInput(KV<K, Integer> key, AccumT accumulator, InputT value) {
-                return keyedFn.addInput(key.getKey(), accumulator, value);
+              public AccumT addInput(AccumT accumulator, InputT value) {
+                return fn.addInput(accumulator, value);
               }
               @Override
-              public AccumT mergeAccumulators(
-                  KV<K, Integer> key, Iterable<AccumT> accumulators) {
-                return keyedFn.mergeAccumulators(key.getKey(), accumulators);
+              public AccumT mergeAccumulators(Iterable<AccumT> accumulators) {
+                return fn.mergeAccumulators(accumulators);
               }
               @Override
-              public AccumT compact(KV<K, Integer> key, AccumT accumulator) {
-                return keyedFn.compact(key.getKey(), accumulator);
+              public AccumT compact(AccumT accumulator) {
+                return fn.compact(accumulator);
               }
               @Override
-              public AccumT extractOutput(KV<K, Integer> key, AccumT accumulator) {
+              public AccumT extractOutput(AccumT accumulator) {
                 return accumulator;
               }
               @Override
               @SuppressWarnings("unchecked")
               public Coder<AccumT> getAccumulatorCoder(
-                  CoderRegistry registry, Coder<KV<K, Integer>> keyCoder, Coder<InputT> inputCoder)
+                  CoderRegistry registry, Coder<InputT> inputCoder)
                   throws CannotProvideCoderException {
                 return accumCoder;
               }
@@ -2020,142 +1685,147 @@ public class Combine {
                 builder.delegate(PerKeyWithHotKeyFanout.this);
               }
             };
+
         postCombine =
-            new KeyedCombineFn<K, InputOrAccum<InputT, AccumT>, AccumT, OutputT>() {
+            new CombineFn<InputOrAccum<InputT, AccumT>, AccumT, OutputT>() {
               @Override
-              public AccumT createAccumulator(K key) {
-                return keyedFn.createAccumulator(key);
+              public AccumT createAccumulator() {
+                return fn.createAccumulator();
               }
+
               @Override
-              public AccumT addInput(
-                  K key, AccumT accumulator, InputOrAccum<InputT, AccumT> value) {
+              public AccumT addInput(AccumT accumulator, InputOrAccum<InputT, AccumT> value) {
                 if (value.accum == null) {
-                  return keyedFn.addInput(key, accumulator, value.input);
+                  return fn.addInput(accumulator, value.input);
                 } else {
-                  return keyedFn.mergeAccumulators(key, ImmutableList.of(accumulator, value.accum));
+                  return fn.mergeAccumulators(ImmutableList.of(accumulator, value.accum));
                 }
               }
+
               @Override
-              public AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators) {
-                return keyedFn.mergeAccumulators(key, accumulators);
+              public AccumT mergeAccumulators(Iterable<AccumT> accumulators) {
+                return fn.mergeAccumulators(accumulators);
               }
+
               @Override
-              public AccumT compact(K key, AccumT accumulator) {
-                return keyedFn.compact(key, accumulator);
+              public AccumT compact(AccumT accumulator) {
+                return fn.compact(accumulator);
               }
+
               @Override
-              public OutputT extractOutput(K key, AccumT accumulator) {
-                return keyedFn.extractOutput(key, accumulator);
+              public OutputT extractOutput(AccumT accumulator) {
+                return fn.extractOutput(accumulator);
               }
+
               @Override
               public Coder<OutputT> getDefaultOutputCoder(
-                  CoderRegistry registry,
-                  Coder<K> keyCoder,
-                  Coder<InputOrAccum<InputT, AccumT>> accumulatorCoder)
+                  CoderRegistry registry, Coder<InputOrAccum<InputT, AccumT>> accumulatorCoder)
                   throws CannotProvideCoderException {
-                return keyedFn.getDefaultOutputCoder(
-                    registry, keyCoder, inputCoder.getValueCoder());
+                return fn.getDefaultOutputCoder(registry, inputCoder.getValueCoder());
               }
 
               @Override
-              public Coder<AccumT> getAccumulatorCoder(CoderRegistry registry, Coder<K> keyCoder,
-                  Coder<InputOrAccum<InputT, AccumT>> inputCoder)
-                      throws CannotProvideCoderException {
+              public Coder<AccumT> getAccumulatorCoder(
+                  CoderRegistry registry, Coder<InputOrAccum<InputT, AccumT>> inputCoder)
+                  throws CannotProvideCoderException {
                 return accumCoder;
               }
+
               @Override
               public void populateDisplayData(DisplayData.Builder builder) {
                 builder.delegate(PerKeyWithHotKeyFanout.this);
               }
             };
-      } else if (typedFn instanceof KeyedCombineFnWithContext) {
-        final KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> keyedFnWithContext =
-            (KeyedCombineFnWithContext<K, InputT, AccumT, OutputT>) typedFn;
+      } else if (typedFn instanceof CombineFnWithContext) {
+        final CombineFnWithContext<InputT, AccumT, OutputT> fnWithContext =
+            (CombineFnWithContext<InputT, AccumT, OutputT>) typedFn;
         hotPreCombine =
-            new KeyedCombineFnWithContext<KV<K, Integer>, InputT, AccumT, AccumT>() {
+            new CombineFnWithContext<InputT, AccumT, AccumT>() {
               @Override
-              public AccumT createAccumulator(KV<K, Integer> key, Context c) {
-                return keyedFnWithContext.createAccumulator(key.getKey(), c);
+              public AccumT createAccumulator(Context c) {
+                return fnWithContext.createAccumulator(c);
               }
 
               @Override
-              public AccumT addInput(
-                  KV<K, Integer> key, AccumT accumulator, InputT value, Context c) {
-                return keyedFnWithContext.addInput(key.getKey(), accumulator, value, c);
+              public AccumT addInput(AccumT accumulator, InputT value, Context c) {
+                return fnWithContext.addInput(accumulator, value, c);
               }
 
               @Override
-              public AccumT mergeAccumulators(
-                  KV<K, Integer> key, Iterable<AccumT> accumulators, Context c) {
-                return keyedFnWithContext.mergeAccumulators(key.getKey(), accumulators, c);
+              public AccumT mergeAccumulators(Iterable<AccumT> accumulators, Context c) {
+                return fnWithContext.mergeAccumulators(accumulators, c);
               }
 
               @Override
-              public AccumT compact(KV<K, Integer> key, AccumT accumulator, Context c) {
-                return keyedFnWithContext.compact(key.getKey(), accumulator, c);
+              public AccumT compact(AccumT accumulator, Context c) {
+                return fnWithContext.compact(accumulator, c);
               }
 
               @Override
-              public AccumT extractOutput(KV<K, Integer> key, AccumT accumulator, Context c) {
+              public AccumT extractOutput(AccumT accumulator, Context c) {
                 return accumulator;
               }
 
               @Override
               @SuppressWarnings("unchecked")
               public Coder<AccumT> getAccumulatorCoder(
-                  CoderRegistry registry, Coder<KV<K, Integer>> keyCoder, Coder<InputT> inputCoder)
+                  CoderRegistry registry, Coder<InputT> inputCoder)
                   throws CannotProvideCoderException {
                 return accumCoder;
               }
+
               @Override
               public void populateDisplayData(DisplayData.Builder builder) {
                 builder.delegate(PerKeyWithHotKeyFanout.this);
               }
             };
         postCombine =
-            new KeyedCombineFnWithContext<K, InputOrAccum<InputT, AccumT>, AccumT, OutputT>() {
+            new CombineFnWithContext<InputOrAccum<InputT, AccumT>, AccumT, OutputT>() {
               @Override
-              public AccumT createAccumulator(K key, Context c) {
-                return keyedFnWithContext.createAccumulator(key, c);
+              public AccumT createAccumulator(Context c) {
+                return fnWithContext.createAccumulator(c);
               }
+
               @Override
               public AccumT addInput(
-                  K key, AccumT accumulator, InputOrAccum<InputT, AccumT> value, Context c) {
+                  AccumT accumulator, InputOrAccum<InputT, AccumT> value, Context c) {
                 if (value.accum == null) {
-                  return keyedFnWithContext.addInput(key, accumulator, value.input, c);
+                  return fnWithContext.addInput(accumulator, value.input, c);
                 } else {
-                  return keyedFnWithContext.mergeAccumulators(
-                      key, ImmutableList.of(accumulator, value.accum), c);
+                  return fnWithContext.mergeAccumulators(
+                      ImmutableList.of(accumulator, value.accum), c);
                 }
               }
+
               @Override
-              public AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators, Context c) {
-                return keyedFnWithContext.mergeAccumulators(key, accumulators, c);
+              public AccumT mergeAccumulators(Iterable<AccumT> accumulators, Context c) {
+                return fnWithContext.mergeAccumulators(accumulators, c);
               }
+
               @Override
-              public AccumT compact(K key, AccumT accumulator, Context c) {
-                return keyedFnWithContext.compact(key, accumulator, c);
+              public AccumT compact(AccumT accumulator, Context c) {
+                return fnWithContext.compact(accumulator, c);
               }
+
               @Override
-              public OutputT extractOutput(K key, AccumT accumulator, Context c) {
-                return keyedFnWithContext.extractOutput(key, accumulator, c);
+              public OutputT extractOutput(AccumT accumulator, Context c) {
+                return fnWithContext.extractOutput(accumulator, c);
               }
+
               @Override
               public Coder<OutputT> getDefaultOutputCoder(
-                  CoderRegistry registry,
-                  Coder<K> keyCoder,
-                  Coder<InputOrAccum<InputT, AccumT>> accumulatorCoder)
+                  CoderRegistry registry, Coder<InputOrAccum<InputT, AccumT>> accumulatorCoder)
                   throws CannotProvideCoderException {
-                return keyedFnWithContext.getDefaultOutputCoder(
-                    registry, keyCoder, inputCoder.getValueCoder());
+                return fnWithContext.getDefaultOutputCoder(registry, inputCoder.getValueCoder());
               }
 
               @Override
-              public Coder<AccumT> getAccumulatorCoder(CoderRegistry registry, Coder<K> keyCoder,
-                  Coder<InputOrAccum<InputT, AccumT>> inputCoder)
+              public Coder<AccumT> getAccumulatorCoder(
+                  CoderRegistry registry, Coder<InputOrAccum<InputT, AccumT>> inputCoder)
                   throws CannotProvideCoderException {
                 return accumCoder;
               }
+
               @Override
               public void populateDisplayData(DisplayData.Builder builder) {
                 builder.delegate(PerKeyWithHotKeyFanout.this);
@@ -2202,25 +1872,33 @@ public class Combine {
       }
 
       // Combine the hot and cold keys separately.
-      PCollection<KV<K, InputOrAccum<InputT, AccumT>>> precombinedHot = split
-          .get(hot)
-          .setCoder(KvCoder.of(KvCoder.of(inputCoder.getKeyCoder(), VarIntCoder.of()),
-                               inputCoder.getValueCoder()))
-          .setWindowingStrategyInternal(preCombineStrategy)
-          .apply("PreCombineHot", Combine.perKey(hotPreCombine, fnDisplayData))
-          .apply("StripNonce", MapElements.via(
-              new SimpleFunction<KV<KV<K, Integer>, AccumT>,
-                       KV<K, InputOrAccum<InputT, AccumT>>>() {
-                @Override
-                public KV<K, InputOrAccum<InputT, AccumT>> apply(KV<KV<K, Integer>, AccumT> elem) {
-                  return KV.of(
-                      elem.getKey().getKey(),
-                      InputOrAccum.<InputT, AccumT>accum(elem.getValue()));
-                }
-              }))
-          .setCoder(KvCoder.of(inputCoder.getKeyCoder(), inputOrAccumCoder))
-          .apply(Window.<KV<K, InputOrAccum<InputT, AccumT>>>remerge())
-          .setWindowingStrategyInternal(input.getWindowingStrategy());
+      PCollection<KV<K, InputOrAccum<InputT, AccumT>>> precombinedHot =
+          split
+              .get(hot)
+              .setCoder(
+                  KvCoder.of(
+                      KvCoder.of(inputCoder.getKeyCoder(), VarIntCoder.of()),
+                      inputCoder.getValueCoder()))
+              .setWindowingStrategyInternal(preCombineStrategy)
+              .apply(
+                  "PreCombineHot",
+                  Combine.<KV<K, Integer>, InputT, AccumT>perKey(hotPreCombine, fnDisplayData))
+              .apply(
+                  "StripNonce",
+                  MapElements.via(
+                      new SimpleFunction<
+                          KV<KV<K, Integer>, AccumT>, KV<K, InputOrAccum<InputT, AccumT>>>() {
+                        @Override
+                        public KV<K, InputOrAccum<InputT, AccumT>> apply(
+                            KV<KV<K, Integer>, AccumT> elem) {
+                          return KV.of(
+                              elem.getKey().getKey(),
+                              InputOrAccum.<InputT, AccumT>accum(elem.getValue()));
+                        }
+                      }))
+              .setCoder(KvCoder.of(inputCoder.getKeyCoder(), inputOrAccumCoder))
+              .apply(Window.<KV<K, InputOrAccum<InputT, AccumT>>>remerge())
+              .setWindowingStrategyInternal(input.getWindowingStrategy());
       PCollection<KV<K, InputOrAccum<InputT, AccumT>>> preprocessedCold = split
           .get(cold)
           .setCoder(inputCoder)
@@ -2235,9 +1913,12 @@ public class Combine {
           .setCoder(KvCoder.of(inputCoder.getKeyCoder(), inputOrAccumCoder));
 
       // Combine the union of the pre-processed hot and cold key results.
-      return PCollectionList.of(precombinedHot).and(preprocessedCold)
+      return PCollectionList.of(precombinedHot)
+          .and(preprocessedCold)
           .apply(Flatten.<KV<K, InputOrAccum<InputT, AccumT>>>pCollections())
-          .apply("PostCombine", Combine.perKey(postCombine, fnDisplayData));
+          .apply(
+              "PostCombine",
+              Combine.<K, InputOrAccum<InputT, AccumT>, OutputT>perKey(postCombine, fnDisplayData));
     }
 
     @Override
@@ -2325,71 +2006,61 @@ public class Combine {
   /////////////////////////////////////////////////////////////////////////////
 
   /**
-   * {@code GroupedValues<K, InputT, OutputT>} takes a
-   * {@code PCollection<KV<K, Iterable<InputT>>>}, such as the result of
-   * {@link GroupByKey}, applies a specified
-   * {@link KeyedCombineFn KeyedCombineFn&lt;K, InputT, AccumT, OutputT&gt;}
-   * to each of the input {@code KV<K, Iterable<InputT>>} elements to
-   * produce a combined output {@code KV<K, OutputT>} element, and returns a
-   * {@code PCollection<KV<K, OutputT>>} containing all the combined output
-   * elements.  It is common for {@code InputT == OutputT}, but not required.
-   * Common combining functions include sums, mins, maxes, and averages
-   * of numbers, conjunctions and disjunctions of booleans, statistical
-   * aggregations, etc.
+   * {@code GroupedValues<K, InputT, OutputT>} takes a {@code PCollection<KV<K, Iterable<InputT>>>},
+   * such as the result of {@link GroupByKey}, applies a specified {@link CombineFn
+   * CombineFn&lt;InputT, AccumT, OutputT&gt;} to each of the input {@code KV<K,
+   * Iterable<InputT>>} elements to produce a combined output {@code KV<K, OutputT>} element, and
+   * returns a {@code PCollection<KV<K, OutputT>>} containing all the combined output elements. It
+   * is common for {@code InputT == OutputT}, but not required. Common combining functions include
+   * sums, mins, maxes, and averages of numbers, conjunctions and disjunctions of booleans,
+   * statistical aggregations, etc.
    *
    * <p>Example of use:
-   * <pre> {@code
+   *
+   * <pre>{@code
    * PCollection<KV<String, Integer>> pc = ...;
    * PCollection<KV<String, Iterable<Integer>>> groupedByKey = pc.apply(
    *     new GroupByKey<String, Integer>());
    * PCollection<KV<String, Integer>> sumByKey = groupedByKey.apply(
    *     Combine.<String, Integer>groupedValues(
    *         new Sum.SumIntegerFn()));
-   * } </pre>
+   * }
+   * </pre>
    *
-   * <p>See also {@link #perKey}/{@link PerKey Combine.PerKey}, which
-   * captures the common pattern of "combining by key" in a
-   * single easy-to-use {@code PTransform}.
+   * <p>See also {@link #perKey}/{@link PerKey Combine.PerKey}, which captures the common pattern of
+   * "combining by key" in a single easy-to-use {@code PTransform}.
    *
-   * <p>Combining for different keys can happen in parallel.  Moreover,
-   * combining of the {@code Iterable<InputT>} values associated a single
-   * key can happen in parallel, with different subsets of the values
-   * being combined separately, and their intermediate results combined
-   * further, in an arbitrary tree reduction pattern, until a single
-   * result value is produced for each key.
+   * <p>Combining for different keys can happen in parallel. Moreover, combining of the {@code
+   * Iterable<InputT>} values associated with one key can happen in parallel, with different subsets
+   * of the values being combined separately, and their intermediate results combined further, in an
+   * arbitrary tree reduction pattern, until a single result value is produced for each key.
    *
-   * <p>By default, the {@code Coder} of the keys of the output
-   * {@code PCollection<KV<K, OutputT>>} is that of the keys of the input
-   * {@code PCollection<KV<K, InputT>>}, and the {@code Coder} of the values
-   * of the output {@code PCollection<KV<K, OutputT>>} is inferred from the
-   * concrete type of the {@code KeyedCombineFn<K, InputT, AccumT, OutputT>}'s output
-   * type {@code OutputT}.
+   * <p>By default, the {@code Coder} of the keys of the output {@code PCollection<KV<K, OutputT>>}
+   * is that of the keys of the input {@code PCollection<KV<K, InputT>>}, and the {@code Coder} of
+   * the values of the output {@code PCollection<KV<K, OutputT>>} is inferred from the concrete type
+   * of the {@code CombineFn<InputT, AccumT, OutputT>}'s output type {@code OutputT}.
    *
-   * <p>Each output element has the same timestamp and is in the same window
-   * as its corresponding input element, and the output
-   * {@code PCollection} has the same
-   * {@link org.apache.beam.sdk.transforms.windowing.WindowFn}
-   * associated with it as the input.
+   * <p>Each output element has the same timestamp and is in the same window as its corresponding
+   * input element, and the output {@code PCollection} has the same {@link
+   * org.apache.beam.sdk.transforms.windowing.WindowFn} associated with it as the input.
    *
-   * <p>See also {@link #globally}/{@link Globally Combine.Globally}, which
-   * combines all the values in a {@code PCollection} into a
-   * single value in a {@code PCollection}.
+   * <p>See also {@link #globally}/{@link Globally Combine.Globally}, which combines all the values
+   * in a {@code PCollection} into a single value in a {@code PCollection}.
    *
    * @param <K> type of input and output keys
    * @param <InputT> type of input values
    * @param <OutputT> type of output values
    */
   public static class GroupedValues<K, InputT, OutputT>
-      extends PTransform
-                        <PCollection<? extends KV<K, ? extends Iterable<InputT>>>,
-                         PCollection<KV<K, OutputT>>> {
+      extends PTransform<
+          PCollection<? extends KV<K, ? extends Iterable<InputT>>>, PCollection<KV<K, OutputT>>> {
 
-    private final PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn;
+    private final GlobalCombineFn<? super InputT, ?, OutputT> fn;
     private final DisplayData.ItemSpec<? extends Class<?>> fnDisplayData;
     private final List<PCollectionView<?>> sideInputs;
 
     private GroupedValues(
-        PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn,
+        GlobalCombineFn<? super InputT, ?, OutputT> fn,
         DisplayData.ItemSpec<? extends Class<?>> fnDisplayData) {
       this.fn = SerializableUtils.clone(fn);
       this.fnDisplayData = fnDisplayData;
@@ -2397,7 +2068,7 @@ public class Combine {
     }
 
     private GroupedValues(
-        PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn,
+        GlobalCombineFn<? super InputT, ?, OutputT> fn,
         DisplayData.ItemSpec<? extends Class<?>> fnDisplayData,
         List<PCollectionView<?>> sideInputs) {
       this.fn = SerializableUtils.clone(fn);
@@ -2415,9 +2086,9 @@ public class Combine {
     }
 
     /**
-     * Returns the KeyedCombineFn used by this Combine operation.
+     * Returns the {@link GlobalCombineFn} used by this Combine operation.
      */
-    public PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> getFn() {
+    public GlobalCombineFn<? super InputT, ?, OutputT> getFn() {
       return fn;
     }
 
@@ -2436,9 +2107,9 @@ public class Combine {
               K key = c.element().getKey();
 
               OutputT output;
-              if (fn instanceof KeyedCombineFnWithContext) {
-                output = ((KeyedCombineFnWithContext<? super K, ? super InputT, ?, OutputT>) fn)
-                    .apply(key, c.element().getValue(), new CombineWithContext.Context() {
+              if (fn instanceof CombineFnWithContext) {
+                output = ((CombineFnWithContext<? super InputT, ?, OutputT>) fn)
+                    .apply(c.element().getValue(), new CombineWithContext.Context() {
                       @Override
                       public PipelineOptions getPipelineOptions() {
                         return c.getPipelineOptions();
@@ -2449,9 +2120,9 @@ public class Combine {
                         return c.sideInput(view);
                       }
                     });
-              } else if (fn instanceof KeyedCombineFn) {
-                output = ((KeyedCombineFn<? super K, ? super InputT, ?, OutputT>) fn)
-                    .apply(key, c.element().getValue());
+              } else if (fn instanceof CombineFn) {
+                output = ((CombineFn<? super InputT, ?, OutputT>) fn)
+                    .apply(c.element().getValue());
               } else {
                 throw new IllegalStateException(
                     String.format("Unknown type of CombineFn: %s", fn.getClass()));
@@ -2516,10 +2187,9 @@ public class Combine {
       KvCoder<K, InputT> kvCoder = getKvCoder(input.getCoder());
       @SuppressWarnings("unchecked")
       Coder<OutputT> outputValueCoder =
-          ((PerKeyCombineFn<K, InputT, ?, OutputT>) fn)
-          .getDefaultOutputCoder(
-              input.getPipeline().getCoderRegistry(),
-              kvCoder.getKeyCoder(), kvCoder.getValueCoder());
+          ((GlobalCombineFn<InputT, ?, OutputT>) fn)
+              .getDefaultOutputCoder(
+                  input.getPipeline().getCoderRegistry(), kvCoder.getValueCoder());
       return KvCoder.of(kvCoder.getKeyCoder(), outputValueCoder);
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFnBase.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFnBase.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFnBase.java
index 770a390..a881099 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFnBase.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFnBase.java
@@ -25,9 +25,7 @@ import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
 import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext;
-import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.HasDisplayData;
 import org.apache.beam.sdk.values.TypeDescriptor;
@@ -92,65 +90,6 @@ public class CombineFnBase {
      */
     OutputT defaultValue();
 
-    /**
-     * Converts this {@code GloballyCombineFn} into an equivalent
-     * {@link PerKeyCombineFn} that ignores the keys passed to it and
-     * combines the values according to this {@code GloballyCombineFn}.
-     *
-     * @param <K> the type of the (ignored) keys
-     */
-    <K> PerKeyCombineFn<K, InputT, AccumT, OutputT> asKeyedFn();
-  }
-
-  /**
-   * A {@code PerKeyCombineFn<K, InputT, AccumT, OutputT>} specifies how to combine
-   * a collection of input values of type {@code InputT}, associated with
-   * a key of type {@code K}, into a single output value of type
-   * {@code OutputT}.  It does this via one or more intermediate mutable
-   * accumulator values of type {@code AccumT}.
-   *
-   * <p>Do not implement this interface directly.
-   * Extends {@link KeyedCombineFn} and {@link KeyedCombineFnWithContext} instead.
-   *
-   * @param <K> type of keys
-   * @param <InputT> type of input values
-   * @param <AccumT> type of mutable accumulator values
-   * @param <OutputT> type of output values
-   */
-  public interface PerKeyCombineFn<K, InputT, AccumT, OutputT>
-  extends Serializable, HasDisplayData {
-    /**
-     * Returns the {@code Coder} to use for accumulator {@code AccumT}
-     * values, or null if it is not able to be inferred.
-     *
-     * <p>By default, uses the knowledge of the {@code Coder} being
-     * used for {@code K} keys and input {@code InputT} values and the
-     * enclosing {@code Pipeline}'s {@code CoderRegistry} to try to
-     * infer the Coder for {@code AccumT} values.
-     *
-     * <p>This is the Coder used to send data through a communication-intensive
-     * shuffle step, so a compact and efficient representation may have
-     * significant performance benefits.
-     */
-    Coder<AccumT> getAccumulatorCoder(CoderRegistry registry, Coder<K> keyCoder,
-        Coder<InputT> inputCoder) throws CannotProvideCoderException;
-
-    /**
-     * Returns the {@code Coder} to use by default for output
-     * {@code OutputT} values, or null if it is not able to be inferred.
-     *
-     * <p>By default, uses the knowledge of the {@code Coder} being
-     * used for {@code K} keys and input {@code InputT} values and the
-     * enclosing {@code Pipeline}'s {@code CoderRegistry} to try to
-     * infer the Coder for {@code OutputT} values.
-     */
-    Coder<OutputT> getDefaultOutputCoder(CoderRegistry registry, Coder<K> keyCoder,
-        Coder<InputT> inputCoder) throws CannotProvideCoderException;
-
-    /**
-     * Returns the a regular {@link GlobalCombineFn} that operates on a specific key.
-     */
-    GlobalCombineFn<InputT, AccumT, OutputT> forKey(K key, Coder<K> keyCoder);
   }
 
   /**
@@ -228,79 +167,4 @@ public class CombineFnBase {
     public void populateDisplayData(DisplayData.Builder builder) {
     }
   }
-
-  /**
-   * An abstract {@link PerKeyCombineFn} base class shared by
-   * {@link KeyedCombineFn} and {@link KeyedCombineFnWithContext}.
-   *
-   * <p>Do not extends this class directly.
-   * Extends {@link KeyedCombineFn} and {@link KeyedCombineFnWithContext} instead.
-   *
-   * @param <K> type of keys
-   * @param <InputT> type of input values
-   * @param <AccumT> type of mutable accumulator values
-   * @param <OutputT> type of output values
-   */
-  abstract static class AbstractPerKeyCombineFn<K, InputT, AccumT, OutputT>
-      implements PerKeyCombineFn<K, InputT, AccumT, OutputT> {
-    @Override
-    public Coder<AccumT> getAccumulatorCoder(CoderRegistry registry, Coder<K> keyCoder,
-        Coder<InputT> inputCoder) throws CannotProvideCoderException {
-      return registry.getDefaultCoder(getClass(), AbstractPerKeyCombineFn.class,
-          ImmutableMap.<Type, Coder<?>>of(
-              getKTypeVariable(), keyCoder, getInputTVariable(), inputCoder),
-          getAccumTVariable());
-    }
-
-    @Override
-    public Coder<OutputT> getDefaultOutputCoder(CoderRegistry registry, Coder<K> keyCoder,
-        Coder<InputT> inputCoder) throws CannotProvideCoderException {
-      return registry.getDefaultCoder(getClass(), AbstractPerKeyCombineFn.class,
-          ImmutableMap.<Type, Coder<?>>of(getKTypeVariable(), keyCoder, getInputTVariable(),
-              inputCoder, getAccumTVariable(),
-              this.getAccumulatorCoder(registry, keyCoder, inputCoder)),
-          getOutputTVariable());
-    }
-
-    /**
-     * Returns the {@link TypeVariable} of {@code K}.
-     */
-    public TypeVariable<?> getKTypeVariable() {
-      return (TypeVariable<?>) new TypeDescriptor<K>(AbstractPerKeyCombineFn.class) {}.getType();
-    }
-
-    /**
-     * Returns the {@link TypeVariable} of {@code InputT}.
-     */
-    public TypeVariable<?> getInputTVariable() {
-      return (TypeVariable<?>)
-          new TypeDescriptor<InputT>(AbstractPerKeyCombineFn.class) {}.getType();
-    }
-
-    /**
-     * Returns the {@link TypeVariable} of {@code AccumT}.
-     */
-    public TypeVariable<?> getAccumTVariable() {
-      return (TypeVariable<?>)
-          new TypeDescriptor<AccumT>(AbstractPerKeyCombineFn.class) {}.getType();
-    }
-
-    /**
-     * Returns the {@link TypeVariable} of {@code OutputT}.
-     */
-    public TypeVariable<?> getOutputTVariable() {
-      return (TypeVariable<?>)
-          new TypeDescriptor<OutputT>(AbstractPerKeyCombineFn.class) {}.getType();
-    }
-
-    /**
-     * {@inheritDoc}
-     *
-     * <p>By default, does not register any display data. Implementors may override this method
-     * to provide their own display data.
-     */
-    @Override
-    public void populateDisplayData(DisplayData.Builder builder) {
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
index ca939c1..cc02dcf 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
@@ -37,12 +37,9 @@ import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
 import org.apache.beam.sdk.transforms.CombineFnBase.GlobalCombineFn;
-import org.apache.beam.sdk.transforms.CombineFnBase.PerKeyCombineFn;
 import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext;
 import org.apache.beam.sdk.transforms.CombineWithContext.Context;
-import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.HasDisplayData;
 import org.apache.beam.sdk.util.CombineFnUtil;
@@ -54,49 +51,6 @@ import org.apache.beam.sdk.values.TupleTag;
 public class CombineFns {
 
   /**
-   * Returns a {@link ComposeKeyedCombineFnBuilder} to construct a composed
-   * {@link PerKeyCombineFn}.
-   *
-   * <p>The same {@link TupleTag} cannot be used in a composition multiple times.
-   *
-   * <p>Example:
-   * <pre>{@code
-   * PCollection<KV<K, Integer>> latencies = ...;
-   *
-   * TupleTag<Integer> maxLatencyTag = new TupleTag<Integer>();
-   * TupleTag<Double> meanLatencyTag = new TupleTag<Double>();
-   *
-   * SimpleFunction<Integer, Integer> identityFn =
-   *     new SimpleFunction<Integer, Integer>() {
-   *      {@literal @}Override
-   *       public Integer apply(Integer input) {
-   *           return input;
-   *       }};
-   * PCollection<KV<K, CoCombineResult>> maxAndMean = latencies.apply(
-   *     Combine.perKey(
-   *         CombineFns.composeKeyed()
-   *            .with(identityFn, new MaxIntegerFn(), maxLatencyTag)
-   *            .with(identityFn, new MeanFn<Integer>(), meanLatencyTag)));
-   *
-   * PCollection<T> finalResultCollection = maxAndMean
-   *     .apply(ParDo.of(
-   *         new DoFn<KV<K, CoCombineResult>, T>() {
-   *          {@literal @}ProcessElement
-   *           public void processElement(ProcessContext c) throws Exception {
-   *             KV<K, CoCombineResult> e = c.element();
-   *             Integer maxLatency = e.getValue().get(maxLatencyTag);
-   *             Double meanLatency = e.getValue().get(meanLatencyTag);
-   *             .... Do Something ....
-   *             c.output(...some T...);
-   *           }
-   *         }));
-   * }</pre>
-   */
-  public static ComposeKeyedCombineFnBuilder composeKeyed() {
-    return new ComposeKeyedCombineFnBuilder();
-  }
-
-  /**
    * Returns a {@link ComposeCombineFnBuilder} to construct a composed
    * {@link GlobalCombineFn}.
    *
@@ -142,67 +96,6 @@ public class CombineFns {
   /////////////////////////////////////////////////////////////////////////////
 
   /**
-   * A builder class to construct a composed {@link PerKeyCombineFn}.
-   */
-  public static class ComposeKeyedCombineFnBuilder {
-    /**
-     * Returns a {@link ComposedKeyedCombineFn} that can take additional
-     * {@link PerKeyCombineFn PerKeyCombineFns} and apply them as a single combine function.
-     *
-     * <p>The {@link ComposedKeyedCombineFn} extracts inputs from {@code DataT} with
-     * the {@code extractInputFn} and combines them with the {@code keyedCombineFn},
-     * and then it outputs each combined value with a {@link TupleTag} to a
-     * {@link CoCombineResult}.
-     */
-    public <K, DataT, InputT, OutputT> ComposedKeyedCombineFn<DataT, K> with(
-        SimpleFunction<DataT, InputT> extractInputFn,
-        KeyedCombineFn<K, InputT, ?, OutputT> keyedCombineFn,
-        TupleTag<OutputT> outputTag) {
-      return new ComposedKeyedCombineFn<DataT, K>()
-          .with(extractInputFn, keyedCombineFn, outputTag);
-    }
-
-    /**
-     * Returns a {@link ComposedKeyedCombineFnWithContext} that can take additional
-     * {@link PerKeyCombineFn PerKeyCombineFns} and apply them as a single combine function.
-     *
-     * <p>The {@link ComposedKeyedCombineFnWithContext} extracts inputs from {@code DataT} with
-     * the {@code extractInputFn} and combines them with the {@code keyedCombineFnWithContext},
-     * and then it outputs each combined value with a {@link TupleTag} to a
-     * {@link CoCombineResult}.
-     */
-    public <K, DataT, InputT, OutputT> ComposedKeyedCombineFnWithContext<DataT, K> with(
-        SimpleFunction<DataT, InputT> extractInputFn,
-        KeyedCombineFnWithContext<K, InputT, ?, OutputT> keyedCombineFnWithContext,
-        TupleTag<OutputT> outputTag) {
-      return new ComposedKeyedCombineFnWithContext<DataT, K>()
-          .with(extractInputFn, keyedCombineFnWithContext, outputTag);
-    }
-
-    /**
-     * Returns a {@link ComposedKeyedCombineFn} that can take additional
-     * {@link PerKeyCombineFn PerKeyCombineFns} and apply them as a single combine function.
-     */
-    public <K, DataT, InputT, OutputT> ComposedKeyedCombineFn<DataT, K> with(
-        SimpleFunction<DataT, InputT> extractInputFn,
-        CombineFn<InputT, ?, OutputT> combineFn,
-        TupleTag<OutputT> outputTag) {
-      return with(extractInputFn, combineFn.<K>asKeyedFn(), outputTag);
-    }
-
-    /**
-     * Returns a {@link ComposedKeyedCombineFnWithContext} that can take additional
-     * {@link PerKeyCombineFn PerKeyCombineFns} and apply them as a single combine function.
-     */
-    public <K, DataT, InputT, OutputT> ComposedKeyedCombineFnWithContext<DataT, K> with(
-        SimpleFunction<DataT, InputT> extractInputFn,
-        CombineFnWithContext<InputT, ?, OutputT> combineFnWithContext,
-        TupleTag<OutputT> outputTag) {
-      return with(extractInputFn, combineFnWithContext.<K>asKeyedFn(), outputTag);
-    }
-  }
-
-  /**
    * A builder class to construct a composed {@link GlobalCombineFn}.
    */
   public static class ComposeCombineFnBuilder {
@@ -246,7 +139,7 @@ public class CombineFns {
   /**
    * A tuple of outputs produced by a composed combine functions.
    *
-   * <p>See {@link #compose()} or {@link #composeKeyed()}) for details.
+   * <p>See {@link #compose()} for details.
    */
   public static class CoCombineResult implements Serializable {
 
@@ -598,345 +491,6 @@ public class CombineFns {
     }
   }
 
-  /**
-   * A composed {@link KeyedCombineFn} that applies multiple {@link KeyedCombineFn KeyedCombineFns}.
-   *
-   * <p>For each {@link KeyedCombineFn} it extracts inputs from {@code DataT} with
-   * the {@code extractInputFn} and combines them,
-   * and then it outputs each combined value with a {@link TupleTag} to a
-   * {@link CoCombineResult}.
-   */
-  public static class ComposedKeyedCombineFn<DataT, K>
-      extends KeyedCombineFn<K, DataT, Object[], CoCombineResult> {
-
-    private final List<SerializableFunction<DataT, Object>> extractInputFns;
-    private final List<KeyedCombineFn<K, Object, Object, Object>> keyedCombineFns;
-    private final List<TupleTag<?>> outputTags;
-    private final int combineFnCount;
-
-    private ComposedKeyedCombineFn() {
-      this.extractInputFns = ImmutableList.of();
-      this.keyedCombineFns = ImmutableList.of();
-      this.outputTags = ImmutableList.of();
-      this.combineFnCount = 0;
-    }
-
-    private ComposedKeyedCombineFn(
-        ImmutableList<SerializableFunction<DataT, ?>> extractInputFns,
-        ImmutableList<KeyedCombineFn<K, ?, ?, ?>> keyedCombineFns,
-        ImmutableList<TupleTag<?>> outputTags) {
-      @SuppressWarnings({"unchecked", "rawtypes"})
-      List<SerializableFunction<DataT, Object>> castedExtractInputFns = (List) extractInputFns;
-      this.extractInputFns = castedExtractInputFns;
-
-      @SuppressWarnings({"unchecked", "rawtypes"})
-      List<KeyedCombineFn<K, Object, Object, Object>> castedKeyedCombineFns =
-          (List) keyedCombineFns;
-      this.keyedCombineFns = castedKeyedCombineFns;
-      this.outputTags = outputTags;
-      this.combineFnCount = this.keyedCombineFns.size();
-    }
-
-    /**
-     * Returns a {@link ComposedKeyedCombineFn} with an additional {@link KeyedCombineFn}.
-     */
-    public <InputT, OutputT> ComposedKeyedCombineFn<DataT, K> with(
-        SimpleFunction<DataT, InputT> extractInputFn,
-        KeyedCombineFn<K, InputT, ?, OutputT> keyedCombineFn,
-        TupleTag<OutputT> outputTag) {
-      checkUniqueness(outputTags, outputTag);
-      return new ComposedKeyedCombineFn<>(
-          ImmutableList.<SerializableFunction<DataT, ?>>builder()
-          .addAll(extractInputFns)
-          .add(extractInputFn)
-          .build(),
-      ImmutableList.<KeyedCombineFn<K, ?, ?, ?>>builder()
-          .addAll(keyedCombineFns)
-          .add(keyedCombineFn)
-          .build(),
-      ImmutableList.<TupleTag<?>>builder()
-          .addAll(outputTags)
-          .add(outputTag)
-          .build());
-    }
-
-    /**
-     * Returns a {@link ComposedKeyedCombineFnWithContext} with an additional
-     * {@link KeyedCombineFnWithContext}.
-     */
-    public <InputT, OutputT> ComposedKeyedCombineFnWithContext<DataT, K> with(
-        SimpleFunction<DataT, InputT> extractInputFn,
-        KeyedCombineFnWithContext<K, InputT, ?, OutputT> keyedCombineFn,
-        TupleTag<OutputT> outputTag) {
-      checkUniqueness(outputTags, outputTag);
-      List<KeyedCombineFnWithContext<K, Object, Object, Object>> fnsWithContext =
-          Lists.newArrayList();
-      for (KeyedCombineFn<K, Object, Object, Object> fn : keyedCombineFns) {
-        fnsWithContext.add(CombineFnUtil.toFnWithContext(fn));
-      }
-      return new ComposedKeyedCombineFnWithContext<>(
-          ImmutableList.<SerializableFunction<DataT, ?>>builder()
-          .addAll(extractInputFns)
-          .add(extractInputFn)
-          .build(),
-      ImmutableList.<KeyedCombineFnWithContext<K, ?, ?, ?>>builder()
-          .addAll(fnsWithContext)
-          .add(keyedCombineFn)
-          .build(),
-      ImmutableList.<TupleTag<?>>builder()
-          .addAll(outputTags)
-          .add(outputTag)
-          .build());
-    }
-
-    /**
-     * Returns a {@link ComposedKeyedCombineFn} with an additional {@link CombineFn}.
-     */
-    public <InputT, OutputT> ComposedKeyedCombineFn<DataT, K> with(
-        SimpleFunction<DataT, InputT> extractInputFn,
-        CombineFn<InputT, ?, OutputT> keyedCombineFn,
-        TupleTag<OutputT> outputTag) {
-      return with(extractInputFn, keyedCombineFn.<K>asKeyedFn(), outputTag);
-    }
-
-    /**
-     * Returns a {@link ComposedKeyedCombineFnWithContext} with an additional
-     * {@link CombineFnWithContext}.
-     */
-    public <InputT, OutputT> ComposedKeyedCombineFnWithContext<DataT, K> with(
-        SimpleFunction<DataT, InputT> extractInputFn,
-        CombineFnWithContext<InputT, ?, OutputT> keyedCombineFn,
-        TupleTag<OutputT> outputTag) {
-      return with(extractInputFn, keyedCombineFn.<K>asKeyedFn(), outputTag);
-    }
-
-    @Override
-    public Object[] createAccumulator(K key) {
-      Object[] accumsArray = new Object[combineFnCount];
-      for (int i = 0; i < combineFnCount; ++i) {
-        accumsArray[i] = keyedCombineFns.get(i).createAccumulator(key);
-      }
-      return accumsArray;
-    }
-
-    @Override
-    public Object[] addInput(K key, Object[] accumulator, DataT value) {
-      for (int i = 0; i < combineFnCount; ++i) {
-        Object input = extractInputFns.get(i).apply(value);
-        accumulator[i] = keyedCombineFns.get(i).addInput(key, accumulator[i], input);
-      }
-      return accumulator;
-    }
-
-    @Override
-    public Object[] mergeAccumulators(K key, final Iterable<Object[]> accumulators) {
-      Iterator<Object[]> iter = accumulators.iterator();
-      if (!iter.hasNext()) {
-        return createAccumulator(key);
-      } else {
-        // Reuses the first accumulator, and overwrites its values.
-        // It is safe because {@code accum[i]} only depends on
-        // the i-th component of each accumulator.
-        Object[] accum = iter.next();
-        for (int i = 0; i < combineFnCount; ++i) {
-          accum[i] = keyedCombineFns.get(i).mergeAccumulators(
-              key, new ProjectionIterable(accumulators, i));
-        }
-        return accum;
-      }
-    }
-
-    @Override
-    public CoCombineResult extractOutput(K key, Object[] accumulator) {
-      Map<TupleTag<?>, Object> valuesMap = Maps.newHashMap();
-      for (int i = 0; i < combineFnCount; ++i) {
-        valuesMap.put(
-            outputTags.get(i),
-            keyedCombineFns.get(i).extractOutput(key, accumulator[i]));
-      }
-      return new CoCombineResult(valuesMap);
-    }
-
-    @Override
-    public Object[] compact(K key, Object[] accumulator) {
-      for (int i = 0; i < combineFnCount; ++i) {
-        accumulator[i] = keyedCombineFns.get(i).compact(key, accumulator[i]);
-      }
-      return accumulator;
-    }
-
-    @Override
-    public Coder<Object[]> getAccumulatorCoder(
-        CoderRegistry registry, Coder<K> keyCoder, Coder<DataT> dataCoder)
-        throws CannotProvideCoderException {
-      List<Coder<Object>> coders = Lists.newArrayList();
-      for (int i = 0; i < combineFnCount; ++i) {
-        Coder<Object> inputCoder =
-            registry.getDefaultOutputCoder(extractInputFns.get(i), dataCoder);
-        coders.add(keyedCombineFns.get(i).getAccumulatorCoder(registry, keyCoder, inputCoder));
-      }
-      return new ComposedAccumulatorCoder(coders);
-    }
-
-    @Override
-    public void populateDisplayData(DisplayData.Builder builder) {
-      super.populateDisplayData(builder);
-      CombineFns.populateDisplayData(builder, keyedCombineFns);
-    }
-  }
-
-  /**
-   * A composed {@link KeyedCombineFnWithContext} that applies multiple
-   * {@link KeyedCombineFnWithContext KeyedCombineFnWithContexts}.
-   *
-   * <p>For each {@link KeyedCombineFnWithContext} it extracts inputs from {@code DataT} with
-   * the {@code extractInputFn} and combines them,
-   * and then it outputs each combined value with a {@link TupleTag} to a
-   * {@link CoCombineResult}.
-   */
-  public static class ComposedKeyedCombineFnWithContext<DataT, K>
-      extends KeyedCombineFnWithContext<K, DataT, Object[], CoCombineResult> {
-
-    private final List<SerializableFunction<DataT, Object>> extractInputFns;
-    private final List<KeyedCombineFnWithContext<K, Object, Object, Object>> keyedCombineFns;
-    private final List<TupleTag<?>> outputTags;
-    private final int combineFnCount;
-
-    private ComposedKeyedCombineFnWithContext() {
-      this.extractInputFns = ImmutableList.of();
-      this.keyedCombineFns = ImmutableList.of();
-      this.outputTags = ImmutableList.of();
-      this.combineFnCount = 0;
-    }
-
-    private ComposedKeyedCombineFnWithContext(
-        ImmutableList<SerializableFunction<DataT, ?>> extractInputFns,
-        ImmutableList<KeyedCombineFnWithContext<K, ?, ?, ?>> keyedCombineFns,
-        ImmutableList<TupleTag<?>> outputTags) {
-      @SuppressWarnings({"unchecked", "rawtypes"})
-      List<SerializableFunction<DataT, Object>> castedExtractInputFns =
-          (List) extractInputFns;
-      this.extractInputFns = castedExtractInputFns;
-
-      @SuppressWarnings({"unchecked", "rawtypes"})
-      List<KeyedCombineFnWithContext<K, Object, Object, Object>> castedKeyedCombineFns =
-          (List) keyedCombineFns;
-      this.keyedCombineFns = castedKeyedCombineFns;
-      this.outputTags = outputTags;
-      this.combineFnCount = this.keyedCombineFns.size();
-    }
-
-    /**
-     * Returns a {@link ComposedKeyedCombineFnWithContext} with an additional
-     * {@link PerKeyCombineFn}.
-     */
-    public <InputT, OutputT> ComposedKeyedCombineFnWithContext<DataT, K> with(
-        SimpleFunction<DataT, InputT> extractInputFn,
-        PerKeyCombineFn<K, InputT, ?, OutputT> perKeyCombineFn,
-        TupleTag<OutputT> outputTag) {
-      checkUniqueness(outputTags, outputTag);
-      return new ComposedKeyedCombineFnWithContext<>(
-          ImmutableList.<SerializableFunction<DataT, ?>>builder()
-              .addAll(extractInputFns)
-              .add(extractInputFn)
-              .build(),
-          ImmutableList.<KeyedCombineFnWithContext<K, ?, ?, ?>>builder()
-              .addAll(keyedCombineFns)
-              .add(CombineFnUtil.toFnWithContext(perKeyCombineFn))
-              .build(),
-          ImmutableList.<TupleTag<?>>builder()
-              .addAll(outputTags)
-              .add(outputTag)
-              .build());
-    }
-
-    /**
-     * Returns a {@link ComposedKeyedCombineFnWithContext} with an additional
-     * {@link GlobalCombineFn}.
-     */
-    public <InputT, OutputT> ComposedKeyedCombineFnWithContext<DataT, K> with(
-        SimpleFunction<DataT, InputT> extractInputFn,
-        GlobalCombineFn<InputT, ?, OutputT> perKeyCombineFn,
-        TupleTag<OutputT> outputTag) {
-      return with(extractInputFn, perKeyCombineFn.<K>asKeyedFn(), outputTag);
-    }
-
-    @Override
-    public Object[] createAccumulator(K key, Context c) {
-      Object[] accumsArray = new Object[combineFnCount];
-      for (int i = 0; i < combineFnCount; ++i) {
-        accumsArray[i] = keyedCombineFns.get(i).createAccumulator(key, c);
-      }
-      return accumsArray;
-    }
-
-    @Override
-    public Object[] addInput(K key, Object[] accumulator, DataT value, Context c) {
-      for (int i = 0; i < combineFnCount; ++i) {
-        Object input = extractInputFns.get(i).apply(value);
-        accumulator[i] = keyedCombineFns.get(i).addInput(key, accumulator[i], input, c);
-      }
-      return accumulator;
-    }
-
-    @Override
-    public Object[] mergeAccumulators(K key, Iterable<Object[]> accumulators, Context c) {
-      Iterator<Object[]> iter = accumulators.iterator();
-      if (!iter.hasNext()) {
-        return createAccumulator(key, c);
-      } else {
-        // Reuses the first accumulator, and overwrites its values.
-        // It is safe because {@code accum[i]} only depends on
-        // the i-th component of each accumulator.
-        Object[] accum = iter.next();
-        for (int i = 0; i < combineFnCount; ++i) {
-          accum[i] = keyedCombineFns.get(i).mergeAccumulators(
-              key, new ProjectionIterable(accumulators, i), c);
-        }
-        return accum;
-      }
-    }
-
-    @Override
-    public CoCombineResult extractOutput(K key, Object[] accumulator, Context c) {
-      Map<TupleTag<?>, Object> valuesMap = Maps.newHashMap();
-      for (int i = 0; i < combineFnCount; ++i) {
-        valuesMap.put(
-            outputTags.get(i),
-            keyedCombineFns.get(i).extractOutput(key, accumulator[i], c));
-      }
-      return new CoCombineResult(valuesMap);
-    }
-
-    @Override
-    public Object[] compact(K key, Object[] accumulator, Context c) {
-      for (int i = 0; i < combineFnCount; ++i) {
-        accumulator[i] = keyedCombineFns.get(i).compact(key, accumulator[i], c);
-      }
-      return accumulator;
-    }
-
-    @Override
-    public Coder<Object[]> getAccumulatorCoder(
-        CoderRegistry registry, Coder<K> keyCoder, Coder<DataT> dataCoder)
-        throws CannotProvideCoderException {
-      List<Coder<Object>> coders = Lists.newArrayList();
-      for (int i = 0; i < combineFnCount; ++i) {
-        Coder<Object> inputCoder =
-            registry.getDefaultOutputCoder(extractInputFns.get(i), dataCoder);
-        coders.add(keyedCombineFns.get(i).getAccumulatorCoder(
-            registry, keyCoder, inputCoder));
-      }
-      return new ComposedAccumulatorCoder(coders);
-    }
-
-    @Override
-    public void populateDisplayData(DisplayData.Builder builder) {
-      super.populateDisplayData(builder);
-      CombineFns.populateDisplayData(builder, keyedCombineFns);
-    }
-  }
-
   /////////////////////////////////////////////////////////////////////////////
 
   private static class ProjectionIterable implements Iterable<Object> {

http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineWithContext.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineWithContext.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineWithContext.java
index cd0600a..9ae19f8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineWithContext.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineWithContext.java
@@ -17,20 +17,15 @@
  */
 package org.apache.beam.sdk.transforms;
 
-import org.apache.beam.sdk.coders.CannotProvideCoderException;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
-import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.values.PCollectionView;
 
 /**
  * This class contains combine functions that have access to {@code PipelineOptions} and side inputs
  * through {@code CombineWithContext.Context}.
  *
- * <p>{@link CombineFnWithContext} and {@link KeyedCombineFnWithContext} are for users to extend.
+ * <p>{@link CombineFnWithContext} is for users to extend.
  */
 public class CombineWithContext {
 
@@ -116,170 +111,23 @@ public class CombineWithContext {
       return accumulator;
     }
 
-    @Override
-    public OutputT defaultValue() {
-      throw new UnsupportedOperationException(
-          "Override this function to provide the default value.");
-    }
-
-    @SuppressWarnings({"unchecked", "rawtypes"})
-    @Override
-    public <K> KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> asKeyedFn() {
-      // The key, an object, is never even looked at.
-      return new KeyedCombineFnWithContext<K, InputT, AccumT, OutputT>() {
-        @Override
-        public AccumT createAccumulator(K key, Context c) {
-          return CombineFnWithContext.this.createAccumulator(c);
-        }
-
-        @Override
-        public AccumT addInput(K key, AccumT accumulator, InputT input, Context c) {
-          return CombineFnWithContext.this.addInput(accumulator, input, c);
-        }
-
-        @Override
-        public AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators, Context c) {
-          return CombineFnWithContext.this.mergeAccumulators(accumulators, c);
-        }
-
-        @Override
-        public OutputT extractOutput(K key, AccumT accumulator, Context c) {
-          return CombineFnWithContext.this.extractOutput(accumulator, c);
-        }
-
-        @Override
-        public AccumT compact(K key, AccumT accumulator, Context c) {
-          return CombineFnWithContext.this.compact(accumulator, c);
-        }
-
-        @Override
-        public Coder<AccumT> getAccumulatorCoder(CoderRegistry registry, Coder<K> keyCoder,
-            Coder<InputT> inputCoder) throws CannotProvideCoderException {
-          return CombineFnWithContext.this.getAccumulatorCoder(registry, inputCoder);
-        }
-
-        @Override
-        public Coder<OutputT> getDefaultOutputCoder(CoderRegistry registry, Coder<K> keyCoder,
-            Coder<InputT> inputCoder) throws CannotProvideCoderException {
-          return CombineFnWithContext.this.getDefaultOutputCoder(registry, inputCoder);
-        }
-
-        @Override
-        public CombineFnWithContext<InputT, AccumT, OutputT> forKey(K key, Coder<K> keyCoder) {
-          return CombineFnWithContext.this;
-        }
-
-        @Override
-        public void populateDisplayData(DisplayData.Builder builder) {
-          builder.delegate(CombineFnWithContext.this);
-        }
-      };
-    }
-  }
-
-  /**
-   * A keyed combine function that has access to {@code PipelineOptions} and side inputs through
-   * {@code CombineWithContext.Context}.
-   *
-   * <p>See the equivalent {@link KeyedCombineFn} for details about keyed combine functions.
-   */
-  public abstract static class KeyedCombineFnWithContext<K, InputT, AccumT, OutputT>
-      extends CombineFnBase.AbstractPerKeyCombineFn<K, InputT, AccumT, OutputT>
-      implements RequiresContextInternal {
-    /**
-     * Returns a new, mutable accumulator value representing the accumulation of zero input values.
-     *
-     * <p>It is equivalent to {@link KeyedCombineFn#createAccumulator},
-     * but it has additional access to {@code CombineWithContext.Context}.
-     */
-    public abstract AccumT createAccumulator(K key, Context c);
-
     /**
-     * Adds the given input value to the given accumulator, returning the new accumulator value.
-     *
-     * <p>It is equivalent to {@link KeyedCombineFn#addInput}, but it has additional access to
-     * {@code CombineWithContext.Context}.
+     * Applies this {@code CombineFnWithContext} to a collection of input values to produce a
+     * combined output value.
      */
-    public abstract AccumT addInput(K key, AccumT accumulator, InputT value, Context c);
-
-    /**
-     * Returns an accumulator representing the accumulation of all the
-     * input values accumulated in the merging accumulators.
-     *
-     * <p>It is equivalent to {@link KeyedCombineFn#mergeAccumulators},
-     * but it has additional access to {@code CombineWithContext.Context}..
-     */
-    public abstract AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators, Context c);
-
-    /**
-     * Returns the output value that is the result of combining all
-     * the input values represented by the given accumulator.
-     *
-     * <p>It is equivalent to {@link KeyedCombineFn#extractOutput}, but it has additional access to
-     * {@code CombineWithContext.Context}.
-     */
-    public abstract OutputT extractOutput(K key, AccumT accumulator, Context c);
-
-    /**
-     * Returns an accumulator that represents the same logical value as the
-     * input accumulator, but may have a more compact representation.
-     *
-     * <p>It is equivalent to {@link KeyedCombineFn#compact}, but it has additional access to
-     * {@code CombineWithContext.Context}.
-     */
-    public AccumT compact(K key, AccumT accumulator, Context c) {
-      return accumulator;
-    }
-
-    /**
-     * Applies this {@code KeyedCombineFnWithContext} to a key and a collection
-     * of input values to produce a combined output value.
-     */
-    public OutputT apply(K key, Iterable<? extends InputT> inputs, Context c) {
-      AccumT accum = createAccumulator(key, c);
+    public OutputT apply(Iterable<? extends InputT> inputs, Context c) {
+      AccumT accum = createAccumulator(c);
       for (InputT input : inputs) {
-        accum = addInput(key, accum, input, c);
+        accum = addInput(accum, input, c);
       }
-      return extractOutput(key, accum, c);
+      return extractOutput(accum, c);
     }
 
     @Override
-    public CombineFnWithContext<InputT, AccumT, OutputT> forKey(
-        final K key, final Coder<K> keyCoder) {
-      return new CombineFnWithContext<InputT, AccumT, OutputT>() {
-        @Override
-        public AccumT createAccumulator(Context c) {
-          return KeyedCombineFnWithContext.this.createAccumulator(key, c);
-        }
-
-        @Override
-        public AccumT addInput(AccumT accumulator, InputT input, Context c) {
-          return KeyedCombineFnWithContext.this.addInput(key, accumulator, input, c);
-        }
-
-        @Override
-        public AccumT mergeAccumulators(Iterable<AccumT> accumulators, Context c) {
-          return KeyedCombineFnWithContext.this.mergeAccumulators(key, accumulators, c);
-        }
-
-        @Override
-        public OutputT extractOutput(AccumT accumulator, Context c) {
-          return KeyedCombineFnWithContext.this.extractOutput(key, accumulator, c);
-        }
-
-        @Override
-        public Coder<AccumT> getAccumulatorCoder(CoderRegistry registry, Coder<InputT> inputCoder)
-            throws CannotProvideCoderException {
-          return KeyedCombineFnWithContext.this.getAccumulatorCoder(registry, keyCoder, inputCoder);
-        }
-
-        @Override
-        public Coder<OutputT> getDefaultOutputCoder(
-            CoderRegistry registry, Coder<InputT> inputCoder) throws CannotProvideCoderException {
-          return KeyedCombineFnWithContext.this.getDefaultOutputCoder(
-              registry, keyCoder, inputCoder);
-        }
-      };
+    public OutputT defaultValue() {
+      throw new UnsupportedOperationException(
+          "Override this function to provide the default value.");
     }
+
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java
index 47be9b9..e42c0b2 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java
@@ -234,7 +234,7 @@ public class Top {
   public static <K, V, ComparatorT extends Comparator<V> & Serializable>
       PTransform<PCollection<KV<K, V>>, PCollection<KV<K, List<V>>>>
       perKey(int count, ComparatorT compareFn) {
-    return Combine.perKey(new TopCombineFn<>(count, compareFn).<K>asKeyedFn());
+    return Combine.perKey(new TopCombineFn<>(count, compareFn));
   }
 
   /**
@@ -280,7 +280,7 @@ public class Top {
   public static <K, V extends Comparable<V>>
       PTransform<PCollection<KV<K, V>>, PCollection<KV<K, List<V>>>>
       smallestPerKey(int count) {
-    return Combine.perKey(new TopCombineFn<>(count, new Smallest<V>()).<K>asKeyedFn());
+    return Combine.perKey(new TopCombineFn<>(count, new Smallest<V>()));
   }
 
   /**
@@ -326,7 +326,7 @@ public class Top {
   public static <K, V extends Comparable<V>>
       PerKey<K, V, List<V>>
       largestPerKey(int count) {
-    return Combine.perKey(new TopCombineFn<>(count, new Largest<V>()).<K>asKeyedFn());
+    return Combine.perKey(new TopCombineFn<>(count, new Largest<V>()));
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
index 0495ad6..b3b8918 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
@@ -205,7 +205,7 @@ public class View {
    * PCollection<KV<K, V>> input = ...
    * CombineFn<V, OutputT> yourCombineFn = ...
    * PCollectionView<Map<K, OutputT>> output = input
-   *     .apply(Combine.perKey(yourCombineFn.<K>asKeyedFn()))
+   *     .apply(Combine.perKey(yourCombineFn))
    *     .apply(View.<K, OutputT>asMap());
    * }</pre>
    *