You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/11/30 22:36:19 UTC
[1/2] incubator-beam git commit: Move PerKeyCombineFnRunners to
runners/core.
Repository: incubator-beam
Updated Branches:
refs/heads/master 4a7da91f0 -> 4b682039d
Move PerKeyCombineFnRunners to runners/core.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/68a2025f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/68a2025f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/68a2025f
Branch: refs/heads/master
Commit: 68a2025f988a0131c6fc21649a3dbd4f71c15688
Parents: 4a7da91
Author: Pei He <pe...@google.com>
Authored: Mon Nov 14 14:15:35 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Wed Nov 30 14:25:14 2016 -0800
----------------------------------------------------------------------
.../runners/core/PerKeyCombineFnRunners.java | 263 +++++++++++++++++++
.../FlinkMergingNonShuffleReduceFunction.java | 2 +-
.../FlinkMergingPartialReduceFunction.java | 2 +-
.../functions/FlinkMergingReduceFunction.java | 2 +-
.../functions/FlinkPartialReduceFunction.java | 2 +-
.../functions/FlinkReduceFunction.java | 2 +-
.../org/apache/beam/sdk/transforms/Combine.java | 37 ++-
.../beam/sdk/util/PerKeyCombineFnRunners.java | 258 ------------------
8 files changed, 297 insertions(+), 271 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/68a2025f/runners/core-java/src/main/java/org/apache/beam/runners/core/PerKeyCombineFnRunners.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/PerKeyCombineFnRunners.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/PerKeyCombineFnRunners.java
new file mode 100644
index 0000000..6f0ff96
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/PerKeyCombineFnRunners.java
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import com.google.common.collect.Iterables;
+import java.util.Collection;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
+import org.apache.beam.sdk.transforms.CombineFnBase.PerKeyCombineFn;
+import org.apache.beam.sdk.transforms.CombineWithContext;
+import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
+import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.CombineContextFactory;
+import org.apache.beam.sdk.util.PerKeyCombineFnRunner;
+import org.apache.beam.sdk.util.SideInputReader;
+
+/**
+ * Static utility methods that provide {@link PerKeyCombineFnRunner} implementations
+ * for different keyed combine functions.
+ */
+public class PerKeyCombineFnRunners {
+ /**
+ * Returns a {@link PerKeyCombineFnRunner} from a {@link PerKeyCombineFn}.
+ */
+ public static <K, InputT, AccumT, OutputT> PerKeyCombineFnRunner<K, InputT, AccumT, OutputT>
+ create(PerKeyCombineFn<K, InputT, AccumT, OutputT> perKeyCombineFn) {
+ if (perKeyCombineFn instanceof KeyedCombineFnWithContext) {
+ return new KeyedCombineFnWithContextRunner<>(
+ (KeyedCombineFnWithContext<K, InputT, AccumT, OutputT>) perKeyCombineFn);
+ } else if (perKeyCombineFn instanceof KeyedCombineFn) {
+ return new KeyedCombineFnRunner<>(
+ (KeyedCombineFn<K, InputT, AccumT, OutputT>) perKeyCombineFn);
+ } else {
+ throw new IllegalStateException(
+ String.format("Unknown type of CombineFn: %s", perKeyCombineFn.getClass()));
+ }
+ }
+
+ /**
+ * An implementation of {@link PerKeyCombineFnRunner} with {@link KeyedCombineFn}.
+ *
+ * <p>It forwards functions calls to the {@link KeyedCombineFn}.
+ */
+ private static class KeyedCombineFnRunner<K, InputT, AccumT, OutputT>
+ implements PerKeyCombineFnRunner<K, InputT, AccumT, OutputT> {
+ private final KeyedCombineFn<K, InputT, AccumT, OutputT> keyedCombineFn;
+
+ private KeyedCombineFnRunner(
+ KeyedCombineFn<K, InputT, AccumT, OutputT> keyedCombineFn) {
+ this.keyedCombineFn = keyedCombineFn;
+ }
+
+ @Override
+ public KeyedCombineFn<K, InputT, AccumT, OutputT> fn() {
+ return keyedCombineFn;
+ }
+
+ @Override
+ public AccumT createAccumulator(K key, OldDoFn<?, ?>.ProcessContext c) {
+ return keyedCombineFn.createAccumulator(key);
+ }
+
+ @Override
+ public AccumT addInput(
+ K key, AccumT accumulator, InputT input, OldDoFn<?, ?>.ProcessContext c) {
+ return keyedCombineFn.addInput(key, accumulator, input);
+ }
+
+ @Override
+ public AccumT mergeAccumulators(
+ K key, Iterable<AccumT> accumulators, OldDoFn<?, ?>.ProcessContext c) {
+ return keyedCombineFn.mergeAccumulators(key, accumulators);
+ }
+
+ @Override
+ public OutputT extractOutput(K key, AccumT accumulator, OldDoFn<?, ?>.ProcessContext c) {
+ return keyedCombineFn.extractOutput(key, accumulator);
+ }
+
+ @Override
+ public AccumT compact(K key, AccumT accumulator, OldDoFn<?, ?>.ProcessContext c) {
+ return keyedCombineFn.compact(key, accumulator);
+ }
+
+ @Override
+ public OutputT apply(K key, Iterable<? extends InputT> inputs, OldDoFn<?, ?>.ProcessContext c) {
+ return keyedCombineFn.apply(key, inputs);
+ }
+
+ @Override
+ public AccumT addInputs(K key, Iterable<InputT> inputs, OldDoFn<?, ?>.ProcessContext c) {
+ AccumT accum = keyedCombineFn.createAccumulator(key);
+ for (InputT input : inputs) {
+ accum = keyedCombineFn.addInput(key, accum, input);
+ }
+ return accum;
+ }
+
+ @Override
+ public String toString() {
+ return keyedCombineFn.toString();
+ }
+
+ @Override
+ public AccumT createAccumulator(K key, PipelineOptions options,
+ SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
+ return keyedCombineFn.createAccumulator(key);
+ }
+
+ @Override
+ public AccumT addInput(K key, AccumT accumulator, InputT input, PipelineOptions options,
+ SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
+ return keyedCombineFn.addInput(key, accumulator, input);
+ }
+
+ @Override
+ public AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators, PipelineOptions options,
+ SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
+ return keyedCombineFn.mergeAccumulators(key, accumulators);
+ }
+
+ @Override
+ public OutputT extractOutput(K key, AccumT accumulator, PipelineOptions options,
+ SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
+ return keyedCombineFn.extractOutput(key, accumulator);
+ }
+
+ @Override
+ public AccumT compact(K key, AccumT accumulator, PipelineOptions options,
+ SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
+ return keyedCombineFn.compact(key, accumulator);
+ }
+ }
+
+ /**
+ * An implementation of {@link PerKeyCombineFnRunner} with {@link KeyedCombineFnWithContext}.
+ *
+ * <p>It forwards functions calls to the {@link KeyedCombineFnWithContext}.
+ */
+ private static class KeyedCombineFnWithContextRunner<K, InputT, AccumT, OutputT>
+ implements PerKeyCombineFnRunner<K, InputT, AccumT, OutputT> {
+ private final KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> keyedCombineFnWithContext;
+
+ private KeyedCombineFnWithContextRunner(
+ KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> keyedCombineFnWithContext) {
+ this.keyedCombineFnWithContext = keyedCombineFnWithContext;
+ }
+
+ @Override
+ public KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> fn() {
+ return keyedCombineFnWithContext;
+ }
+
+ @Override
+ public AccumT createAccumulator(K key, OldDoFn<?, ?>.ProcessContext c) {
+ return keyedCombineFnWithContext.createAccumulator(key,
+ CombineContextFactory.createFromProcessContext(c));
+ }
+
+ @Override
+ public AccumT addInput(
+ K key, AccumT accumulator, InputT value, OldDoFn<?, ?>.ProcessContext c) {
+ return keyedCombineFnWithContext.addInput(key, accumulator, value,
+ CombineContextFactory.createFromProcessContext(c));
+ }
+
+ @Override
+ public AccumT mergeAccumulators(
+ K key, Iterable<AccumT> accumulators, OldDoFn<?, ?>.ProcessContext c) {
+ return keyedCombineFnWithContext.mergeAccumulators(
+ key, accumulators, CombineContextFactory.createFromProcessContext(c));
+ }
+
+ @Override
+ public OutputT extractOutput(K key, AccumT accumulator, OldDoFn<?, ?>.ProcessContext c) {
+ return keyedCombineFnWithContext.extractOutput(key, accumulator,
+ CombineContextFactory.createFromProcessContext(c));
+ }
+
+ @Override
+ public AccumT compact(K key, AccumT accumulator, OldDoFn<?, ?>.ProcessContext c) {
+ return keyedCombineFnWithContext.compact(key, accumulator,
+ CombineContextFactory.createFromProcessContext(c));
+ }
+
+ @Override
+ public OutputT apply(K key, Iterable<? extends InputT> inputs, OldDoFn<?, ?>.ProcessContext c) {
+ return keyedCombineFnWithContext.apply(key, inputs,
+ CombineContextFactory.createFromProcessContext(c));
+ }
+
+ @Override
+ public AccumT addInputs(K key, Iterable<InputT> inputs, OldDoFn<?, ?>.ProcessContext c) {
+ CombineWithContext.Context combineContext = CombineContextFactory.createFromProcessContext(c);
+ AccumT accum = keyedCombineFnWithContext.createAccumulator(key, combineContext);
+ for (InputT input : inputs) {
+ accum = keyedCombineFnWithContext.addInput(key, accum, input, combineContext);
+ }
+ return accum;
+ }
+
+ @Override
+ public String toString() {
+ return keyedCombineFnWithContext.toString();
+ }
+
+ @Override
+ public AccumT createAccumulator(K key, PipelineOptions options, SideInputReader sideInputReader,
+ Collection<? extends BoundedWindow> windows) {
+ return keyedCombineFnWithContext.createAccumulator(key,
+ CombineContextFactory.createFromComponents(
+ options, sideInputReader, Iterables.getOnlyElement(windows)));
+ }
+
+ @Override
+ public AccumT addInput(K key, AccumT accumulator, InputT input, PipelineOptions options,
+ SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
+ return keyedCombineFnWithContext.addInput(key, accumulator, input,
+ CombineContextFactory.createFromComponents(
+ options, sideInputReader, Iterables.getOnlyElement(windows)));
+ }
+
+ @Override
+ public AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators, PipelineOptions options,
+ SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
+ return keyedCombineFnWithContext.mergeAccumulators(key, accumulators,
+ CombineContextFactory.createFromComponents(
+ options, sideInputReader, Iterables.getOnlyElement(windows)));
+ }
+
+ @Override
+ public OutputT extractOutput(K key, AccumT accumulator, PipelineOptions options,
+ SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
+ return keyedCombineFnWithContext.extractOutput(key, accumulator,
+ CombineContextFactory.createFromComponents(
+ options, sideInputReader, Iterables.getOnlyElement(windows)));
+ }
+
+ @Override
+ public AccumT compact(K key, AccumT accumulator, PipelineOptions options,
+ SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
+ return keyedCombineFnWithContext.compact(key, accumulator,
+ CombineContextFactory.createFromComponents(
+ options, sideInputReader, Iterables.getOnlyElement(windows)));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/68a2025f/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java
index a4284f8..3db98a3 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java
@@ -24,6 +24,7 @@ import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import org.apache.beam.runners.core.PerKeyCombineFnRunners;
import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.CombineFnBase;
@@ -33,7 +34,6 @@ import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.PerKeyCombineFnRunner;
-import org.apache.beam.sdk.util.PerKeyCombineFnRunners;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.KV;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/68a2025f/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingPartialReduceFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingPartialReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingPartialReduceFunction.java
index 30d3326..ea0669a 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingPartialReduceFunction.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingPartialReduceFunction.java
@@ -24,6 +24,7 @@ import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import org.apache.beam.runners.core.PerKeyCombineFnRunners;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.CombineFnBase;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -31,7 +32,6 @@ import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.PerKeyCombineFnRunner;
-import org.apache.beam.sdk.util.PerKeyCombineFnRunners;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.KV;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/68a2025f/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingReduceFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingReduceFunction.java
index 29dc1e3..9a4aadc 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingReduceFunction.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingReduceFunction.java
@@ -26,6 +26,7 @@ import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import org.apache.beam.runners.core.PerKeyCombineFnRunners;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.CombineFnBase;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -33,7 +34,6 @@ import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.PerKeyCombineFnRunner;
-import org.apache.beam.sdk.util.PerKeyCombineFnRunners;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.KV;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/68a2025f/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
index 3ea456a..ca80461 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
@@ -24,6 +24,7 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.Map;
+import org.apache.beam.runners.core.PerKeyCombineFnRunners;
import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.CombineFnBase;
@@ -32,7 +33,6 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.PerKeyCombineFnRunner;
-import org.apache.beam.sdk.util.PerKeyCombineFnRunners;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.KV;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/68a2025f/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
index ab0c471..b4d003c 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
@@ -26,6 +26,7 @@ import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import org.apache.beam.runners.core.PerKeyCombineFnRunners;
import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.CombineFnBase;
@@ -34,7 +35,6 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.PerKeyCombineFnRunner;
-import org.apache.beam.sdk.util.PerKeyCombineFnRunners;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.KV;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/68a2025f/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
index ac8acfc..be063e2 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
@@ -42,6 +42,7 @@ import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StandardCoder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.CombineFnBase.AbstractGlobalCombineFn;
import org.apache.beam.sdk.transforms.CombineFnBase.AbstractPerKeyCombineFn;
import org.apache.beam.sdk.transforms.CombineFnBase.GlobalCombineFn;
@@ -57,8 +58,6 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.AppliedCombineFn;
-import org.apache.beam.sdk.util.PerKeyCombineFnRunner;
-import org.apache.beam.sdk.util.PerKeyCombineFnRunners;
import org.apache.beam.sdk.util.PropertyNames;
import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.util.WindowingStrategy;
@@ -1940,7 +1939,7 @@ public class Combine {
// on that does addInput + merge and another that does merge + extract.
PerKeyCombineFn<KV<K, Integer>, InputT, AccumT, AccumT> hotPreCombine;
PerKeyCombineFn<K, InputOrAccum<InputT, AccumT>, AccumT, OutputT> postCombine;
- if (!(typedFn instanceof RequiresContextInternal)) {
+ if (typedFn instanceof KeyedCombineFn) {
final KeyedCombineFn<K, InputT, AccumT, OutputT> keyedFn =
(KeyedCombineFn<K, InputT, AccumT, OutputT>) typedFn;
hotPreCombine =
@@ -2027,7 +2026,7 @@ public class Combine {
builder.delegate(PerKeyWithHotKeyFanout.this);
}
};
- } else {
+ } else if (typedFn instanceof KeyedCombineFnWithContext) {
final KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> keyedFnWithContext =
(KeyedCombineFnWithContext<K, InputT, AccumT, OutputT>) typedFn;
hotPreCombine =
@@ -2120,6 +2119,9 @@ public class Combine {
builder.delegate(PerKeyWithHotKeyFanout.this);
}
};
+ } else {
+ throw new IllegalStateException(
+ String.format("Unknown type of CombineFn: %s", typedFn.getClass()));
}
// Use the provided hotKeyFanout fn to split into "hot" and "cold" keys,
@@ -2389,15 +2391,34 @@ public class Combine {
public PCollection<KV<K, OutputT>> apply(
PCollection<? extends KV<K, ? extends Iterable<InputT>>> input) {
- final PerKeyCombineFnRunner<? super K, ? super InputT, ?, OutputT> combineFnRunner =
- PerKeyCombineFnRunners.create(fn);
PCollection<KV<K, OutputT>> output = input.apply(ParDo.of(
new OldDoFn<KV<K, ? extends Iterable<InputT>>, KV<K, OutputT>>() {
@Override
- public void processElement(ProcessContext c) {
+ public void processElement(final ProcessContext c) {
K key = c.element().getKey();
- c.output(KV.of(key, combineFnRunner.apply(key, c.element().getValue(), c)));
+ OutputT output;
+ if (fn instanceof KeyedCombineFnWithContext) {
+ output = ((KeyedCombineFnWithContext<? super K, ? super InputT, ?, OutputT>) fn)
+ .apply(key, c.element().getValue(), new CombineWithContext.Context() {
+ @Override
+ public PipelineOptions getPipelineOptions() {
+ return c.getPipelineOptions();
+ }
+
+ @Override
+ public <T> T sideInput(PCollectionView<T> view) {
+ return c.sideInput(view);
+ }
+ });
+ } else if (fn instanceof KeyedCombineFn) {
+ output = ((KeyedCombineFn<? super K, ? super InputT, ?, OutputT>) fn)
+ .apply(key, c.element().getValue());
+ } else {
+ throw new IllegalStateException(
+ String.format("Unknown type of CombineFn: %s", fn.getClass()));
+ }
+ c.output(KV.of(key, output));
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/68a2025f/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PerKeyCombineFnRunners.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PerKeyCombineFnRunners.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PerKeyCombineFnRunners.java
deleted file mode 100644
index 35d0f2d..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PerKeyCombineFnRunners.java
+++ /dev/null
@@ -1,258 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import com.google.common.collect.Iterables;
-import java.util.Collection;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
-import org.apache.beam.sdk.transforms.CombineFnBase.PerKeyCombineFn;
-import org.apache.beam.sdk.transforms.CombineWithContext;
-import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
-import org.apache.beam.sdk.transforms.CombineWithContext.RequiresContextInternal;
-import org.apache.beam.sdk.transforms.OldDoFn;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-
-/**
- * Static utility methods that provide {@link PerKeyCombineFnRunner} implementations
- * for different keyed combine functions.
- */
-public class PerKeyCombineFnRunners {
- /**
- * Returns a {@link PerKeyCombineFnRunner} from a {@link PerKeyCombineFn}.
- */
- public static <K, InputT, AccumT, OutputT> PerKeyCombineFnRunner<K, InputT, AccumT, OutputT>
- create(PerKeyCombineFn<K, InputT, AccumT, OutputT> perKeyCombineFn) {
- if (perKeyCombineFn instanceof RequiresContextInternal) {
- return new KeyedCombineFnWithContextRunner<>(
- (KeyedCombineFnWithContext<K, InputT, AccumT, OutputT>) perKeyCombineFn);
- } else {
- return new KeyedCombineFnRunner<>(
- (KeyedCombineFn<K, InputT, AccumT, OutputT>) perKeyCombineFn);
- }
- }
-
- /**
- * An implementation of {@link PerKeyCombineFnRunner} with {@link KeyedCombineFn}.
- *
- * <p>It forwards functions calls to the {@link KeyedCombineFn}.
- */
- private static class KeyedCombineFnRunner<K, InputT, AccumT, OutputT>
- implements PerKeyCombineFnRunner<K, InputT, AccumT, OutputT> {
- private final KeyedCombineFn<K, InputT, AccumT, OutputT> keyedCombineFn;
-
- private KeyedCombineFnRunner(
- KeyedCombineFn<K, InputT, AccumT, OutputT> keyedCombineFn) {
- this.keyedCombineFn = keyedCombineFn;
- }
-
- @Override
- public KeyedCombineFn<K, InputT, AccumT, OutputT> fn() {
- return keyedCombineFn;
- }
-
- @Override
- public AccumT createAccumulator(K key, OldDoFn<?, ?>.ProcessContext c) {
- return keyedCombineFn.createAccumulator(key);
- }
-
- @Override
- public AccumT addInput(
- K key, AccumT accumulator, InputT input, OldDoFn<?, ?>.ProcessContext c) {
- return keyedCombineFn.addInput(key, accumulator, input);
- }
-
- @Override
- public AccumT mergeAccumulators(
- K key, Iterable<AccumT> accumulators, OldDoFn<?, ?>.ProcessContext c) {
- return keyedCombineFn.mergeAccumulators(key, accumulators);
- }
-
- @Override
- public OutputT extractOutput(K key, AccumT accumulator, OldDoFn<?, ?>.ProcessContext c) {
- return keyedCombineFn.extractOutput(key, accumulator);
- }
-
- @Override
- public AccumT compact(K key, AccumT accumulator, OldDoFn<?, ?>.ProcessContext c) {
- return keyedCombineFn.compact(key, accumulator);
- }
-
- @Override
- public OutputT apply(K key, Iterable<? extends InputT> inputs, OldDoFn<?, ?>.ProcessContext c) {
- return keyedCombineFn.apply(key, inputs);
- }
-
- @Override
- public AccumT addInputs(K key, Iterable<InputT> inputs, OldDoFn<?, ?>.ProcessContext c) {
- AccumT accum = keyedCombineFn.createAccumulator(key);
- for (InputT input : inputs) {
- accum = keyedCombineFn.addInput(key, accum, input);
- }
- return accum;
- }
-
- @Override
- public String toString() {
- return keyedCombineFn.toString();
- }
-
- @Override
- public AccumT createAccumulator(K key, PipelineOptions options,
- SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
- return keyedCombineFn.createAccumulator(key);
- }
-
- @Override
- public AccumT addInput(K key, AccumT accumulator, InputT input, PipelineOptions options,
- SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
- return keyedCombineFn.addInput(key, accumulator, input);
- }
-
- @Override
- public AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators, PipelineOptions options,
- SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
- return keyedCombineFn.mergeAccumulators(key, accumulators);
- }
-
- @Override
- public OutputT extractOutput(K key, AccumT accumulator, PipelineOptions options,
- SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
- return keyedCombineFn.extractOutput(key, accumulator);
- }
-
- @Override
- public AccumT compact(K key, AccumT accumulator, PipelineOptions options,
- SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
- return keyedCombineFn.compact(key, accumulator);
- }
- }
-
- /**
- * An implementation of {@link PerKeyCombineFnRunner} with {@link KeyedCombineFnWithContext}.
- *
- * <p>It forwards functions calls to the {@link KeyedCombineFnWithContext}.
- */
- private static class KeyedCombineFnWithContextRunner<K, InputT, AccumT, OutputT>
- implements PerKeyCombineFnRunner<K, InputT, AccumT, OutputT> {
- private final KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> keyedCombineFnWithContext;
-
- private KeyedCombineFnWithContextRunner(
- KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> keyedCombineFnWithContext) {
- this.keyedCombineFnWithContext = keyedCombineFnWithContext;
- }
-
- @Override
- public KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> fn() {
- return keyedCombineFnWithContext;
- }
-
- @Override
- public AccumT createAccumulator(K key, OldDoFn<?, ?>.ProcessContext c) {
- return keyedCombineFnWithContext.createAccumulator(key,
- CombineContextFactory.createFromProcessContext(c));
- }
-
- @Override
- public AccumT addInput(
- K key, AccumT accumulator, InputT value, OldDoFn<?, ?>.ProcessContext c) {
- return keyedCombineFnWithContext.addInput(key, accumulator, value,
- CombineContextFactory.createFromProcessContext(c));
- }
-
- @Override
- public AccumT mergeAccumulators(
- K key, Iterable<AccumT> accumulators, OldDoFn<?, ?>.ProcessContext c) {
- return keyedCombineFnWithContext.mergeAccumulators(
- key, accumulators, CombineContextFactory.createFromProcessContext(c));
- }
-
- @Override
- public OutputT extractOutput(K key, AccumT accumulator, OldDoFn<?, ?>.ProcessContext c) {
- return keyedCombineFnWithContext.extractOutput(key, accumulator,
- CombineContextFactory.createFromProcessContext(c));
- }
-
- @Override
- public AccumT compact(K key, AccumT accumulator, OldDoFn<?, ?>.ProcessContext c) {
- return keyedCombineFnWithContext.compact(key, accumulator,
- CombineContextFactory.createFromProcessContext(c));
- }
-
- @Override
- public OutputT apply(K key, Iterable<? extends InputT> inputs, OldDoFn<?, ?>.ProcessContext c) {
- return keyedCombineFnWithContext.apply(key, inputs,
- CombineContextFactory.createFromProcessContext(c));
- }
-
- @Override
- public AccumT addInputs(K key, Iterable<InputT> inputs, OldDoFn<?, ?>.ProcessContext c) {
- CombineWithContext.Context combineContext = CombineContextFactory.createFromProcessContext(c);
- AccumT accum = keyedCombineFnWithContext.createAccumulator(key, combineContext);
- for (InputT input : inputs) {
- accum = keyedCombineFnWithContext.addInput(key, accum, input, combineContext);
- }
- return accum;
- }
-
- @Override
- public String toString() {
- return keyedCombineFnWithContext.toString();
- }
-
- @Override
- public AccumT createAccumulator(K key, PipelineOptions options, SideInputReader sideInputReader,
- Collection<? extends BoundedWindow> windows) {
- return keyedCombineFnWithContext.createAccumulator(key,
- CombineContextFactory.createFromComponents(
- options, sideInputReader, Iterables.getOnlyElement(windows)));
- }
-
- @Override
- public AccumT addInput(K key, AccumT accumulator, InputT input, PipelineOptions options,
- SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
- return keyedCombineFnWithContext.addInput(key, accumulator, input,
- CombineContextFactory.createFromComponents(
- options, sideInputReader, Iterables.getOnlyElement(windows)));
- }
-
- @Override
- public AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators, PipelineOptions options,
- SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
- return keyedCombineFnWithContext.mergeAccumulators(key, accumulators,
- CombineContextFactory.createFromComponents(
- options, sideInputReader, Iterables.getOnlyElement(windows)));
- }
-
- @Override
- public OutputT extractOutput(K key, AccumT accumulator, PipelineOptions options,
- SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
- return keyedCombineFnWithContext.extractOutput(key, accumulator,
- CombineContextFactory.createFromComponents(
- options, sideInputReader, Iterables.getOnlyElement(windows)));
- }
-
- @Override
- public AccumT compact(K key, AccumT accumulator, PipelineOptions options,
- SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
- return keyedCombineFnWithContext.compact(key, accumulator,
- CombineContextFactory.createFromComponents(
- options, sideInputReader, Iterables.getOnlyElement(windows)));
- }
- }
-}
[2/2] incubator-beam git commit: This closes #1388
Posted by ke...@apache.org.
This closes #1388
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4b682039
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4b682039
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4b682039
Branch: refs/heads/master
Commit: 4b682039d1286861bb2658401913eddf8bdbfb4a
Parents: 4a7da91 68a2025
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Nov 30 14:25:31 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Wed Nov 30 14:25:31 2016 -0800
----------------------------------------------------------------------
.../runners/core/PerKeyCombineFnRunners.java | 263 +++++++++++++++++++
.../FlinkMergingNonShuffleReduceFunction.java | 2 +-
.../FlinkMergingPartialReduceFunction.java | 2 +-
.../functions/FlinkMergingReduceFunction.java | 2 +-
.../functions/FlinkPartialReduceFunction.java | 2 +-
.../functions/FlinkReduceFunction.java | 2 +-
.../org/apache/beam/sdk/transforms/Combine.java | 37 ++-
.../beam/sdk/util/PerKeyCombineFnRunners.java | 258 ------------------
8 files changed, 297 insertions(+), 271 deletions(-)
----------------------------------------------------------------------