You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/01/19 23:21:15 UTC
[1/2] beam git commit: Moves PerKeyCombineFnRunners back to
runners-core
Repository: beam
Updated Branches:
refs/heads/master 420367088 -> 2f580caff
Moves PerKeyCombineFnRunners back to runners-core
It is used by the Dataflow worker.
However, this change moves only the non-OldDoFn related part
to runners-core. The OldDoFn-related part stays in Flink.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1a3292fb
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1a3292fb
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1a3292fb
Branch: refs/heads/master
Commit: 1a3292fb4c57efdc6f98bf58cff9a0b42494574b
Parents: 603f4fb
Author: Eugene Kirpichov <ki...@google.com>
Authored: Wed Jan 18 16:26:00 2017 -0800
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Thu Jan 19 11:43:25 2017 -0800
----------------------------------------------------------------------
.../runners/core/PerKeyCombineFnRunner.java | 45 ----
.../runners/core/PerKeyCombineFnRunners.java | 161 +++++++++++++
.../runners/flink/OldPerKeyCombineFnRunner.java | 62 +++++
.../flink/OldPerKeyCombineFnRunners.java | 155 ++++++++++++
.../runners/flink/PerKeyCombineFnRunners.java | 239 -------------------
.../FlinkMergingNonShuffleReduceFunction.java | 8 +-
.../FlinkMergingPartialReduceFunction.java | 8 +-
.../functions/FlinkMergingReduceFunction.java | 8 +-
.../functions/FlinkPartialReduceFunction.java | 8 +-
.../functions/FlinkReduceFunction.java | 8 +-
10 files changed, 398 insertions(+), 304 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/1a3292fb/runners/core-java/src/main/java/org/apache/beam/runners/core/PerKeyCombineFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/PerKeyCombineFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/PerKeyCombineFnRunner.java
index 4550273..a6608a7 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/PerKeyCombineFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/PerKeyCombineFnRunner.java
@@ -21,7 +21,6 @@ import java.io.Serializable;
import java.util.Collection;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.CombineFnBase.PerKeyCombineFn;
-import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.SideInputReader;
@@ -34,50 +33,6 @@ import org.apache.beam.sdk.util.SideInputReader;
*/
public interface PerKeyCombineFnRunner<K, InputT, AccumT, OutputT> extends Serializable {
/**
- * Returns the {@link PerKeyCombineFn} it holds.
- *
- * <p>It can be a {@code KeyedCombineFn} or a {@code KeyedCombineFnWithContext}.
- */
- PerKeyCombineFn<K, InputT, AccumT, OutputT> fn();
-
- /////////////////////////////////////////////////////////////////////////////
-
- /**
- * Forwards the call to a {@link PerKeyCombineFn} to create the accumulator in a {@link OldDoFn}.
- *
- * <p>It constructs a {@code CombineWithContext.Context} from {@code OldDoFn.ProcessContext}
- * if it is required.
- */
- AccumT createAccumulator(K key, OldDoFn<?, ?>.ProcessContext c);
-
- /**
- * Forwards the call to a {@link PerKeyCombineFn} to add the input in a {@link OldDoFn}.
- *
- * <p>It constructs a {@code CombineWithContext.Context} from {@code OldDoFn.ProcessContext}
- * if it is required.
- */
- AccumT addInput(K key, AccumT accumulator, InputT input, OldDoFn<?, ?>.ProcessContext c);
-
- /**
- * Forwards the call to a {@link PerKeyCombineFn} to merge accumulators in a {@link OldDoFn}.
- *
- * <p>It constructs a {@code CombineWithContext.Context} from {@code OldDoFn.ProcessContext}
- * if it is required.
- */
- AccumT mergeAccumulators(
- K key, Iterable<AccumT> accumulators, OldDoFn<?, ?>.ProcessContext c);
-
- /**
- * Forwards the call to a {@link PerKeyCombineFn} to extract the output in a {@link OldDoFn}.
- *
- * <p>It constructs a {@code CombineWithContext.Context} from {@code OldDoFn.ProcessContext}
- * if it is required.
- */
- OutputT extractOutput(K key, AccumT accumulator, OldDoFn<?, ?>.ProcessContext c);
-
- /////////////////////////////////////////////////////////////////////////////
-
- /**
* Forwards the call to a {@link PerKeyCombineFn} to create the accumulator.
*
* <p>It constructs a {@code CombineWithContext.Context} from
http://git-wip-us.apache.org/repos/asf/beam/blob/1a3292fb/runners/core-java/src/main/java/org/apache/beam/runners/core/PerKeyCombineFnRunners.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/PerKeyCombineFnRunners.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/PerKeyCombineFnRunners.java
new file mode 100644
index 0000000..7736758
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/PerKeyCombineFnRunners.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import com.google.common.collect.Iterables;
+import java.util.Collection;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
+import org.apache.beam.sdk.transforms.CombineFnBase.PerKeyCombineFn;
+import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.CombineContextFactory;
+import org.apache.beam.sdk.util.SideInputReader;
+
+/**
+ * Static utility methods that provide {@link PerKeyCombineFnRunner} implementations
+ * for different keyed combine functions.
+ */
+public class PerKeyCombineFnRunners {
+ /**
+ * Returns a {@link PerKeyCombineFnRunner} from a {@link PerKeyCombineFn}.
+ */
+ public static <K, InputT, AccumT, OutputT> PerKeyCombineFnRunner<K, InputT, AccumT, OutputT>
+ create(PerKeyCombineFn<K, InputT, AccumT, OutputT> perKeyCombineFn) {
+ if (perKeyCombineFn instanceof KeyedCombineFnWithContext) {
+ return new KeyedCombineFnWithContextRunner<>(
+ (KeyedCombineFnWithContext<K, InputT, AccumT, OutputT>) perKeyCombineFn);
+ } else if (perKeyCombineFn instanceof KeyedCombineFn) {
+ return new KeyedCombineFnRunner<>(
+ (KeyedCombineFn<K, InputT, AccumT, OutputT>) perKeyCombineFn);
+ } else {
+ throw new IllegalStateException(
+ String.format("Unknown type of CombineFn: %s", perKeyCombineFn.getClass()));
+ }
+ }
+
+ /**
+ * An implementation of {@link PerKeyCombineFnRunner} with {@link KeyedCombineFn}.
+ *
+ * <p>It forwards functions calls to the {@link KeyedCombineFn}.
+ */
+ private static class KeyedCombineFnRunner<K, InputT, AccumT, OutputT>
+ implements PerKeyCombineFnRunner<K, InputT, AccumT, OutputT> {
+ private final KeyedCombineFn<K, InputT, AccumT, OutputT> keyedCombineFn;
+
+ private KeyedCombineFnRunner(
+ KeyedCombineFn<K, InputT, AccumT, OutputT> keyedCombineFn) {
+ this.keyedCombineFn = keyedCombineFn;
+ }
+
+ @Override
+ public String toString() {
+ return keyedCombineFn.toString();
+ }
+
+ @Override
+ public AccumT createAccumulator(K key, PipelineOptions options,
+ SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
+ return keyedCombineFn.createAccumulator(key);
+ }
+
+ @Override
+ public AccumT addInput(K key, AccumT accumulator, InputT input, PipelineOptions options,
+ SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
+ return keyedCombineFn.addInput(key, accumulator, input);
+ }
+
+ @Override
+ public AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators, PipelineOptions options,
+ SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
+ return keyedCombineFn.mergeAccumulators(key, accumulators);
+ }
+
+ @Override
+ public OutputT extractOutput(K key, AccumT accumulator, PipelineOptions options,
+ SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
+ return keyedCombineFn.extractOutput(key, accumulator);
+ }
+
+ @Override
+ public AccumT compact(K key, AccumT accumulator, PipelineOptions options,
+ SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
+ return keyedCombineFn.compact(key, accumulator);
+ }
+ }
+
+ /**
+ * An implementation of {@link PerKeyCombineFnRunner} with {@link KeyedCombineFnWithContext}.
+ *
+ * <p>It forwards functions calls to the {@link KeyedCombineFnWithContext}.
+ */
+ private static class KeyedCombineFnWithContextRunner<K, InputT, AccumT, OutputT>
+ implements PerKeyCombineFnRunner<K, InputT, AccumT, OutputT> {
+ private final KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> keyedCombineFnWithContext;
+
+ private KeyedCombineFnWithContextRunner(
+ KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> keyedCombineFnWithContext) {
+ this.keyedCombineFnWithContext = keyedCombineFnWithContext;
+ }
+
+ @Override
+ public String toString() {
+ return keyedCombineFnWithContext.toString();
+ }
+
+ @Override
+ public AccumT createAccumulator(K key, PipelineOptions options, SideInputReader sideInputReader,
+ Collection<? extends BoundedWindow> windows) {
+ return keyedCombineFnWithContext.createAccumulator(key,
+ CombineContextFactory.createFromComponents(
+ options, sideInputReader, Iterables.getOnlyElement(windows)));
+ }
+
+ @Override
+ public AccumT addInput(K key, AccumT accumulator, InputT input, PipelineOptions options,
+ SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
+ return keyedCombineFnWithContext.addInput(key, accumulator, input,
+ CombineContextFactory.createFromComponents(
+ options, sideInputReader, Iterables.getOnlyElement(windows)));
+ }
+
+ @Override
+ public AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators, PipelineOptions options,
+ SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
+ return keyedCombineFnWithContext.mergeAccumulators(key, accumulators,
+ CombineContextFactory.createFromComponents(
+ options, sideInputReader, Iterables.getOnlyElement(windows)));
+ }
+
+ @Override
+ public OutputT extractOutput(K key, AccumT accumulator, PipelineOptions options,
+ SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
+ return keyedCombineFnWithContext.extractOutput(key, accumulator,
+ CombineContextFactory.createFromComponents(
+ options, sideInputReader, Iterables.getOnlyElement(windows)));
+ }
+
+ @Override
+ public AccumT compact(K key, AccumT accumulator, PipelineOptions options,
+ SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
+ return keyedCombineFnWithContext.compact(key, accumulator,
+ CombineContextFactory.createFromComponents(
+ options, sideInputReader, Iterables.getOnlyElement(windows)));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/1a3292fb/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/OldPerKeyCombineFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/OldPerKeyCombineFnRunner.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/OldPerKeyCombineFnRunner.java
new file mode 100644
index 0000000..5d676dc
--- /dev/null
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/OldPerKeyCombineFnRunner.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import java.io.Serializable;
+import org.apache.beam.sdk.transforms.CombineFnBase.PerKeyCombineFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
+
+/**
+ * An interface that runs a {@link PerKeyCombineFn} with unified APIs using
+ * {@link OldDoFn.ProcessContext}.
+ */
+@Deprecated
+public interface OldPerKeyCombineFnRunner<K, InputT, AccumT, OutputT> extends Serializable {
+ /**
+ * Forwards the call to a {@link PerKeyCombineFn} to create the accumulator in a {@link OldDoFn}.
+ *
+ * <p>It constructs a {@code CombineWithContext.Context} from {@code OldDoFn.ProcessContext}
+ * if it is required.
+ */
+ AccumT createAccumulator(K key, OldDoFn<?, ?>.ProcessContext c);
+
+ /**
+ * Forwards the call to a {@link PerKeyCombineFn} to add the input in a {@link OldDoFn}.
+ *
+ * <p>It constructs a {@code CombineWithContext.Context} from {@code OldDoFn.ProcessContext}
+ * if it is required.
+ */
+ AccumT addInput(K key, AccumT accumulator, InputT input, OldDoFn<?, ?>.ProcessContext c);
+
+ /**
+ * Forwards the call to a {@link PerKeyCombineFn} to merge accumulators in a {@link OldDoFn}.
+ *
+ * <p>It constructs a {@code CombineWithContext.Context} from {@code OldDoFn.ProcessContext}
+ * if it is required.
+ */
+ AccumT mergeAccumulators(
+ K key, Iterable<AccumT> accumulators, OldDoFn<?, ?>.ProcessContext c);
+
+ /**
+ * Forwards the call to a {@link PerKeyCombineFn} to extract the output in a {@link OldDoFn}.
+ *
+ * <p>It constructs a {@code CombineWithContext.Context} from {@code OldDoFn.ProcessContext}
+ * if it is required.
+ */
+ OutputT extractOutput(K key, AccumT accumulator, OldDoFn<?, ?>.ProcessContext c);
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/1a3292fb/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/OldPerKeyCombineFnRunners.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/OldPerKeyCombineFnRunners.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/OldPerKeyCombineFnRunners.java
new file mode 100644
index 0000000..8ebeadf
--- /dev/null
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/OldPerKeyCombineFnRunners.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import org.apache.beam.runners.core.PerKeyCombineFnRunner;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
+import org.apache.beam.sdk.transforms.CombineFnBase.PerKeyCombineFn;
+import org.apache.beam.sdk.transforms.CombineWithContext;
+import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
+import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.values.PCollectionView;
+
+/**
+ * Static utility methods that provide {@link OldPerKeyCombineFnRunner} implementations
+ * for different keyed combine functions.
+ */
+@Deprecated
+public class OldPerKeyCombineFnRunners {
+ /**
+ * Returns a {@link PerKeyCombineFnRunner} from a {@link PerKeyCombineFn}.
+ */
+ public static <K, InputT, AccumT, OutputT> OldPerKeyCombineFnRunner<K, InputT, AccumT, OutputT>
+ create(PerKeyCombineFn<K, InputT, AccumT, OutputT> perKeyCombineFn) {
+ if (perKeyCombineFn instanceof KeyedCombineFnWithContext) {
+ return new KeyedCombineFnWithContextRunner<>(
+ (KeyedCombineFnWithContext<K, InputT, AccumT, OutputT>) perKeyCombineFn);
+ } else if (perKeyCombineFn instanceof KeyedCombineFn) {
+ return new KeyedCombineFnRunner<>(
+ (KeyedCombineFn<K, InputT, AccumT, OutputT>) perKeyCombineFn);
+ } else {
+ throw new IllegalStateException(
+ String.format("Unknown type of CombineFn: %s", perKeyCombineFn.getClass()));
+ }
+ }
+
+ /** Returns a {@code Combine.Context} that wraps a {@code OldDoFn.ProcessContext}. */
+ private static CombineWithContext.Context createFromProcessContext(
+ final OldDoFn<?, ?>.ProcessContext c) {
+ return new CombineWithContext.Context() {
+ @Override
+ public PipelineOptions getPipelineOptions() {
+ return c.getPipelineOptions();
+ }
+
+ @Override
+ public <T> T sideInput(PCollectionView<T> view) {
+ return c.sideInput(view);
+ }
+ };
+ }
+
+ /**
+ * An implementation of {@link PerKeyCombineFnRunner} with {@link KeyedCombineFn}.
+ *
+ * <p>It forwards functions calls to the {@link KeyedCombineFn}.
+ */
+ private static class KeyedCombineFnRunner<K, InputT, AccumT, OutputT>
+ implements OldPerKeyCombineFnRunner<K, InputT, AccumT, OutputT> {
+ private final KeyedCombineFn<K, InputT, AccumT, OutputT> keyedCombineFn;
+
+ private KeyedCombineFnRunner(
+ KeyedCombineFn<K, InputT, AccumT, OutputT> keyedCombineFn) {
+ this.keyedCombineFn = keyedCombineFn;
+ }
+
+ @Override
+ public AccumT createAccumulator(K key, OldDoFn<?, ?>.ProcessContext c) {
+ return keyedCombineFn.createAccumulator(key);
+ }
+
+ @Override
+ public AccumT addInput(
+ K key, AccumT accumulator, InputT input, OldDoFn<?, ?>.ProcessContext c) {
+ return keyedCombineFn.addInput(key, accumulator, input);
+ }
+
+ @Override
+ public AccumT mergeAccumulators(
+ K key, Iterable<AccumT> accumulators, OldDoFn<?, ?>.ProcessContext c) {
+ return keyedCombineFn.mergeAccumulators(key, accumulators);
+ }
+
+ @Override
+ public OutputT extractOutput(K key, AccumT accumulator, OldDoFn<?, ?>.ProcessContext c) {
+ return keyedCombineFn.extractOutput(key, accumulator);
+ }
+
+ @Override
+ public String toString() {
+ return keyedCombineFn.toString();
+ }
+ }
+
+ /**
+ * An implementation of {@link PerKeyCombineFnRunner} with {@link KeyedCombineFnWithContext}.
+ *
+ * <p>It forwards functions calls to the {@link KeyedCombineFnWithContext}.
+ */
+ private static class KeyedCombineFnWithContextRunner<K, InputT, AccumT, OutputT>
+ implements OldPerKeyCombineFnRunner<K, InputT, AccumT, OutputT> {
+ private final KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> keyedCombineFnWithContext;
+
+ private KeyedCombineFnWithContextRunner(
+ KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> keyedCombineFnWithContext) {
+ this.keyedCombineFnWithContext = keyedCombineFnWithContext;
+ }
+
+ @Override
+ public AccumT createAccumulator(K key, OldDoFn<?, ?>.ProcessContext c) {
+ return keyedCombineFnWithContext.createAccumulator(key,
+ createFromProcessContext(c));
+ }
+
+ @Override
+ public AccumT addInput(
+ K key, AccumT accumulator, InputT value, OldDoFn<?, ?>.ProcessContext c) {
+ return keyedCombineFnWithContext.addInput(key, accumulator, value,
+ createFromProcessContext(c));
+ }
+
+ @Override
+ public AccumT mergeAccumulators(
+ K key, Iterable<AccumT> accumulators, OldDoFn<?, ?>.ProcessContext c) {
+ return keyedCombineFnWithContext.mergeAccumulators(
+ key, accumulators, createFromProcessContext(c));
+ }
+
+ @Override
+ public OutputT extractOutput(K key, AccumT accumulator, OldDoFn<?, ?>.ProcessContext c) {
+ return keyedCombineFnWithContext.extractOutput(key, accumulator,
+ createFromProcessContext(c));
+ }
+
+ @Override
+ public String toString() {
+ return keyedCombineFnWithContext.toString();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/1a3292fb/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/PerKeyCombineFnRunners.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/PerKeyCombineFnRunners.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/PerKeyCombineFnRunners.java
deleted file mode 100644
index f672578..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/PerKeyCombineFnRunners.java
+++ /dev/null
@@ -1,239 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink;
-
-import com.google.common.collect.Iterables;
-import java.util.Collection;
-import org.apache.beam.runners.core.PerKeyCombineFnRunner;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
-import org.apache.beam.sdk.transforms.CombineFnBase.PerKeyCombineFn;
-import org.apache.beam.sdk.transforms.CombineWithContext;
-import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
-import org.apache.beam.sdk.transforms.OldDoFn;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.CombineContextFactory;
-import org.apache.beam.sdk.util.SideInputReader;
-import org.apache.beam.sdk.values.PCollectionView;
-
-/**
- * Static utility methods that provide {@link PerKeyCombineFnRunner} implementations
- * for different keyed combine functions.
- */
-public class PerKeyCombineFnRunners {
- /**
- * Returns a {@link PerKeyCombineFnRunner} from a {@link PerKeyCombineFn}.
- */
- public static <K, InputT, AccumT, OutputT> PerKeyCombineFnRunner<K, InputT, AccumT, OutputT>
- create(PerKeyCombineFn<K, InputT, AccumT, OutputT> perKeyCombineFn) {
- if (perKeyCombineFn instanceof KeyedCombineFnWithContext) {
- return new KeyedCombineFnWithContextRunner<>(
- (KeyedCombineFnWithContext<K, InputT, AccumT, OutputT>) perKeyCombineFn);
- } else if (perKeyCombineFn instanceof KeyedCombineFn) {
- return new KeyedCombineFnRunner<>(
- (KeyedCombineFn<K, InputT, AccumT, OutputT>) perKeyCombineFn);
- } else {
- throw new IllegalStateException(
- String.format("Unknown type of CombineFn: %s", perKeyCombineFn.getClass()));
- }
- }
-
- /** Returns a {@code Combine.Context} that wraps a {@code OldDoFn.ProcessContext}. */
- private static CombineWithContext.Context createFromProcessContext(
- final OldDoFn<?, ?>.ProcessContext c) {
- return new CombineWithContext.Context() {
- @Override
- public PipelineOptions getPipelineOptions() {
- return c.getPipelineOptions();
- }
-
- @Override
- public <T> T sideInput(PCollectionView<T> view) {
- return c.sideInput(view);
- }
- };
- }
-
- /**
- * An implementation of {@link PerKeyCombineFnRunner} with {@link KeyedCombineFn}.
- *
- * <p>It forwards functions calls to the {@link KeyedCombineFn}.
- */
- private static class KeyedCombineFnRunner<K, InputT, AccumT, OutputT>
- implements PerKeyCombineFnRunner<K, InputT, AccumT, OutputT> {
- private final KeyedCombineFn<K, InputT, AccumT, OutputT> keyedCombineFn;
-
- private KeyedCombineFnRunner(
- KeyedCombineFn<K, InputT, AccumT, OutputT> keyedCombineFn) {
- this.keyedCombineFn = keyedCombineFn;
- }
-
- @Override
- public KeyedCombineFn<K, InputT, AccumT, OutputT> fn() {
- return keyedCombineFn;
- }
-
- @Override
- public AccumT createAccumulator(K key, OldDoFn<?, ?>.ProcessContext c) {
- return keyedCombineFn.createAccumulator(key);
- }
-
- @Override
- public AccumT addInput(
- K key, AccumT accumulator, InputT input, OldDoFn<?, ?>.ProcessContext c) {
- return keyedCombineFn.addInput(key, accumulator, input);
- }
-
- @Override
- public AccumT mergeAccumulators(
- K key, Iterable<AccumT> accumulators, OldDoFn<?, ?>.ProcessContext c) {
- return keyedCombineFn.mergeAccumulators(key, accumulators);
- }
-
- @Override
- public OutputT extractOutput(K key, AccumT accumulator, OldDoFn<?, ?>.ProcessContext c) {
- return keyedCombineFn.extractOutput(key, accumulator);
- }
-
- @Override
- public String toString() {
- return keyedCombineFn.toString();
- }
-
- @Override
- public AccumT createAccumulator(K key, PipelineOptions options,
- SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
- return keyedCombineFn.createAccumulator(key);
- }
-
- @Override
- public AccumT addInput(K key, AccumT accumulator, InputT input, PipelineOptions options,
- SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
- return keyedCombineFn.addInput(key, accumulator, input);
- }
-
- @Override
- public AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators, PipelineOptions options,
- SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
- return keyedCombineFn.mergeAccumulators(key, accumulators);
- }
-
- @Override
- public OutputT extractOutput(K key, AccumT accumulator, PipelineOptions options,
- SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
- return keyedCombineFn.extractOutput(key, accumulator);
- }
-
- @Override
- public AccumT compact(K key, AccumT accumulator, PipelineOptions options,
- SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
- return keyedCombineFn.compact(key, accumulator);
- }
- }
-
- /**
- * An implementation of {@link PerKeyCombineFnRunner} with {@link KeyedCombineFnWithContext}.
- *
- * <p>It forwards functions calls to the {@link KeyedCombineFnWithContext}.
- */
- private static class KeyedCombineFnWithContextRunner<K, InputT, AccumT, OutputT>
- implements PerKeyCombineFnRunner<K, InputT, AccumT, OutputT> {
- private final KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> keyedCombineFnWithContext;
-
- private KeyedCombineFnWithContextRunner(
- KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> keyedCombineFnWithContext) {
- this.keyedCombineFnWithContext = keyedCombineFnWithContext;
- }
-
- @Override
- public KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> fn() {
- return keyedCombineFnWithContext;
- }
-
- @Override
- public AccumT createAccumulator(K key, OldDoFn<?, ?>.ProcessContext c) {
- return keyedCombineFnWithContext.createAccumulator(key,
- createFromProcessContext(c));
- }
-
- @Override
- public AccumT addInput(
- K key, AccumT accumulator, InputT value, OldDoFn<?, ?>.ProcessContext c) {
- return keyedCombineFnWithContext.addInput(key, accumulator, value,
- createFromProcessContext(c));
- }
-
- @Override
- public AccumT mergeAccumulators(
- K key, Iterable<AccumT> accumulators, OldDoFn<?, ?>.ProcessContext c) {
- return keyedCombineFnWithContext.mergeAccumulators(
- key, accumulators, createFromProcessContext(c));
- }
-
- @Override
- public OutputT extractOutput(K key, AccumT accumulator, OldDoFn<?, ?>.ProcessContext c) {
- return keyedCombineFnWithContext.extractOutput(key, accumulator,
- createFromProcessContext(c));
- }
-
- @Override
- public String toString() {
- return keyedCombineFnWithContext.toString();
- }
-
- @Override
- public AccumT createAccumulator(K key, PipelineOptions options, SideInputReader sideInputReader,
- Collection<? extends BoundedWindow> windows) {
- return keyedCombineFnWithContext.createAccumulator(key,
- CombineContextFactory.createFromComponents(
- options, sideInputReader, Iterables.getOnlyElement(windows)));
- }
-
- @Override
- public AccumT addInput(K key, AccumT accumulator, InputT input, PipelineOptions options,
- SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
- return keyedCombineFnWithContext.addInput(key, accumulator, input,
- CombineContextFactory.createFromComponents(
- options, sideInputReader, Iterables.getOnlyElement(windows)));
- }
-
- @Override
- public AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators, PipelineOptions options,
- SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
- return keyedCombineFnWithContext.mergeAccumulators(key, accumulators,
- CombineContextFactory.createFromComponents(
- options, sideInputReader, Iterables.getOnlyElement(windows)));
- }
-
- @Override
- public OutputT extractOutput(K key, AccumT accumulator, PipelineOptions options,
- SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
- return keyedCombineFnWithContext.extractOutput(key, accumulator,
- CombineContextFactory.createFromComponents(
- options, sideInputReader, Iterables.getOnlyElement(windows)));
- }
-
- @Override
- public AccumT compact(K key, AccumT accumulator, PipelineOptions options,
- SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
- return keyedCombineFnWithContext.compact(key, accumulator,
- CombineContextFactory.createFromComponents(
- options, sideInputReader, Iterables.getOnlyElement(windows)));
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/1a3292fb/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java
index 6412e63..1b43172 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java
@@ -24,8 +24,8 @@ import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import org.apache.beam.runners.core.PerKeyCombineFnRunner;
-import org.apache.beam.runners.flink.PerKeyCombineFnRunners;
+import org.apache.beam.runners.flink.OldPerKeyCombineFnRunner;
+import org.apache.beam.runners.flink.OldPerKeyCombineFnRunners;
import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.CombineFnBase;
@@ -101,8 +101,8 @@ public class FlinkMergingNonShuffleReduceFunction<
sideInputs, out
);
- PerKeyCombineFnRunner<K, InputT, AccumT, OutputT> combineFnRunner =
- PerKeyCombineFnRunners.create(combineFn);
+ OldPerKeyCombineFnRunner<K, InputT, AccumT, OutputT> combineFnRunner =
+ OldPerKeyCombineFnRunners.create(combineFn);
@SuppressWarnings("unchecked")
OutputTimeFn<? super BoundedWindow> outputTimeFn =
http://git-wip-us.apache.org/repos/asf/beam/blob/1a3292fb/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingPartialReduceFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingPartialReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingPartialReduceFunction.java
index 1456eea..cf058e8 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingPartialReduceFunction.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingPartialReduceFunction.java
@@ -24,8 +24,8 @@ import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import org.apache.beam.runners.core.PerKeyCombineFnRunner;
-import org.apache.beam.runners.flink.PerKeyCombineFnRunners;
+import org.apache.beam.runners.flink.OldPerKeyCombineFnRunner;
+import org.apache.beam.runners.flink.OldPerKeyCombineFnRunners;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.CombineFnBase;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -69,8 +69,8 @@ public class FlinkMergingPartialReduceFunction<K, InputT, AccumT, W extends Inte
sideInputs, out
);
- PerKeyCombineFnRunner<K, InputT, AccumT, ?> combineFnRunner =
- PerKeyCombineFnRunners.create(combineFn);
+ OldPerKeyCombineFnRunner<K, InputT, AccumT, ?> combineFnRunner =
+ OldPerKeyCombineFnRunners.create(combineFn);
@SuppressWarnings("unchecked")
OutputTimeFn<? super BoundedWindow> outputTimeFn =
http://git-wip-us.apache.org/repos/asf/beam/blob/1a3292fb/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingReduceFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingReduceFunction.java
index 2f56fac..4fa4578 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingReduceFunction.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingReduceFunction.java
@@ -26,8 +26,8 @@ import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import org.apache.beam.runners.core.PerKeyCombineFnRunner;
-import org.apache.beam.runners.flink.PerKeyCombineFnRunners;
+import org.apache.beam.runners.flink.OldPerKeyCombineFnRunner;
+import org.apache.beam.runners.flink.OldPerKeyCombineFnRunners;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.CombineFnBase;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -71,8 +71,8 @@ public class FlinkMergingReduceFunction<K, AccumT, OutputT, W extends IntervalWi
sideInputs, out
);
- PerKeyCombineFnRunner<K, ?, AccumT, OutputT> combineFnRunner =
- PerKeyCombineFnRunners.create(combineFn);
+ OldPerKeyCombineFnRunner<K, ?, AccumT, OutputT> combineFnRunner =
+ OldPerKeyCombineFnRunners.create(combineFn);
@SuppressWarnings("unchecked")
OutputTimeFn<? super BoundedWindow> outputTimeFn =
http://git-wip-us.apache.org/repos/asf/beam/blob/1a3292fb/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
index 627cfa6..f5a9087 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
@@ -24,8 +24,8 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.Map;
-import org.apache.beam.runners.core.PerKeyCombineFnRunner;
-import org.apache.beam.runners.flink.PerKeyCombineFnRunners;
+import org.apache.beam.runners.flink.OldPerKeyCombineFnRunner;
+import org.apache.beam.runners.flink.OldPerKeyCombineFnRunners;
import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.CombineFnBase;
@@ -97,8 +97,8 @@ public class FlinkPartialReduceFunction<K, InputT, AccumT, W extends BoundedWind
sideInputs, out
);
- PerKeyCombineFnRunner<K, InputT, AccumT, ?> combineFnRunner =
- PerKeyCombineFnRunners.create(combineFn);
+ OldPerKeyCombineFnRunner<K, InputT, AccumT, ?> combineFnRunner =
+ OldPerKeyCombineFnRunners.create(combineFn);
@SuppressWarnings("unchecked")
OutputTimeFn<? super BoundedWindow> outputTimeFn =
http://git-wip-us.apache.org/repos/asf/beam/blob/1a3292fb/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
index de0d416..a3fa0d4 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
@@ -26,8 +26,8 @@ import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import org.apache.beam.runners.core.PerKeyCombineFnRunner;
-import org.apache.beam.runners.flink.PerKeyCombineFnRunners;
+import org.apache.beam.runners.flink.OldPerKeyCombineFnRunner;
+import org.apache.beam.runners.flink.OldPerKeyCombineFnRunners;
import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.CombineFnBase;
@@ -101,8 +101,8 @@ public class FlinkReduceFunction<K, AccumT, OutputT, W extends BoundedWindow>
sideInputs, out
);
- PerKeyCombineFnRunner<K, ?, AccumT, OutputT> combineFnRunner =
- PerKeyCombineFnRunners.create(combineFn);
+ OldPerKeyCombineFnRunner<K, ?, AccumT, OutputT> combineFnRunner =
+ OldPerKeyCombineFnRunners.create(combineFn);
@SuppressWarnings("unchecked")
OutputTimeFn<? super BoundedWindow> outputTimeFn =
[2/2] beam git commit: This closes #1798: Moves
PerKeyCombineFnRunners back to runners-core
Posted by ke...@apache.org.
This closes #1798: Moves PerKeyCombineFnRunners back to runners-core
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2f580caf
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2f580caf
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2f580caf
Branch: refs/heads/master
Commit: 2f580caff3cca9aa00a3ba36aaebd02feaa87f1c
Parents: 4203670 1a3292f
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Jan 19 14:26:56 2017 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Jan 19 14:26:56 2017 -0800
----------------------------------------------------------------------
.../runners/core/PerKeyCombineFnRunner.java | 45 ----
.../runners/core/PerKeyCombineFnRunners.java | 161 +++++++++++++
.../runners/flink/OldPerKeyCombineFnRunner.java | 62 +++++
.../flink/OldPerKeyCombineFnRunners.java | 155 ++++++++++++
.../runners/flink/PerKeyCombineFnRunners.java | 239 -------------------
.../FlinkMergingNonShuffleReduceFunction.java | 8 +-
.../FlinkMergingPartialReduceFunction.java | 8 +-
.../functions/FlinkMergingReduceFunction.java | 8 +-
.../functions/FlinkPartialReduceFunction.java | 8 +-
.../functions/FlinkReduceFunction.java | 8 +-
10 files changed, 398 insertions(+), 304 deletions(-)
----------------------------------------------------------------------