You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/11/30 22:36:19 UTC

[1/2] incubator-beam git commit: Move PerKeyCombineFnRunners to runners/core.

Repository: incubator-beam
Updated Branches:
  refs/heads/master 4a7da91f0 -> 4b682039d


Move PerKeyCombineFnRunners to runners/core.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/68a2025f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/68a2025f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/68a2025f

Branch: refs/heads/master
Commit: 68a2025f988a0131c6fc21649a3dbd4f71c15688
Parents: 4a7da91
Author: Pei He <pe...@google.com>
Authored: Mon Nov 14 14:15:35 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Wed Nov 30 14:25:14 2016 -0800

----------------------------------------------------------------------
 .../runners/core/PerKeyCombineFnRunners.java    | 263 +++++++++++++++++++
 .../FlinkMergingNonShuffleReduceFunction.java   |   2 +-
 .../FlinkMergingPartialReduceFunction.java      |   2 +-
 .../functions/FlinkMergingReduceFunction.java   |   2 +-
 .../functions/FlinkPartialReduceFunction.java   |   2 +-
 .../functions/FlinkReduceFunction.java          |   2 +-
 .../org/apache/beam/sdk/transforms/Combine.java |  37 ++-
 .../beam/sdk/util/PerKeyCombineFnRunners.java   | 258 ------------------
 8 files changed, 297 insertions(+), 271 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/68a2025f/runners/core-java/src/main/java/org/apache/beam/runners/core/PerKeyCombineFnRunners.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/PerKeyCombineFnRunners.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/PerKeyCombineFnRunners.java
new file mode 100644
index 0000000..6f0ff96
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/PerKeyCombineFnRunners.java
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import com.google.common.collect.Iterables;
+import java.util.Collection;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
+import org.apache.beam.sdk.transforms.CombineFnBase.PerKeyCombineFn;
+import org.apache.beam.sdk.transforms.CombineWithContext;
+import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
+import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.CombineContextFactory;
+import org.apache.beam.sdk.util.PerKeyCombineFnRunner;
+import org.apache.beam.sdk.util.SideInputReader;
+
+/**
+ * Static utility methods that provide {@link PerKeyCombineFnRunner} implementations
+ * for different keyed combine functions.
+ */
+public class PerKeyCombineFnRunners {
+  /**
+   * Returns a {@link PerKeyCombineFnRunner} from a {@link PerKeyCombineFn}.
+   */
+  public static <K, InputT, AccumT, OutputT> PerKeyCombineFnRunner<K, InputT, AccumT, OutputT>
+  create(PerKeyCombineFn<K, InputT, AccumT, OutputT> perKeyCombineFn) {
+    if (perKeyCombineFn instanceof KeyedCombineFnWithContext) {
+      return new KeyedCombineFnWithContextRunner<>(
+          (KeyedCombineFnWithContext<K, InputT, AccumT, OutputT>) perKeyCombineFn);
+    } else if (perKeyCombineFn instanceof KeyedCombineFn) {
+      return new KeyedCombineFnRunner<>(
+          (KeyedCombineFn<K, InputT, AccumT, OutputT>) perKeyCombineFn);
+    } else {
+      throw new IllegalStateException(
+          String.format("Unknown type of CombineFn: %s", perKeyCombineFn.getClass()));
+    }
+  }
+
+  /**
+   * An implementation of {@link PerKeyCombineFnRunner} with {@link KeyedCombineFn}.
+   *
+   * <p>It forwards functions calls to the {@link KeyedCombineFn}.
+   */
+  private static class KeyedCombineFnRunner<K, InputT, AccumT, OutputT>
+      implements PerKeyCombineFnRunner<K, InputT, AccumT, OutputT> {
+    private final KeyedCombineFn<K, InputT, AccumT, OutputT> keyedCombineFn;
+
+    private KeyedCombineFnRunner(
+        KeyedCombineFn<K, InputT, AccumT, OutputT> keyedCombineFn) {
+      this.keyedCombineFn = keyedCombineFn;
+    }
+
+    @Override
+    public KeyedCombineFn<K, InputT, AccumT, OutputT> fn() {
+      return keyedCombineFn;
+    }
+
+    @Override
+    public AccumT createAccumulator(K key, OldDoFn<?, ?>.ProcessContext c) {
+      return keyedCombineFn.createAccumulator(key);
+    }
+
+    @Override
+    public AccumT addInput(
+        K key, AccumT accumulator, InputT input, OldDoFn<?, ?>.ProcessContext c) {
+      return keyedCombineFn.addInput(key, accumulator, input);
+    }
+
+    @Override
+    public AccumT mergeAccumulators(
+        K key, Iterable<AccumT> accumulators, OldDoFn<?, ?>.ProcessContext c) {
+      return keyedCombineFn.mergeAccumulators(key, accumulators);
+    }
+
+    @Override
+    public OutputT extractOutput(K key, AccumT accumulator, OldDoFn<?, ?>.ProcessContext c) {
+      return keyedCombineFn.extractOutput(key, accumulator);
+    }
+
+    @Override
+    public AccumT compact(K key, AccumT accumulator, OldDoFn<?, ?>.ProcessContext c) {
+      return keyedCombineFn.compact(key, accumulator);
+    }
+
+    @Override
+    public OutputT apply(K key, Iterable<? extends InputT> inputs, OldDoFn<?, ?>.ProcessContext c) {
+      return keyedCombineFn.apply(key, inputs);
+    }
+
+    @Override
+    public AccumT addInputs(K key, Iterable<InputT> inputs, OldDoFn<?, ?>.ProcessContext c) {
+      AccumT accum = keyedCombineFn.createAccumulator(key);
+      for (InputT input : inputs) {
+        accum = keyedCombineFn.addInput(key, accum, input);
+      }
+      return accum;
+    }
+
+    @Override
+    public String toString() {
+      return keyedCombineFn.toString();
+    }
+
+    @Override
+    public AccumT createAccumulator(K key, PipelineOptions options,
+        SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
+      return keyedCombineFn.createAccumulator(key);
+    }
+
+    @Override
+    public AccumT addInput(K key, AccumT accumulator, InputT input, PipelineOptions options,
+        SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
+      return keyedCombineFn.addInput(key, accumulator, input);
+    }
+
+    @Override
+    public AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators, PipelineOptions options,
+        SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
+      return keyedCombineFn.mergeAccumulators(key, accumulators);
+    }
+
+    @Override
+    public OutputT extractOutput(K key, AccumT accumulator, PipelineOptions options,
+        SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
+      return keyedCombineFn.extractOutput(key, accumulator);
+    }
+
+    @Override
+    public AccumT compact(K key, AccumT accumulator, PipelineOptions options,
+        SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
+      return keyedCombineFn.compact(key, accumulator);
+    }
+  }
+
+  /**
+   * An implementation of {@link PerKeyCombineFnRunner} with {@link KeyedCombineFnWithContext}.
+   *
+   * <p>It forwards functions calls to the {@link KeyedCombineFnWithContext}.
+   */
+  private static class KeyedCombineFnWithContextRunner<K, InputT, AccumT, OutputT>
+      implements PerKeyCombineFnRunner<K, InputT, AccumT, OutputT> {
+    private final KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> keyedCombineFnWithContext;
+
+    private KeyedCombineFnWithContextRunner(
+        KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> keyedCombineFnWithContext) {
+      this.keyedCombineFnWithContext = keyedCombineFnWithContext;
+    }
+
+    @Override
+    public KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> fn() {
+      return keyedCombineFnWithContext;
+    }
+
+    @Override
+    public AccumT createAccumulator(K key, OldDoFn<?, ?>.ProcessContext c) {
+      return keyedCombineFnWithContext.createAccumulator(key,
+          CombineContextFactory.createFromProcessContext(c));
+    }
+
+    @Override
+    public AccumT addInput(
+        K key, AccumT accumulator, InputT value, OldDoFn<?, ?>.ProcessContext c) {
+      return keyedCombineFnWithContext.addInput(key, accumulator, value,
+          CombineContextFactory.createFromProcessContext(c));
+    }
+
+    @Override
+    public AccumT mergeAccumulators(
+        K key, Iterable<AccumT> accumulators, OldDoFn<?, ?>.ProcessContext c) {
+      return keyedCombineFnWithContext.mergeAccumulators(
+          key, accumulators, CombineContextFactory.createFromProcessContext(c));
+    }
+
+    @Override
+    public OutputT extractOutput(K key, AccumT accumulator, OldDoFn<?, ?>.ProcessContext c) {
+      return keyedCombineFnWithContext.extractOutput(key, accumulator,
+          CombineContextFactory.createFromProcessContext(c));
+    }
+
+    @Override
+    public AccumT compact(K key, AccumT accumulator, OldDoFn<?, ?>.ProcessContext c) {
+      return keyedCombineFnWithContext.compact(key, accumulator,
+          CombineContextFactory.createFromProcessContext(c));
+    }
+
+    @Override
+    public OutputT apply(K key, Iterable<? extends InputT> inputs, OldDoFn<?, ?>.ProcessContext c) {
+      return keyedCombineFnWithContext.apply(key, inputs,
+          CombineContextFactory.createFromProcessContext(c));
+    }
+
+    @Override
+    public AccumT addInputs(K key, Iterable<InputT> inputs, OldDoFn<?, ?>.ProcessContext c) {
+      CombineWithContext.Context combineContext = CombineContextFactory.createFromProcessContext(c);
+      AccumT accum = keyedCombineFnWithContext.createAccumulator(key, combineContext);
+      for (InputT input : inputs) {
+        accum = keyedCombineFnWithContext.addInput(key, accum, input, combineContext);
+      }
+      return accum;
+    }
+
+    @Override
+    public String toString() {
+      return keyedCombineFnWithContext.toString();
+    }
+
+    @Override
+    public AccumT createAccumulator(K key, PipelineOptions options, SideInputReader sideInputReader,
+        Collection<? extends BoundedWindow> windows) {
+      return keyedCombineFnWithContext.createAccumulator(key,
+        CombineContextFactory.createFromComponents(
+          options, sideInputReader, Iterables.getOnlyElement(windows)));
+    }
+
+    @Override
+    public AccumT addInput(K key, AccumT accumulator, InputT input, PipelineOptions options,
+        SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
+      return keyedCombineFnWithContext.addInput(key, accumulator, input,
+        CombineContextFactory.createFromComponents(
+          options, sideInputReader, Iterables.getOnlyElement(windows)));
+    }
+
+    @Override
+    public AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators, PipelineOptions options,
+        SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
+      return keyedCombineFnWithContext.mergeAccumulators(key, accumulators,
+        CombineContextFactory.createFromComponents(
+          options, sideInputReader, Iterables.getOnlyElement(windows)));
+    }
+
+    @Override
+    public OutputT extractOutput(K key, AccumT accumulator, PipelineOptions options,
+        SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
+      return keyedCombineFnWithContext.extractOutput(key, accumulator,
+        CombineContextFactory.createFromComponents(
+          options, sideInputReader, Iterables.getOnlyElement(windows)));
+    }
+
+    @Override
+    public AccumT compact(K key, AccumT accumulator, PipelineOptions options,
+        SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
+      return keyedCombineFnWithContext.compact(key, accumulator,
+        CombineContextFactory.createFromComponents(
+          options, sideInputReader, Iterables.getOnlyElement(windows)));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/68a2025f/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java
index a4284f8..3db98a3 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java
@@ -24,6 +24,7 @@ import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import org.apache.beam.runners.core.PerKeyCombineFnRunners;
 import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.CombineFnBase;
@@ -33,7 +34,6 @@ import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.PerKeyCombineFnRunner;
-import org.apache.beam.sdk.util.PerKeyCombineFnRunners;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.KV;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/68a2025f/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingPartialReduceFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingPartialReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingPartialReduceFunction.java
index 30d3326..ea0669a 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingPartialReduceFunction.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingPartialReduceFunction.java
@@ -24,6 +24,7 @@ import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import org.apache.beam.runners.core.PerKeyCombineFnRunners;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.CombineFnBase;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -31,7 +32,6 @@ import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.PerKeyCombineFnRunner;
-import org.apache.beam.sdk.util.PerKeyCombineFnRunners;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.KV;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/68a2025f/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingReduceFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingReduceFunction.java
index 29dc1e3..9a4aadc 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingReduceFunction.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingReduceFunction.java
@@ -26,6 +26,7 @@ import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import org.apache.beam.runners.core.PerKeyCombineFnRunners;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.CombineFnBase;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -33,7 +34,6 @@ import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.PerKeyCombineFnRunner;
-import org.apache.beam.sdk.util.PerKeyCombineFnRunners;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.KV;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/68a2025f/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
index 3ea456a..ca80461 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
@@ -24,6 +24,7 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.Iterator;
 import java.util.Map;
+import org.apache.beam.runners.core.PerKeyCombineFnRunners;
 import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.CombineFnBase;
@@ -32,7 +33,6 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.PerKeyCombineFnRunner;
-import org.apache.beam.sdk.util.PerKeyCombineFnRunners;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.KV;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/68a2025f/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
index ab0c471..b4d003c 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
@@ -26,6 +26,7 @@ import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import org.apache.beam.runners.core.PerKeyCombineFnRunners;
 import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.CombineFnBase;
@@ -34,7 +35,6 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.PerKeyCombineFnRunner;
-import org.apache.beam.sdk.util.PerKeyCombineFnRunners;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.KV;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/68a2025f/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
index ac8acfc..be063e2 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
@@ -42,6 +42,7 @@ import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StandardCoder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.CombineFnBase.AbstractGlobalCombineFn;
 import org.apache.beam.sdk.transforms.CombineFnBase.AbstractPerKeyCombineFn;
 import org.apache.beam.sdk.transforms.CombineFnBase.GlobalCombineFn;
@@ -57,8 +58,6 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.util.AppliedCombineFn;
-import org.apache.beam.sdk.util.PerKeyCombineFnRunner;
-import org.apache.beam.sdk.util.PerKeyCombineFnRunners;
 import org.apache.beam.sdk.util.PropertyNames;
 import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.util.WindowingStrategy;
@@ -1940,7 +1939,7 @@ public class Combine {
       // on that does addInput + merge and another that does merge + extract.
       PerKeyCombineFn<KV<K, Integer>, InputT, AccumT, AccumT> hotPreCombine;
       PerKeyCombineFn<K, InputOrAccum<InputT, AccumT>, AccumT, OutputT> postCombine;
-      if (!(typedFn instanceof RequiresContextInternal)) {
+      if (typedFn instanceof KeyedCombineFn) {
         final KeyedCombineFn<K, InputT, AccumT, OutputT> keyedFn =
             (KeyedCombineFn<K, InputT, AccumT, OutputT>) typedFn;
         hotPreCombine =
@@ -2027,7 +2026,7 @@ public class Combine {
                 builder.delegate(PerKeyWithHotKeyFanout.this);
               }
             };
-      } else {
+      } else if (typedFn instanceof KeyedCombineFnWithContext) {
         final KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> keyedFnWithContext =
             (KeyedCombineFnWithContext<K, InputT, AccumT, OutputT>) typedFn;
         hotPreCombine =
@@ -2120,6 +2119,9 @@ public class Combine {
                 builder.delegate(PerKeyWithHotKeyFanout.this);
               }
             };
+      } else {
+        throw new IllegalStateException(
+            String.format("Unknown type of CombineFn: %s", typedFn.getClass()));
       }
 
       // Use the provided hotKeyFanout fn to split into "hot" and "cold" keys,
@@ -2389,15 +2391,34 @@ public class Combine {
     public PCollection<KV<K, OutputT>> apply(
         PCollection<? extends KV<K, ? extends Iterable<InputT>>> input) {
 
-      final PerKeyCombineFnRunner<? super K, ? super InputT, ?, OutputT> combineFnRunner =
-          PerKeyCombineFnRunners.create(fn);
       PCollection<KV<K, OutputT>> output = input.apply(ParDo.of(
           new OldDoFn<KV<K, ? extends Iterable<InputT>>, KV<K, OutputT>>() {
             @Override
-            public void processElement(ProcessContext c) {
+            public void processElement(final ProcessContext c) {
               K key = c.element().getKey();
 
-              c.output(KV.of(key, combineFnRunner.apply(key, c.element().getValue(), c)));
+              OutputT output;
+              if (fn instanceof KeyedCombineFnWithContext) {
+                output = ((KeyedCombineFnWithContext<? super K, ? super InputT, ?, OutputT>) fn)
+                    .apply(key, c.element().getValue(), new CombineWithContext.Context() {
+                      @Override
+                      public PipelineOptions getPipelineOptions() {
+                        return c.getPipelineOptions();
+                      }
+
+                      @Override
+                      public <T> T sideInput(PCollectionView<T> view) {
+                        return c.sideInput(view);
+                      }
+                    });
+              } else if (fn instanceof KeyedCombineFn) {
+                output = ((KeyedCombineFn<? super K, ? super InputT, ?, OutputT>) fn)
+                    .apply(key, c.element().getValue());
+              } else {
+                throw new IllegalStateException(
+                    String.format("Unknown type of CombineFn: %s", fn.getClass()));
+              }
+              c.output(KV.of(key, output));
             }
 
             @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/68a2025f/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PerKeyCombineFnRunners.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PerKeyCombineFnRunners.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PerKeyCombineFnRunners.java
deleted file mode 100644
index 35d0f2d..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PerKeyCombineFnRunners.java
+++ /dev/null
@@ -1,258 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import com.google.common.collect.Iterables;
-import java.util.Collection;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
-import org.apache.beam.sdk.transforms.CombineFnBase.PerKeyCombineFn;
-import org.apache.beam.sdk.transforms.CombineWithContext;
-import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
-import org.apache.beam.sdk.transforms.CombineWithContext.RequiresContextInternal;
-import org.apache.beam.sdk.transforms.OldDoFn;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-
-/**
- * Static utility methods that provide {@link PerKeyCombineFnRunner} implementations
- * for different keyed combine functions.
- */
-public class PerKeyCombineFnRunners {
-  /**
-   * Returns a {@link PerKeyCombineFnRunner} from a {@link PerKeyCombineFn}.
-   */
-  public static <K, InputT, AccumT, OutputT> PerKeyCombineFnRunner<K, InputT, AccumT, OutputT>
-  create(PerKeyCombineFn<K, InputT, AccumT, OutputT> perKeyCombineFn) {
-    if (perKeyCombineFn instanceof RequiresContextInternal) {
-      return new KeyedCombineFnWithContextRunner<>(
-          (KeyedCombineFnWithContext<K, InputT, AccumT, OutputT>) perKeyCombineFn);
-    } else {
-      return new KeyedCombineFnRunner<>(
-          (KeyedCombineFn<K, InputT, AccumT, OutputT>) perKeyCombineFn);
-    }
-  }
-
-  /**
-   * An implementation of {@link PerKeyCombineFnRunner} with {@link KeyedCombineFn}.
-   *
-   * <p>It forwards functions calls to the {@link KeyedCombineFn}.
-   */
-  private static class KeyedCombineFnRunner<K, InputT, AccumT, OutputT>
-      implements PerKeyCombineFnRunner<K, InputT, AccumT, OutputT> {
-    private final KeyedCombineFn<K, InputT, AccumT, OutputT> keyedCombineFn;
-
-    private KeyedCombineFnRunner(
-        KeyedCombineFn<K, InputT, AccumT, OutputT> keyedCombineFn) {
-      this.keyedCombineFn = keyedCombineFn;
-    }
-
-    @Override
-    public KeyedCombineFn<K, InputT, AccumT, OutputT> fn() {
-      return keyedCombineFn;
-    }
-
-    @Override
-    public AccumT createAccumulator(K key, OldDoFn<?, ?>.ProcessContext c) {
-      return keyedCombineFn.createAccumulator(key);
-    }
-
-    @Override
-    public AccumT addInput(
-        K key, AccumT accumulator, InputT input, OldDoFn<?, ?>.ProcessContext c) {
-      return keyedCombineFn.addInput(key, accumulator, input);
-    }
-
-    @Override
-    public AccumT mergeAccumulators(
-        K key, Iterable<AccumT> accumulators, OldDoFn<?, ?>.ProcessContext c) {
-      return keyedCombineFn.mergeAccumulators(key, accumulators);
-    }
-
-    @Override
-    public OutputT extractOutput(K key, AccumT accumulator, OldDoFn<?, ?>.ProcessContext c) {
-      return keyedCombineFn.extractOutput(key, accumulator);
-    }
-
-    @Override
-    public AccumT compact(K key, AccumT accumulator, OldDoFn<?, ?>.ProcessContext c) {
-      return keyedCombineFn.compact(key, accumulator);
-    }
-
-    @Override
-    public OutputT apply(K key, Iterable<? extends InputT> inputs, OldDoFn<?, ?>.ProcessContext c) {
-      return keyedCombineFn.apply(key, inputs);
-    }
-
-    @Override
-    public AccumT addInputs(K key, Iterable<InputT> inputs, OldDoFn<?, ?>.ProcessContext c) {
-      AccumT accum = keyedCombineFn.createAccumulator(key);
-      for (InputT input : inputs) {
-        accum = keyedCombineFn.addInput(key, accum, input);
-      }
-      return accum;
-    }
-
-    @Override
-    public String toString() {
-      return keyedCombineFn.toString();
-    }
-
-    @Override
-    public AccumT createAccumulator(K key, PipelineOptions options,
-        SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
-      return keyedCombineFn.createAccumulator(key);
-    }
-
-    @Override
-    public AccumT addInput(K key, AccumT accumulator, InputT input, PipelineOptions options,
-        SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
-      return keyedCombineFn.addInput(key, accumulator, input);
-    }
-
-    @Override
-    public AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators, PipelineOptions options,
-        SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
-      return keyedCombineFn.mergeAccumulators(key, accumulators);
-    }
-
-    @Override
-    public OutputT extractOutput(K key, AccumT accumulator, PipelineOptions options,
-        SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
-      return keyedCombineFn.extractOutput(key, accumulator);
-    }
-
-    @Override
-    public AccumT compact(K key, AccumT accumulator, PipelineOptions options,
-        SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
-      return keyedCombineFn.compact(key, accumulator);
-    }
-  }
-
-  /**
-   * An implementation of {@link PerKeyCombineFnRunner} with {@link KeyedCombineFnWithContext}.
-   *
-   * <p>It forwards functions calls to the {@link KeyedCombineFnWithContext}.
-   */
-  private static class KeyedCombineFnWithContextRunner<K, InputT, AccumT, OutputT>
-      implements PerKeyCombineFnRunner<K, InputT, AccumT, OutputT> {
-    private final KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> keyedCombineFnWithContext;
-
-    private KeyedCombineFnWithContextRunner(
-        KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> keyedCombineFnWithContext) {
-      this.keyedCombineFnWithContext = keyedCombineFnWithContext;
-    }
-
-    @Override
-    public KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> fn() {
-      return keyedCombineFnWithContext;
-    }
-
-    @Override
-    public AccumT createAccumulator(K key, OldDoFn<?, ?>.ProcessContext c) {
-      return keyedCombineFnWithContext.createAccumulator(key,
-          CombineContextFactory.createFromProcessContext(c));
-    }
-
-    @Override
-    public AccumT addInput(
-        K key, AccumT accumulator, InputT value, OldDoFn<?, ?>.ProcessContext c) {
-      return keyedCombineFnWithContext.addInput(key, accumulator, value,
-          CombineContextFactory.createFromProcessContext(c));
-    }
-
-    @Override
-    public AccumT mergeAccumulators(
-        K key, Iterable<AccumT> accumulators, OldDoFn<?, ?>.ProcessContext c) {
-      return keyedCombineFnWithContext.mergeAccumulators(
-          key, accumulators, CombineContextFactory.createFromProcessContext(c));
-    }
-
-    @Override
-    public OutputT extractOutput(K key, AccumT accumulator, OldDoFn<?, ?>.ProcessContext c) {
-      return keyedCombineFnWithContext.extractOutput(key, accumulator,
-          CombineContextFactory.createFromProcessContext(c));
-    }
-
-    @Override
-    public AccumT compact(K key, AccumT accumulator, OldDoFn<?, ?>.ProcessContext c) {
-      return keyedCombineFnWithContext.compact(key, accumulator,
-          CombineContextFactory.createFromProcessContext(c));
-    }
-
-    @Override
-    public OutputT apply(K key, Iterable<? extends InputT> inputs, OldDoFn<?, ?>.ProcessContext c) {
-      return keyedCombineFnWithContext.apply(key, inputs,
-          CombineContextFactory.createFromProcessContext(c));
-    }
-
-    @Override
-    public AccumT addInputs(K key, Iterable<InputT> inputs, OldDoFn<?, ?>.ProcessContext c) {
-      CombineWithContext.Context combineContext = CombineContextFactory.createFromProcessContext(c);
-      AccumT accum = keyedCombineFnWithContext.createAccumulator(key, combineContext);
-      for (InputT input : inputs) {
-        accum = keyedCombineFnWithContext.addInput(key, accum, input, combineContext);
-      }
-      return accum;
-    }
-
-    @Override
-    public String toString() {
-      return keyedCombineFnWithContext.toString();
-    }
-
-    @Override
-    public AccumT createAccumulator(K key, PipelineOptions options, SideInputReader sideInputReader,
-        Collection<? extends BoundedWindow> windows) {
-      return keyedCombineFnWithContext.createAccumulator(key,
-        CombineContextFactory.createFromComponents(
-          options, sideInputReader, Iterables.getOnlyElement(windows)));
-    }
-
-    @Override
-    public AccumT addInput(K key, AccumT accumulator, InputT input, PipelineOptions options,
-        SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
-      return keyedCombineFnWithContext.addInput(key, accumulator, input,
-        CombineContextFactory.createFromComponents(
-          options, sideInputReader, Iterables.getOnlyElement(windows)));
-    }
-
-    @Override
-    public AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators, PipelineOptions options,
-        SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
-      return keyedCombineFnWithContext.mergeAccumulators(key, accumulators,
-        CombineContextFactory.createFromComponents(
-          options, sideInputReader, Iterables.getOnlyElement(windows)));
-    }
-
-    @Override
-    public OutputT extractOutput(K key, AccumT accumulator, PipelineOptions options,
-        SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
-      return keyedCombineFnWithContext.extractOutput(key, accumulator,
-        CombineContextFactory.createFromComponents(
-          options, sideInputReader, Iterables.getOnlyElement(windows)));
-    }
-
-    @Override
-    public AccumT compact(K key, AccumT accumulator, PipelineOptions options,
-        SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
-      return keyedCombineFnWithContext.compact(key, accumulator,
-        CombineContextFactory.createFromComponents(
-          options, sideInputReader, Iterables.getOnlyElement(windows)));
-    }
-  }
-}


[2/2] incubator-beam git commit: This closes #1388

Posted by ke...@apache.org.
This closes #1388


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4b682039
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4b682039
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4b682039

Branch: refs/heads/master
Commit: 4b682039d1286861bb2658401913eddf8bdbfb4a
Parents: 4a7da91 68a2025
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Nov 30 14:25:31 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Wed Nov 30 14:25:31 2016 -0800

----------------------------------------------------------------------
 .../runners/core/PerKeyCombineFnRunners.java    | 263 +++++++++++++++++++
 .../FlinkMergingNonShuffleReduceFunction.java   |   2 +-
 .../FlinkMergingPartialReduceFunction.java      |   2 +-
 .../functions/FlinkMergingReduceFunction.java   |   2 +-
 .../functions/FlinkPartialReduceFunction.java   |   2 +-
 .../functions/FlinkReduceFunction.java          |   2 +-
 .../org/apache/beam/sdk/transforms/Combine.java |  37 ++-
 .../beam/sdk/util/PerKeyCombineFnRunners.java   | 258 ------------------
 8 files changed, 297 insertions(+), 271 deletions(-)
----------------------------------------------------------------------