You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/01/17 17:53:10 UTC

[4/7] beam git commit: Moves PerKeyCombineFnRunners to Flink runner

Moves PerKeyCombineFnRunners to Flink runner

Flink is its only user. This removes the only remaining
mentions of OldDoFn in the SDK that are not OldDoFn itself.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e382c401
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e382c401
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e382c401

Branch: refs/heads/master
Commit: e382c40187754ad4f3c20565675cb3f131528070
Parents: 2b26ec8
Author: Eugene Kirpichov <ki...@google.com>
Authored: Thu Jan 12 13:17:11 2017 -0800
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Fri Jan 13 14:34:23 2017 -0800

----------------------------------------------------------------------
 .../runners/core/PerKeyCombineFnRunner.java     |  25 --
 .../runners/core/PerKeyCombineFnRunners.java    | 262 -------------------
 .../runners/flink/PerKeyCombineFnRunners.java   | 239 +++++++++++++++++
 .../FlinkMergingNonShuffleReduceFunction.java   |   2 +-
 .../FlinkMergingPartialReduceFunction.java      |   2 +-
 .../functions/FlinkMergingReduceFunction.java   |   2 +-
 .../functions/FlinkPartialReduceFunction.java   |   2 +-
 .../functions/FlinkReduceFunction.java          |   2 +-
 .../beam/sdk/util/CombineContextFactory.java    |  18 --
 9 files changed, 244 insertions(+), 310 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/e382c401/runners/core-java/src/main/java/org/apache/beam/runners/core/PerKeyCombineFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/PerKeyCombineFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/PerKeyCombineFnRunner.java
index a927ecd..4550273 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/PerKeyCombineFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/PerKeyCombineFnRunner.java
@@ -75,31 +75,6 @@ public interface PerKeyCombineFnRunner<K, InputT, AccumT, OutputT> extends Seria
    */
   OutputT extractOutput(K key, AccumT accumulator, OldDoFn<?, ?>.ProcessContext c);
 
-  /**
-   * Forwards the call to a {@link PerKeyCombineFn} to compact the accumulator in a {@link OldDoFn}.
-   *
-   * <p>It constructs a {@code CombineWithContext.Context} from {@code OldDoFn.ProcessContext}
-   * if it is required.
-   */
-  AccumT compact(K key, AccumT accumulator, OldDoFn<?, ?>.ProcessContext c);
-
-  /**
-   * Forwards the call to a {@link PerKeyCombineFn} to combine the inputs and extract output
-   * in a {@link OldDoFn}.
-   *
-   * <p>It constructs a {@code CombineWithContext.Context} from {@code OldDoFn.ProcessContext}
-   * if it is required.
-   */
-  OutputT apply(K key, Iterable<? extends InputT> inputs, OldDoFn<?, ?>.ProcessContext c);
-
-  /**
-   * Forwards the call to a {@link PerKeyCombineFn} to add all inputs in a {@link OldDoFn}.
-   *
-   * <p>It constructs a {@code CombineWithContext.Context} from {@code OldDoFn.ProcessContext}
-   * if it is required.
-   */
-  AccumT addInputs(K key, Iterable<InputT> inputs, OldDoFn<?, ?>.ProcessContext c);
-
   /////////////////////////////////////////////////////////////////////////////
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/e382c401/runners/core-java/src/main/java/org/apache/beam/runners/core/PerKeyCombineFnRunners.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/PerKeyCombineFnRunners.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/PerKeyCombineFnRunners.java
deleted file mode 100644
index 34d711b..0000000
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/PerKeyCombineFnRunners.java
+++ /dev/null
@@ -1,262 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core;
-
-import com.google.common.collect.Iterables;
-import java.util.Collection;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
-import org.apache.beam.sdk.transforms.CombineFnBase.PerKeyCombineFn;
-import org.apache.beam.sdk.transforms.CombineWithContext;
-import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
-import org.apache.beam.sdk.transforms.OldDoFn;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.CombineContextFactory;
-import org.apache.beam.sdk.util.SideInputReader;
-
-/**
- * Static utility methods that provide {@link PerKeyCombineFnRunner} implementations
- * for different keyed combine functions.
- */
-public class PerKeyCombineFnRunners {
-  /**
-   * Returns a {@link PerKeyCombineFnRunner} from a {@link PerKeyCombineFn}.
-   */
-  public static <K, InputT, AccumT, OutputT> PerKeyCombineFnRunner<K, InputT, AccumT, OutputT>
-  create(PerKeyCombineFn<K, InputT, AccumT, OutputT> perKeyCombineFn) {
-    if (perKeyCombineFn instanceof KeyedCombineFnWithContext) {
-      return new KeyedCombineFnWithContextRunner<>(
-          (KeyedCombineFnWithContext<K, InputT, AccumT, OutputT>) perKeyCombineFn);
-    } else if (perKeyCombineFn instanceof KeyedCombineFn) {
-      return new KeyedCombineFnRunner<>(
-          (KeyedCombineFn<K, InputT, AccumT, OutputT>) perKeyCombineFn);
-    } else {
-      throw new IllegalStateException(
-          String.format("Unknown type of CombineFn: %s", perKeyCombineFn.getClass()));
-    }
-  }
-
-  /**
-   * An implementation of {@link PerKeyCombineFnRunner} with {@link KeyedCombineFn}.
-   *
-   * <p>It forwards functions calls to the {@link KeyedCombineFn}.
-   */
-  private static class KeyedCombineFnRunner<K, InputT, AccumT, OutputT>
-      implements PerKeyCombineFnRunner<K, InputT, AccumT, OutputT> {
-    private final KeyedCombineFn<K, InputT, AccumT, OutputT> keyedCombineFn;
-
-    private KeyedCombineFnRunner(
-        KeyedCombineFn<K, InputT, AccumT, OutputT> keyedCombineFn) {
-      this.keyedCombineFn = keyedCombineFn;
-    }
-
-    @Override
-    public KeyedCombineFn<K, InputT, AccumT, OutputT> fn() {
-      return keyedCombineFn;
-    }
-
-    @Override
-    public AccumT createAccumulator(K key, OldDoFn<?, ?>.ProcessContext c) {
-      return keyedCombineFn.createAccumulator(key);
-    }
-
-    @Override
-    public AccumT addInput(
-        K key, AccumT accumulator, InputT input, OldDoFn<?, ?>.ProcessContext c) {
-      return keyedCombineFn.addInput(key, accumulator, input);
-    }
-
-    @Override
-    public AccumT mergeAccumulators(
-        K key, Iterable<AccumT> accumulators, OldDoFn<?, ?>.ProcessContext c) {
-      return keyedCombineFn.mergeAccumulators(key, accumulators);
-    }
-
-    @Override
-    public OutputT extractOutput(K key, AccumT accumulator, OldDoFn<?, ?>.ProcessContext c) {
-      return keyedCombineFn.extractOutput(key, accumulator);
-    }
-
-    @Override
-    public AccumT compact(K key, AccumT accumulator, OldDoFn<?, ?>.ProcessContext c) {
-      return keyedCombineFn.compact(key, accumulator);
-    }
-
-    @Override
-    public OutputT apply(K key, Iterable<? extends InputT> inputs, OldDoFn<?, ?>.ProcessContext c) {
-      return keyedCombineFn.apply(key, inputs);
-    }
-
-    @Override
-    public AccumT addInputs(K key, Iterable<InputT> inputs, OldDoFn<?, ?>.ProcessContext c) {
-      AccumT accum = keyedCombineFn.createAccumulator(key);
-      for (InputT input : inputs) {
-        accum = keyedCombineFn.addInput(key, accum, input);
-      }
-      return accum;
-    }
-
-    @Override
-    public String toString() {
-      return keyedCombineFn.toString();
-    }
-
-    @Override
-    public AccumT createAccumulator(K key, PipelineOptions options,
-        SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
-      return keyedCombineFn.createAccumulator(key);
-    }
-
-    @Override
-    public AccumT addInput(K key, AccumT accumulator, InputT input, PipelineOptions options,
-        SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
-      return keyedCombineFn.addInput(key, accumulator, input);
-    }
-
-    @Override
-    public AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators, PipelineOptions options,
-        SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
-      return keyedCombineFn.mergeAccumulators(key, accumulators);
-    }
-
-    @Override
-    public OutputT extractOutput(K key, AccumT accumulator, PipelineOptions options,
-        SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
-      return keyedCombineFn.extractOutput(key, accumulator);
-    }
-
-    @Override
-    public AccumT compact(K key, AccumT accumulator, PipelineOptions options,
-        SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
-      return keyedCombineFn.compact(key, accumulator);
-    }
-  }
-
-  /**
-   * An implementation of {@link PerKeyCombineFnRunner} with {@link KeyedCombineFnWithContext}.
-   *
-   * <p>It forwards functions calls to the {@link KeyedCombineFnWithContext}.
-   */
-  private static class KeyedCombineFnWithContextRunner<K, InputT, AccumT, OutputT>
-      implements PerKeyCombineFnRunner<K, InputT, AccumT, OutputT> {
-    private final KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> keyedCombineFnWithContext;
-
-    private KeyedCombineFnWithContextRunner(
-        KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> keyedCombineFnWithContext) {
-      this.keyedCombineFnWithContext = keyedCombineFnWithContext;
-    }
-
-    @Override
-    public KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> fn() {
-      return keyedCombineFnWithContext;
-    }
-
-    @Override
-    public AccumT createAccumulator(K key, OldDoFn<?, ?>.ProcessContext c) {
-      return keyedCombineFnWithContext.createAccumulator(key,
-          CombineContextFactory.createFromProcessContext(c));
-    }
-
-    @Override
-    public AccumT addInput(
-        K key, AccumT accumulator, InputT value, OldDoFn<?, ?>.ProcessContext c) {
-      return keyedCombineFnWithContext.addInput(key, accumulator, value,
-          CombineContextFactory.createFromProcessContext(c));
-    }
-
-    @Override
-    public AccumT mergeAccumulators(
-        K key, Iterable<AccumT> accumulators, OldDoFn<?, ?>.ProcessContext c) {
-      return keyedCombineFnWithContext.mergeAccumulators(
-          key, accumulators, CombineContextFactory.createFromProcessContext(c));
-    }
-
-    @Override
-    public OutputT extractOutput(K key, AccumT accumulator, OldDoFn<?, ?>.ProcessContext c) {
-      return keyedCombineFnWithContext.extractOutput(key, accumulator,
-          CombineContextFactory.createFromProcessContext(c));
-    }
-
-    @Override
-    public AccumT compact(K key, AccumT accumulator, OldDoFn<?, ?>.ProcessContext c) {
-      return keyedCombineFnWithContext.compact(key, accumulator,
-          CombineContextFactory.createFromProcessContext(c));
-    }
-
-    @Override
-    public OutputT apply(K key, Iterable<? extends InputT> inputs, OldDoFn<?, ?>.ProcessContext c) {
-      return keyedCombineFnWithContext.apply(key, inputs,
-          CombineContextFactory.createFromProcessContext(c));
-    }
-
-    @Override
-    public AccumT addInputs(K key, Iterable<InputT> inputs, OldDoFn<?, ?>.ProcessContext c) {
-      CombineWithContext.Context combineContext = CombineContextFactory.createFromProcessContext(c);
-      AccumT accum = keyedCombineFnWithContext.createAccumulator(key, combineContext);
-      for (InputT input : inputs) {
-        accum = keyedCombineFnWithContext.addInput(key, accum, input, combineContext);
-      }
-      return accum;
-    }
-
-    @Override
-    public String toString() {
-      return keyedCombineFnWithContext.toString();
-    }
-
-    @Override
-    public AccumT createAccumulator(K key, PipelineOptions options, SideInputReader sideInputReader,
-        Collection<? extends BoundedWindow> windows) {
-      return keyedCombineFnWithContext.createAccumulator(key,
-        CombineContextFactory.createFromComponents(
-          options, sideInputReader, Iterables.getOnlyElement(windows)));
-    }
-
-    @Override
-    public AccumT addInput(K key, AccumT accumulator, InputT input, PipelineOptions options,
-        SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
-      return keyedCombineFnWithContext.addInput(key, accumulator, input,
-        CombineContextFactory.createFromComponents(
-          options, sideInputReader, Iterables.getOnlyElement(windows)));
-    }
-
-    @Override
-    public AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators, PipelineOptions options,
-        SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
-      return keyedCombineFnWithContext.mergeAccumulators(key, accumulators,
-        CombineContextFactory.createFromComponents(
-          options, sideInputReader, Iterables.getOnlyElement(windows)));
-    }
-
-    @Override
-    public OutputT extractOutput(K key, AccumT accumulator, PipelineOptions options,
-        SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
-      return keyedCombineFnWithContext.extractOutput(key, accumulator,
-        CombineContextFactory.createFromComponents(
-          options, sideInputReader, Iterables.getOnlyElement(windows)));
-    }
-
-    @Override
-    public AccumT compact(K key, AccumT accumulator, PipelineOptions options,
-        SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
-      return keyedCombineFnWithContext.compact(key, accumulator,
-        CombineContextFactory.createFromComponents(
-          options, sideInputReader, Iterables.getOnlyElement(windows)));
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/e382c401/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/PerKeyCombineFnRunners.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/PerKeyCombineFnRunners.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/PerKeyCombineFnRunners.java
new file mode 100644
index 0000000..f672578
--- /dev/null
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/PerKeyCombineFnRunners.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import com.google.common.collect.Iterables;
+import java.util.Collection;
+import org.apache.beam.runners.core.PerKeyCombineFnRunner;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
+import org.apache.beam.sdk.transforms.CombineFnBase.PerKeyCombineFn;
+import org.apache.beam.sdk.transforms.CombineWithContext;
+import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
+import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.CombineContextFactory;
+import org.apache.beam.sdk.util.SideInputReader;
+import org.apache.beam.sdk.values.PCollectionView;
+
+/**
+ * Static utility methods that provide {@link PerKeyCombineFnRunner} implementations
+ * for different keyed combine functions.
+ */
+public class PerKeyCombineFnRunners {
+  /**
+   * Returns a {@link PerKeyCombineFnRunner} from a {@link PerKeyCombineFn}.
+   */
+  public static <K, InputT, AccumT, OutputT> PerKeyCombineFnRunner<K, InputT, AccumT, OutputT>
+  create(PerKeyCombineFn<K, InputT, AccumT, OutputT> perKeyCombineFn) {
+    if (perKeyCombineFn instanceof KeyedCombineFnWithContext) {
+      return new KeyedCombineFnWithContextRunner<>(
+          (KeyedCombineFnWithContext<K, InputT, AccumT, OutputT>) perKeyCombineFn);
+    } else if (perKeyCombineFn instanceof KeyedCombineFn) {
+      return new KeyedCombineFnRunner<>(
+          (KeyedCombineFn<K, InputT, AccumT, OutputT>) perKeyCombineFn);
+    } else {
+      throw new IllegalStateException(
+          String.format("Unknown type of CombineFn: %s", perKeyCombineFn.getClass()));
+    }
+  }
+
+  /** Returns a {@code Combine.Context} that wraps a {@code OldDoFn.ProcessContext}. */
+  private static CombineWithContext.Context createFromProcessContext(
+      final OldDoFn<?, ?>.ProcessContext c) {
+    return new CombineWithContext.Context() {
+      @Override
+      public PipelineOptions getPipelineOptions() {
+        return c.getPipelineOptions();
+      }
+
+      @Override
+      public <T> T sideInput(PCollectionView<T> view) {
+        return c.sideInput(view);
+      }
+    };
+  }
+
+  /**
+   * An implementation of {@link PerKeyCombineFnRunner} with {@link KeyedCombineFn}.
+   *
+   * <p>It forwards functions calls to the {@link KeyedCombineFn}.
+   */
+  private static class KeyedCombineFnRunner<K, InputT, AccumT, OutputT>
+      implements PerKeyCombineFnRunner<K, InputT, AccumT, OutputT> {
+    private final KeyedCombineFn<K, InputT, AccumT, OutputT> keyedCombineFn;
+
+    private KeyedCombineFnRunner(
+        KeyedCombineFn<K, InputT, AccumT, OutputT> keyedCombineFn) {
+      this.keyedCombineFn = keyedCombineFn;
+    }
+
+    @Override
+    public KeyedCombineFn<K, InputT, AccumT, OutputT> fn() {
+      return keyedCombineFn;
+    }
+
+    @Override
+    public AccumT createAccumulator(K key, OldDoFn<?, ?>.ProcessContext c) {
+      return keyedCombineFn.createAccumulator(key);
+    }
+
+    @Override
+    public AccumT addInput(
+        K key, AccumT accumulator, InputT input, OldDoFn<?, ?>.ProcessContext c) {
+      return keyedCombineFn.addInput(key, accumulator, input);
+    }
+
+    @Override
+    public AccumT mergeAccumulators(
+        K key, Iterable<AccumT> accumulators, OldDoFn<?, ?>.ProcessContext c) {
+      return keyedCombineFn.mergeAccumulators(key, accumulators);
+    }
+
+    @Override
+    public OutputT extractOutput(K key, AccumT accumulator, OldDoFn<?, ?>.ProcessContext c) {
+      return keyedCombineFn.extractOutput(key, accumulator);
+    }
+
+    @Override
+    public String toString() {
+      return keyedCombineFn.toString();
+    }
+
+    @Override
+    public AccumT createAccumulator(K key, PipelineOptions options,
+        SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
+      return keyedCombineFn.createAccumulator(key);
+    }
+
+    @Override
+    public AccumT addInput(K key, AccumT accumulator, InputT input, PipelineOptions options,
+        SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
+      return keyedCombineFn.addInput(key, accumulator, input);
+    }
+
+    @Override
+    public AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators, PipelineOptions options,
+        SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
+      return keyedCombineFn.mergeAccumulators(key, accumulators);
+    }
+
+    @Override
+    public OutputT extractOutput(K key, AccumT accumulator, PipelineOptions options,
+        SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
+      return keyedCombineFn.extractOutput(key, accumulator);
+    }
+
+    @Override
+    public AccumT compact(K key, AccumT accumulator, PipelineOptions options,
+        SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
+      return keyedCombineFn.compact(key, accumulator);
+    }
+  }
+
+  /**
+   * An implementation of {@link PerKeyCombineFnRunner} with {@link KeyedCombineFnWithContext}.
+   *
+   * <p>It forwards functions calls to the {@link KeyedCombineFnWithContext}.
+   */
+  private static class KeyedCombineFnWithContextRunner<K, InputT, AccumT, OutputT>
+      implements PerKeyCombineFnRunner<K, InputT, AccumT, OutputT> {
+    private final KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> keyedCombineFnWithContext;
+
+    private KeyedCombineFnWithContextRunner(
+        KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> keyedCombineFnWithContext) {
+      this.keyedCombineFnWithContext = keyedCombineFnWithContext;
+    }
+
+    @Override
+    public KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> fn() {
+      return keyedCombineFnWithContext;
+    }
+
+    @Override
+    public AccumT createAccumulator(K key, OldDoFn<?, ?>.ProcessContext c) {
+      return keyedCombineFnWithContext.createAccumulator(key,
+          createFromProcessContext(c));
+    }
+
+    @Override
+    public AccumT addInput(
+        K key, AccumT accumulator, InputT value, OldDoFn<?, ?>.ProcessContext c) {
+      return keyedCombineFnWithContext.addInput(key, accumulator, value,
+          createFromProcessContext(c));
+    }
+
+    @Override
+    public AccumT mergeAccumulators(
+        K key, Iterable<AccumT> accumulators, OldDoFn<?, ?>.ProcessContext c) {
+      return keyedCombineFnWithContext.mergeAccumulators(
+          key, accumulators, createFromProcessContext(c));
+    }
+
+    @Override
+    public OutputT extractOutput(K key, AccumT accumulator, OldDoFn<?, ?>.ProcessContext c) {
+      return keyedCombineFnWithContext.extractOutput(key, accumulator,
+          createFromProcessContext(c));
+    }
+
+    @Override
+    public String toString() {
+      return keyedCombineFnWithContext.toString();
+    }
+
+    @Override
+    public AccumT createAccumulator(K key, PipelineOptions options, SideInputReader sideInputReader,
+        Collection<? extends BoundedWindow> windows) {
+      return keyedCombineFnWithContext.createAccumulator(key,
+        CombineContextFactory.createFromComponents(
+          options, sideInputReader, Iterables.getOnlyElement(windows)));
+    }
+
+    @Override
+    public AccumT addInput(K key, AccumT accumulator, InputT input, PipelineOptions options,
+        SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
+      return keyedCombineFnWithContext.addInput(key, accumulator, input,
+        CombineContextFactory.createFromComponents(
+          options, sideInputReader, Iterables.getOnlyElement(windows)));
+    }
+
+    @Override
+    public AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators, PipelineOptions options,
+        SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
+      return keyedCombineFnWithContext.mergeAccumulators(key, accumulators,
+        CombineContextFactory.createFromComponents(
+          options, sideInputReader, Iterables.getOnlyElement(windows)));
+    }
+
+    @Override
+    public OutputT extractOutput(K key, AccumT accumulator, PipelineOptions options,
+        SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
+      return keyedCombineFnWithContext.extractOutput(key, accumulator,
+        CombineContextFactory.createFromComponents(
+          options, sideInputReader, Iterables.getOnlyElement(windows)));
+    }
+
+    @Override
+    public AccumT compact(K key, AccumT accumulator, PipelineOptions options,
+        SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
+      return keyedCombineFnWithContext.compact(key, accumulator,
+        CombineContextFactory.createFromComponents(
+          options, sideInputReader, Iterables.getOnlyElement(windows)));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/e382c401/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java
index 041d0e8..6412e63 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java
@@ -25,7 +25,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import org.apache.beam.runners.core.PerKeyCombineFnRunner;
-import org.apache.beam.runners.core.PerKeyCombineFnRunners;
+import org.apache.beam.runners.flink.PerKeyCombineFnRunners;
 import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.CombineFnBase;

http://git-wip-us.apache.org/repos/asf/beam/blob/e382c401/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingPartialReduceFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingPartialReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingPartialReduceFunction.java
index fef7921..1456eea 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingPartialReduceFunction.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingPartialReduceFunction.java
@@ -25,7 +25,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import org.apache.beam.runners.core.PerKeyCombineFnRunner;
-import org.apache.beam.runners.core.PerKeyCombineFnRunners;
+import org.apache.beam.runners.flink.PerKeyCombineFnRunners;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.CombineFnBase;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;

http://git-wip-us.apache.org/repos/asf/beam/blob/e382c401/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingReduceFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingReduceFunction.java
index 59163e9..2f56fac 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingReduceFunction.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingReduceFunction.java
@@ -27,7 +27,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import org.apache.beam.runners.core.PerKeyCombineFnRunner;
-import org.apache.beam.runners.core.PerKeyCombineFnRunners;
+import org.apache.beam.runners.flink.PerKeyCombineFnRunners;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.CombineFnBase;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;

http://git-wip-us.apache.org/repos/asf/beam/blob/e382c401/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
index 8b6ec3a..627cfa6 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
@@ -25,7 +25,7 @@ import java.util.Comparator;
 import java.util.Iterator;
 import java.util.Map;
 import org.apache.beam.runners.core.PerKeyCombineFnRunner;
-import org.apache.beam.runners.core.PerKeyCombineFnRunners;
+import org.apache.beam.runners.flink.PerKeyCombineFnRunners;
 import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.CombineFnBase;

http://git-wip-us.apache.org/repos/asf/beam/blob/e382c401/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
index fb5c90c..de0d416 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
@@ -27,7 +27,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import org.apache.beam.runners.core.PerKeyCombineFnRunner;
-import org.apache.beam.runners.core.PerKeyCombineFnRunners;
+import org.apache.beam.runners.flink.PerKeyCombineFnRunners;
 import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.CombineFnBase;

http://git-wip-us.apache.org/repos/asf/beam/blob/e382c401/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineContextFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineContextFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineContextFactory.java
index 149d276..a983057 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineContextFactory.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineContextFactory.java
@@ -19,7 +19,6 @@ package org.apache.beam.sdk.util;
 
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.CombineWithContext.Context;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.state.StateContext;
 import org.apache.beam.sdk.values.PCollectionView;
@@ -49,23 +48,6 @@ public class CombineContextFactory {
   }
 
   /**
-   * Returns a {@code Combine.Context} that wraps a {@code OldDoFn.ProcessContext}.
-   */
-  public static Context createFromProcessContext(final OldDoFn<?, ?>.ProcessContext c) {
-    return new Context() {
-      @Override
-      public PipelineOptions getPipelineOptions() {
-        return c.getPipelineOptions();
-      }
-
-      @Override
-      public <T> T sideInput(PCollectionView<T> view) {
-        return c.sideInput(view);
-      }
-    };
-  }
-
-  /**
    * Returns a {@code Combine.Context} that wraps a {@link StateContext}.
    */
   public static Context createFromStateContext(final StateContext<?> c) {