You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/01/19 23:21:15 UTC

[1/2] beam git commit: Moves PerKeyCombineFnRunners back to runners-core

Repository: beam
Updated Branches:
  refs/heads/master 420367088 -> 2f580caff


Moves PerKeyCombineFnRunners back to runners-core

It is used by the Dataflow worker.
However, this change moves only the non-OldDoFn related part
to runners-core. The OldDoFn-related part stays in Flink.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1a3292fb
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1a3292fb
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1a3292fb

Branch: refs/heads/master
Commit: 1a3292fb4c57efdc6f98bf58cff9a0b42494574b
Parents: 603f4fb
Author: Eugene Kirpichov <ki...@google.com>
Authored: Wed Jan 18 16:26:00 2017 -0800
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Thu Jan 19 11:43:25 2017 -0800

----------------------------------------------------------------------
 .../runners/core/PerKeyCombineFnRunner.java     |  45 ----
 .../runners/core/PerKeyCombineFnRunners.java    | 161 +++++++++++++
 .../runners/flink/OldPerKeyCombineFnRunner.java |  62 +++++
 .../flink/OldPerKeyCombineFnRunners.java        | 155 ++++++++++++
 .../runners/flink/PerKeyCombineFnRunners.java   | 239 -------------------
 .../FlinkMergingNonShuffleReduceFunction.java   |   8 +-
 .../FlinkMergingPartialReduceFunction.java      |   8 +-
 .../functions/FlinkMergingReduceFunction.java   |   8 +-
 .../functions/FlinkPartialReduceFunction.java   |   8 +-
 .../functions/FlinkReduceFunction.java          |   8 +-
 10 files changed, 398 insertions(+), 304 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/1a3292fb/runners/core-java/src/main/java/org/apache/beam/runners/core/PerKeyCombineFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/PerKeyCombineFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/PerKeyCombineFnRunner.java
index 4550273..a6608a7 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/PerKeyCombineFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/PerKeyCombineFnRunner.java
@@ -21,7 +21,6 @@ import java.io.Serializable;
 import java.util.Collection;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.CombineFnBase.PerKeyCombineFn;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.SideInputReader;
 
@@ -34,50 +33,6 @@ import org.apache.beam.sdk.util.SideInputReader;
  */
 public interface PerKeyCombineFnRunner<K, InputT, AccumT, OutputT> extends Serializable {
   /**
-   * Returns the {@link PerKeyCombineFn} it holds.
-   *
-   * <p>It can be a {@code KeyedCombineFn} or a {@code KeyedCombineFnWithContext}.
-   */
-  PerKeyCombineFn<K, InputT, AccumT, OutputT> fn();
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  /**
-   * Forwards the call to a {@link PerKeyCombineFn} to create the accumulator in a {@link OldDoFn}.
-   *
-   * <p>It constructs a {@code CombineWithContext.Context} from {@code OldDoFn.ProcessContext}
-   * if it is required.
-   */
-  AccumT createAccumulator(K key, OldDoFn<?, ?>.ProcessContext c);
-
-  /**
-   * Forwards the call to a {@link PerKeyCombineFn} to add the input in a {@link OldDoFn}.
-   *
-   * <p>It constructs a {@code CombineWithContext.Context} from {@code OldDoFn.ProcessContext}
-   * if it is required.
-   */
-  AccumT addInput(K key, AccumT accumulator, InputT input, OldDoFn<?, ?>.ProcessContext c);
-
-  /**
-   * Forwards the call to a {@link PerKeyCombineFn} to merge accumulators in a {@link OldDoFn}.
-   *
-   * <p>It constructs a {@code CombineWithContext.Context} from {@code OldDoFn.ProcessContext}
-   * if it is required.
-   */
-  AccumT mergeAccumulators(
-      K key, Iterable<AccumT> accumulators, OldDoFn<?, ?>.ProcessContext c);
-
-  /**
-   * Forwards the call to a {@link PerKeyCombineFn} to extract the output in a {@link OldDoFn}.
-   *
-   * <p>It constructs a {@code CombineWithContext.Context} from {@code OldDoFn.ProcessContext}
-   * if it is required.
-   */
-  OutputT extractOutput(K key, AccumT accumulator, OldDoFn<?, ?>.ProcessContext c);
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  /**
    * Forwards the call to a {@link PerKeyCombineFn} to create the accumulator.
    *
    * <p>It constructs a {@code CombineWithContext.Context} from

http://git-wip-us.apache.org/repos/asf/beam/blob/1a3292fb/runners/core-java/src/main/java/org/apache/beam/runners/core/PerKeyCombineFnRunners.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/PerKeyCombineFnRunners.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/PerKeyCombineFnRunners.java
new file mode 100644
index 0000000..7736758
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/PerKeyCombineFnRunners.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import com.google.common.collect.Iterables;
+import java.util.Collection;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
+import org.apache.beam.sdk.transforms.CombineFnBase.PerKeyCombineFn;
+import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.CombineContextFactory;
+import org.apache.beam.sdk.util.SideInputReader;
+
+/**
+ * Static utility methods that provide {@link PerKeyCombineFnRunner} implementations
+ * for different keyed combine functions.
+ */
+public class PerKeyCombineFnRunners {
+  /**
+   * Returns a {@link PerKeyCombineFnRunner} from a {@link PerKeyCombineFn}.
+   */
+  public static <K, InputT, AccumT, OutputT> PerKeyCombineFnRunner<K, InputT, AccumT, OutputT>
+  create(PerKeyCombineFn<K, InputT, AccumT, OutputT> perKeyCombineFn) {
+    if (perKeyCombineFn instanceof KeyedCombineFnWithContext) {
+      return new KeyedCombineFnWithContextRunner<>(
+          (KeyedCombineFnWithContext<K, InputT, AccumT, OutputT>) perKeyCombineFn);
+    } else if (perKeyCombineFn instanceof KeyedCombineFn) {
+      return new KeyedCombineFnRunner<>(
+          (KeyedCombineFn<K, InputT, AccumT, OutputT>) perKeyCombineFn);
+    } else {
+      throw new IllegalStateException(
+          String.format("Unknown type of CombineFn: %s", perKeyCombineFn.getClass()));
+    }
+  }
+
+  /**
+   * An implementation of {@link PerKeyCombineFnRunner} with {@link KeyedCombineFn}.
+   *
+   * <p>It forwards functions calls to the {@link KeyedCombineFn}.
+   */
+  private static class KeyedCombineFnRunner<K, InputT, AccumT, OutputT>
+      implements PerKeyCombineFnRunner<K, InputT, AccumT, OutputT> {
+    private final KeyedCombineFn<K, InputT, AccumT, OutputT> keyedCombineFn;
+
+    private KeyedCombineFnRunner(
+        KeyedCombineFn<K, InputT, AccumT, OutputT> keyedCombineFn) {
+      this.keyedCombineFn = keyedCombineFn;
+    }
+
+    @Override
+    public String toString() {
+      return keyedCombineFn.toString();
+    }
+
+    @Override
+    public AccumT createAccumulator(K key, PipelineOptions options,
+        SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
+      return keyedCombineFn.createAccumulator(key);
+    }
+
+    @Override
+    public AccumT addInput(K key, AccumT accumulator, InputT input, PipelineOptions options,
+        SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
+      return keyedCombineFn.addInput(key, accumulator, input);
+    }
+
+    @Override
+    public AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators, PipelineOptions options,
+        SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
+      return keyedCombineFn.mergeAccumulators(key, accumulators);
+    }
+
+    @Override
+    public OutputT extractOutput(K key, AccumT accumulator, PipelineOptions options,
+        SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
+      return keyedCombineFn.extractOutput(key, accumulator);
+    }
+
+    @Override
+    public AccumT compact(K key, AccumT accumulator, PipelineOptions options,
+        SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
+      return keyedCombineFn.compact(key, accumulator);
+    }
+  }
+
+  /**
+   * An implementation of {@link PerKeyCombineFnRunner} with {@link KeyedCombineFnWithContext}.
+   *
+   * <p>It forwards functions calls to the {@link KeyedCombineFnWithContext}.
+   */
+  private static class KeyedCombineFnWithContextRunner<K, InputT, AccumT, OutputT>
+      implements PerKeyCombineFnRunner<K, InputT, AccumT, OutputT> {
+    private final KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> keyedCombineFnWithContext;
+
+    private KeyedCombineFnWithContextRunner(
+        KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> keyedCombineFnWithContext) {
+      this.keyedCombineFnWithContext = keyedCombineFnWithContext;
+    }
+
+    @Override
+    public String toString() {
+      return keyedCombineFnWithContext.toString();
+    }
+
+    @Override
+    public AccumT createAccumulator(K key, PipelineOptions options, SideInputReader sideInputReader,
+        Collection<? extends BoundedWindow> windows) {
+      return keyedCombineFnWithContext.createAccumulator(key,
+        CombineContextFactory.createFromComponents(
+          options, sideInputReader, Iterables.getOnlyElement(windows)));
+    }
+
+    @Override
+    public AccumT addInput(K key, AccumT accumulator, InputT input, PipelineOptions options,
+        SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
+      return keyedCombineFnWithContext.addInput(key, accumulator, input,
+        CombineContextFactory.createFromComponents(
+          options, sideInputReader, Iterables.getOnlyElement(windows)));
+    }
+
+    @Override
+    public AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators, PipelineOptions options,
+        SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
+      return keyedCombineFnWithContext.mergeAccumulators(key, accumulators,
+        CombineContextFactory.createFromComponents(
+          options, sideInputReader, Iterables.getOnlyElement(windows)));
+    }
+
+    @Override
+    public OutputT extractOutput(K key, AccumT accumulator, PipelineOptions options,
+        SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
+      return keyedCombineFnWithContext.extractOutput(key, accumulator,
+        CombineContextFactory.createFromComponents(
+          options, sideInputReader, Iterables.getOnlyElement(windows)));
+    }
+
+    @Override
+    public AccumT compact(K key, AccumT accumulator, PipelineOptions options,
+        SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
+      return keyedCombineFnWithContext.compact(key, accumulator,
+        CombineContextFactory.createFromComponents(
+          options, sideInputReader, Iterables.getOnlyElement(windows)));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/1a3292fb/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/OldPerKeyCombineFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/OldPerKeyCombineFnRunner.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/OldPerKeyCombineFnRunner.java
new file mode 100644
index 0000000..5d676dc
--- /dev/null
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/OldPerKeyCombineFnRunner.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import java.io.Serializable;
+import org.apache.beam.sdk.transforms.CombineFnBase.PerKeyCombineFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
+
+/**
+ * An interface that runs a {@link PerKeyCombineFn} with unified APIs using
+ * {@link OldDoFn.ProcessContext}.
+ */
+@Deprecated
+public interface OldPerKeyCombineFnRunner<K, InputT, AccumT, OutputT> extends Serializable {
+  /**
+   * Forwards the call to a {@link PerKeyCombineFn} to create the accumulator in a {@link OldDoFn}.
+   *
+   * <p>It constructs a {@code CombineWithContext.Context} from {@code OldDoFn.ProcessContext}
+   * if it is required.
+   */
+  AccumT createAccumulator(K key, OldDoFn<?, ?>.ProcessContext c);
+
+  /**
+   * Forwards the call to a {@link PerKeyCombineFn} to add the input in a {@link OldDoFn}.
+   *
+   * <p>It constructs a {@code CombineWithContext.Context} from {@code OldDoFn.ProcessContext}
+   * if it is required.
+   */
+  AccumT addInput(K key, AccumT accumulator, InputT input, OldDoFn<?, ?>.ProcessContext c);
+
+  /**
+   * Forwards the call to a {@link PerKeyCombineFn} to merge accumulators in a {@link OldDoFn}.
+   *
+   * <p>It constructs a {@code CombineWithContext.Context} from {@code OldDoFn.ProcessContext}
+   * if it is required.
+   */
+  AccumT mergeAccumulators(
+      K key, Iterable<AccumT> accumulators, OldDoFn<?, ?>.ProcessContext c);
+
+  /**
+   * Forwards the call to a {@link PerKeyCombineFn} to extract the output in a {@link OldDoFn}.
+   *
+   * <p>It constructs a {@code CombineWithContext.Context} from {@code OldDoFn.ProcessContext}
+   * if it is required.
+   */
+  OutputT extractOutput(K key, AccumT accumulator, OldDoFn<?, ?>.ProcessContext c);
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/1a3292fb/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/OldPerKeyCombineFnRunners.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/OldPerKeyCombineFnRunners.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/OldPerKeyCombineFnRunners.java
new file mode 100644
index 0000000..8ebeadf
--- /dev/null
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/OldPerKeyCombineFnRunners.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import org.apache.beam.runners.core.PerKeyCombineFnRunner;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
+import org.apache.beam.sdk.transforms.CombineFnBase.PerKeyCombineFn;
+import org.apache.beam.sdk.transforms.CombineWithContext;
+import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
+import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.values.PCollectionView;
+
+/**
+ * Static utility methods that provide {@link OldPerKeyCombineFnRunner} implementations
+ * for different keyed combine functions.
+ */
+@Deprecated
+public class OldPerKeyCombineFnRunners {
+  /**
+   * Returns a {@link PerKeyCombineFnRunner} from a {@link PerKeyCombineFn}.
+   */
+  public static <K, InputT, AccumT, OutputT> OldPerKeyCombineFnRunner<K, InputT, AccumT, OutputT>
+  create(PerKeyCombineFn<K, InputT, AccumT, OutputT> perKeyCombineFn) {
+    if (perKeyCombineFn instanceof KeyedCombineFnWithContext) {
+      return new KeyedCombineFnWithContextRunner<>(
+          (KeyedCombineFnWithContext<K, InputT, AccumT, OutputT>) perKeyCombineFn);
+    } else if (perKeyCombineFn instanceof KeyedCombineFn) {
+      return new KeyedCombineFnRunner<>(
+          (KeyedCombineFn<K, InputT, AccumT, OutputT>) perKeyCombineFn);
+    } else {
+      throw new IllegalStateException(
+          String.format("Unknown type of CombineFn: %s", perKeyCombineFn.getClass()));
+    }
+  }
+
+  /** Returns a {@code Combine.Context} that wraps a {@code OldDoFn.ProcessContext}. */
+  private static CombineWithContext.Context createFromProcessContext(
+      final OldDoFn<?, ?>.ProcessContext c) {
+    return new CombineWithContext.Context() {
+      @Override
+      public PipelineOptions getPipelineOptions() {
+        return c.getPipelineOptions();
+      }
+
+      @Override
+      public <T> T sideInput(PCollectionView<T> view) {
+        return c.sideInput(view);
+      }
+    };
+  }
+
+  /**
+   * An implementation of {@link PerKeyCombineFnRunner} with {@link KeyedCombineFn}.
+   *
+   * <p>It forwards functions calls to the {@link KeyedCombineFn}.
+   */
+  private static class KeyedCombineFnRunner<K, InputT, AccumT, OutputT>
+      implements OldPerKeyCombineFnRunner<K, InputT, AccumT, OutputT> {
+    private final KeyedCombineFn<K, InputT, AccumT, OutputT> keyedCombineFn;
+
+    private KeyedCombineFnRunner(
+        KeyedCombineFn<K, InputT, AccumT, OutputT> keyedCombineFn) {
+      this.keyedCombineFn = keyedCombineFn;
+    }
+
+    @Override
+    public AccumT createAccumulator(K key, OldDoFn<?, ?>.ProcessContext c) {
+      return keyedCombineFn.createAccumulator(key);
+    }
+
+    @Override
+    public AccumT addInput(
+        K key, AccumT accumulator, InputT input, OldDoFn<?, ?>.ProcessContext c) {
+      return keyedCombineFn.addInput(key, accumulator, input);
+    }
+
+    @Override
+    public AccumT mergeAccumulators(
+        K key, Iterable<AccumT> accumulators, OldDoFn<?, ?>.ProcessContext c) {
+      return keyedCombineFn.mergeAccumulators(key, accumulators);
+    }
+
+    @Override
+    public OutputT extractOutput(K key, AccumT accumulator, OldDoFn<?, ?>.ProcessContext c) {
+      return keyedCombineFn.extractOutput(key, accumulator);
+    }
+
+    @Override
+    public String toString() {
+      return keyedCombineFn.toString();
+    }
+  }
+
+  /**
+   * An implementation of {@link PerKeyCombineFnRunner} with {@link KeyedCombineFnWithContext}.
+   *
+   * <p>It forwards functions calls to the {@link KeyedCombineFnWithContext}.
+   */
+  private static class KeyedCombineFnWithContextRunner<K, InputT, AccumT, OutputT>
+      implements OldPerKeyCombineFnRunner<K, InputT, AccumT, OutputT> {
+    private final KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> keyedCombineFnWithContext;
+
+    private KeyedCombineFnWithContextRunner(
+        KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> keyedCombineFnWithContext) {
+      this.keyedCombineFnWithContext = keyedCombineFnWithContext;
+    }
+
+    @Override
+    public AccumT createAccumulator(K key, OldDoFn<?, ?>.ProcessContext c) {
+      return keyedCombineFnWithContext.createAccumulator(key,
+          createFromProcessContext(c));
+    }
+
+    @Override
+    public AccumT addInput(
+        K key, AccumT accumulator, InputT value, OldDoFn<?, ?>.ProcessContext c) {
+      return keyedCombineFnWithContext.addInput(key, accumulator, value,
+          createFromProcessContext(c));
+    }
+
+    @Override
+    public AccumT mergeAccumulators(
+        K key, Iterable<AccumT> accumulators, OldDoFn<?, ?>.ProcessContext c) {
+      return keyedCombineFnWithContext.mergeAccumulators(
+          key, accumulators, createFromProcessContext(c));
+    }
+
+    @Override
+    public OutputT extractOutput(K key, AccumT accumulator, OldDoFn<?, ?>.ProcessContext c) {
+      return keyedCombineFnWithContext.extractOutput(key, accumulator,
+          createFromProcessContext(c));
+    }
+
+    @Override
+    public String toString() {
+      return keyedCombineFnWithContext.toString();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/1a3292fb/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/PerKeyCombineFnRunners.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/PerKeyCombineFnRunners.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/PerKeyCombineFnRunners.java
deleted file mode 100644
index f672578..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/PerKeyCombineFnRunners.java
+++ /dev/null
@@ -1,239 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink;
-
-import com.google.common.collect.Iterables;
-import java.util.Collection;
-import org.apache.beam.runners.core.PerKeyCombineFnRunner;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
-import org.apache.beam.sdk.transforms.CombineFnBase.PerKeyCombineFn;
-import org.apache.beam.sdk.transforms.CombineWithContext;
-import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
-import org.apache.beam.sdk.transforms.OldDoFn;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.CombineContextFactory;
-import org.apache.beam.sdk.util.SideInputReader;
-import org.apache.beam.sdk.values.PCollectionView;
-
-/**
- * Static utility methods that provide {@link PerKeyCombineFnRunner} implementations
- * for different keyed combine functions.
- */
-public class PerKeyCombineFnRunners {
-  /**
-   * Returns a {@link PerKeyCombineFnRunner} from a {@link PerKeyCombineFn}.
-   */
-  public static <K, InputT, AccumT, OutputT> PerKeyCombineFnRunner<K, InputT, AccumT, OutputT>
-  create(PerKeyCombineFn<K, InputT, AccumT, OutputT> perKeyCombineFn) {
-    if (perKeyCombineFn instanceof KeyedCombineFnWithContext) {
-      return new KeyedCombineFnWithContextRunner<>(
-          (KeyedCombineFnWithContext<K, InputT, AccumT, OutputT>) perKeyCombineFn);
-    } else if (perKeyCombineFn instanceof KeyedCombineFn) {
-      return new KeyedCombineFnRunner<>(
-          (KeyedCombineFn<K, InputT, AccumT, OutputT>) perKeyCombineFn);
-    } else {
-      throw new IllegalStateException(
-          String.format("Unknown type of CombineFn: %s", perKeyCombineFn.getClass()));
-    }
-  }
-
-  /** Returns a {@code Combine.Context} that wraps a {@code OldDoFn.ProcessContext}. */
-  private static CombineWithContext.Context createFromProcessContext(
-      final OldDoFn<?, ?>.ProcessContext c) {
-    return new CombineWithContext.Context() {
-      @Override
-      public PipelineOptions getPipelineOptions() {
-        return c.getPipelineOptions();
-      }
-
-      @Override
-      public <T> T sideInput(PCollectionView<T> view) {
-        return c.sideInput(view);
-      }
-    };
-  }
-
-  /**
-   * An implementation of {@link PerKeyCombineFnRunner} with {@link KeyedCombineFn}.
-   *
-   * <p>It forwards functions calls to the {@link KeyedCombineFn}.
-   */
-  private static class KeyedCombineFnRunner<K, InputT, AccumT, OutputT>
-      implements PerKeyCombineFnRunner<K, InputT, AccumT, OutputT> {
-    private final KeyedCombineFn<K, InputT, AccumT, OutputT> keyedCombineFn;
-
-    private KeyedCombineFnRunner(
-        KeyedCombineFn<K, InputT, AccumT, OutputT> keyedCombineFn) {
-      this.keyedCombineFn = keyedCombineFn;
-    }
-
-    @Override
-    public KeyedCombineFn<K, InputT, AccumT, OutputT> fn() {
-      return keyedCombineFn;
-    }
-
-    @Override
-    public AccumT createAccumulator(K key, OldDoFn<?, ?>.ProcessContext c) {
-      return keyedCombineFn.createAccumulator(key);
-    }
-
-    @Override
-    public AccumT addInput(
-        K key, AccumT accumulator, InputT input, OldDoFn<?, ?>.ProcessContext c) {
-      return keyedCombineFn.addInput(key, accumulator, input);
-    }
-
-    @Override
-    public AccumT mergeAccumulators(
-        K key, Iterable<AccumT> accumulators, OldDoFn<?, ?>.ProcessContext c) {
-      return keyedCombineFn.mergeAccumulators(key, accumulators);
-    }
-
-    @Override
-    public OutputT extractOutput(K key, AccumT accumulator, OldDoFn<?, ?>.ProcessContext c) {
-      return keyedCombineFn.extractOutput(key, accumulator);
-    }
-
-    @Override
-    public String toString() {
-      return keyedCombineFn.toString();
-    }
-
-    @Override
-    public AccumT createAccumulator(K key, PipelineOptions options,
-        SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
-      return keyedCombineFn.createAccumulator(key);
-    }
-
-    @Override
-    public AccumT addInput(K key, AccumT accumulator, InputT input, PipelineOptions options,
-        SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
-      return keyedCombineFn.addInput(key, accumulator, input);
-    }
-
-    @Override
-    public AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators, PipelineOptions options,
-        SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
-      return keyedCombineFn.mergeAccumulators(key, accumulators);
-    }
-
-    @Override
-    public OutputT extractOutput(K key, AccumT accumulator, PipelineOptions options,
-        SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
-      return keyedCombineFn.extractOutput(key, accumulator);
-    }
-
-    @Override
-    public AccumT compact(K key, AccumT accumulator, PipelineOptions options,
-        SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
-      return keyedCombineFn.compact(key, accumulator);
-    }
-  }
-
-  /**
-   * An implementation of {@link PerKeyCombineFnRunner} with {@link KeyedCombineFnWithContext}.
-   *
-   * <p>It forwards functions calls to the {@link KeyedCombineFnWithContext}.
-   */
-  private static class KeyedCombineFnWithContextRunner<K, InputT, AccumT, OutputT>
-      implements PerKeyCombineFnRunner<K, InputT, AccumT, OutputT> {
-    private final KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> keyedCombineFnWithContext;
-
-    private KeyedCombineFnWithContextRunner(
-        KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> keyedCombineFnWithContext) {
-      this.keyedCombineFnWithContext = keyedCombineFnWithContext;
-    }
-
-    @Override
-    public KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> fn() {
-      return keyedCombineFnWithContext;
-    }
-
-    @Override
-    public AccumT createAccumulator(K key, OldDoFn<?, ?>.ProcessContext c) {
-      return keyedCombineFnWithContext.createAccumulator(key,
-          createFromProcessContext(c));
-    }
-
-    @Override
-    public AccumT addInput(
-        K key, AccumT accumulator, InputT value, OldDoFn<?, ?>.ProcessContext c) {
-      return keyedCombineFnWithContext.addInput(key, accumulator, value,
-          createFromProcessContext(c));
-    }
-
-    @Override
-    public AccumT mergeAccumulators(
-        K key, Iterable<AccumT> accumulators, OldDoFn<?, ?>.ProcessContext c) {
-      return keyedCombineFnWithContext.mergeAccumulators(
-          key, accumulators, createFromProcessContext(c));
-    }
-
-    @Override
-    public OutputT extractOutput(K key, AccumT accumulator, OldDoFn<?, ?>.ProcessContext c) {
-      return keyedCombineFnWithContext.extractOutput(key, accumulator,
-          createFromProcessContext(c));
-    }
-
-    @Override
-    public String toString() {
-      return keyedCombineFnWithContext.toString();
-    }
-
-    @Override
-    public AccumT createAccumulator(K key, PipelineOptions options, SideInputReader sideInputReader,
-        Collection<? extends BoundedWindow> windows) {
-      return keyedCombineFnWithContext.createAccumulator(key,
-        CombineContextFactory.createFromComponents(
-          options, sideInputReader, Iterables.getOnlyElement(windows)));
-    }
-
-    @Override
-    public AccumT addInput(K key, AccumT accumulator, InputT input, PipelineOptions options,
-        SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
-      return keyedCombineFnWithContext.addInput(key, accumulator, input,
-        CombineContextFactory.createFromComponents(
-          options, sideInputReader, Iterables.getOnlyElement(windows)));
-    }
-
-    @Override
-    public AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators, PipelineOptions options,
-        SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
-      return keyedCombineFnWithContext.mergeAccumulators(key, accumulators,
-        CombineContextFactory.createFromComponents(
-          options, sideInputReader, Iterables.getOnlyElement(windows)));
-    }
-
-    @Override
-    public OutputT extractOutput(K key, AccumT accumulator, PipelineOptions options,
-        SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
-      return keyedCombineFnWithContext.extractOutput(key, accumulator,
-        CombineContextFactory.createFromComponents(
-          options, sideInputReader, Iterables.getOnlyElement(windows)));
-    }
-
-    @Override
-    public AccumT compact(K key, AccumT accumulator, PipelineOptions options,
-        SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
-      return keyedCombineFnWithContext.compact(key, accumulator,
-        CombineContextFactory.createFromComponents(
-          options, sideInputReader, Iterables.getOnlyElement(windows)));
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/1a3292fb/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java
index 6412e63..1b43172 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java
@@ -24,8 +24,8 @@ import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import org.apache.beam.runners.core.PerKeyCombineFnRunner;
-import org.apache.beam.runners.flink.PerKeyCombineFnRunners;
+import org.apache.beam.runners.flink.OldPerKeyCombineFnRunner;
+import org.apache.beam.runners.flink.OldPerKeyCombineFnRunners;
 import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.CombineFnBase;
@@ -101,8 +101,8 @@ public class FlinkMergingNonShuffleReduceFunction<
             sideInputs, out
         );
 
-    PerKeyCombineFnRunner<K, InputT, AccumT, OutputT> combineFnRunner =
-        PerKeyCombineFnRunners.create(combineFn);
+    OldPerKeyCombineFnRunner<K, InputT, AccumT, OutputT> combineFnRunner =
+        OldPerKeyCombineFnRunners.create(combineFn);
 
     @SuppressWarnings("unchecked")
     OutputTimeFn<? super BoundedWindow> outputTimeFn =

http://git-wip-us.apache.org/repos/asf/beam/blob/1a3292fb/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingPartialReduceFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingPartialReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingPartialReduceFunction.java
index 1456eea..cf058e8 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingPartialReduceFunction.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingPartialReduceFunction.java
@@ -24,8 +24,8 @@ import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import org.apache.beam.runners.core.PerKeyCombineFnRunner;
-import org.apache.beam.runners.flink.PerKeyCombineFnRunners;
+import org.apache.beam.runners.flink.OldPerKeyCombineFnRunner;
+import org.apache.beam.runners.flink.OldPerKeyCombineFnRunners;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.CombineFnBase;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -69,8 +69,8 @@ public class FlinkMergingPartialReduceFunction<K, InputT, AccumT, W extends Inte
             sideInputs, out
         );
 
-    PerKeyCombineFnRunner<K, InputT, AccumT, ?> combineFnRunner =
-        PerKeyCombineFnRunners.create(combineFn);
+    OldPerKeyCombineFnRunner<K, InputT, AccumT, ?> combineFnRunner =
+        OldPerKeyCombineFnRunners.create(combineFn);
 
     @SuppressWarnings("unchecked")
     OutputTimeFn<? super BoundedWindow> outputTimeFn =

http://git-wip-us.apache.org/repos/asf/beam/blob/1a3292fb/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingReduceFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingReduceFunction.java
index 2f56fac..4fa4578 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingReduceFunction.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingReduceFunction.java
@@ -26,8 +26,8 @@ import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import org.apache.beam.runners.core.PerKeyCombineFnRunner;
-import org.apache.beam.runners.flink.PerKeyCombineFnRunners;
+import org.apache.beam.runners.flink.OldPerKeyCombineFnRunner;
+import org.apache.beam.runners.flink.OldPerKeyCombineFnRunners;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.CombineFnBase;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -71,8 +71,8 @@ public class FlinkMergingReduceFunction<K, AccumT, OutputT, W extends IntervalWi
             sideInputs, out
         );
 
-    PerKeyCombineFnRunner<K, ?, AccumT, OutputT> combineFnRunner =
-        PerKeyCombineFnRunners.create(combineFn);
+    OldPerKeyCombineFnRunner<K, ?, AccumT, OutputT> combineFnRunner =
+        OldPerKeyCombineFnRunners.create(combineFn);
 
     @SuppressWarnings("unchecked")
     OutputTimeFn<? super BoundedWindow> outputTimeFn =

http://git-wip-us.apache.org/repos/asf/beam/blob/1a3292fb/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
index 627cfa6..f5a9087 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
@@ -24,8 +24,8 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.Iterator;
 import java.util.Map;
-import org.apache.beam.runners.core.PerKeyCombineFnRunner;
-import org.apache.beam.runners.flink.PerKeyCombineFnRunners;
+import org.apache.beam.runners.flink.OldPerKeyCombineFnRunner;
+import org.apache.beam.runners.flink.OldPerKeyCombineFnRunners;
 import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.CombineFnBase;
@@ -97,8 +97,8 @@ public class FlinkPartialReduceFunction<K, InputT, AccumT, W extends BoundedWind
             sideInputs, out
         );
 
-    PerKeyCombineFnRunner<K, InputT, AccumT, ?> combineFnRunner =
-        PerKeyCombineFnRunners.create(combineFn);
+    OldPerKeyCombineFnRunner<K, InputT, AccumT, ?> combineFnRunner =
+        OldPerKeyCombineFnRunners.create(combineFn);
 
     @SuppressWarnings("unchecked")
     OutputTimeFn<? super BoundedWindow> outputTimeFn =

http://git-wip-us.apache.org/repos/asf/beam/blob/1a3292fb/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
index de0d416..a3fa0d4 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
@@ -26,8 +26,8 @@ import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import org.apache.beam.runners.core.PerKeyCombineFnRunner;
-import org.apache.beam.runners.flink.PerKeyCombineFnRunners;
+import org.apache.beam.runners.flink.OldPerKeyCombineFnRunner;
+import org.apache.beam.runners.flink.OldPerKeyCombineFnRunners;
 import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.CombineFnBase;
@@ -101,8 +101,8 @@ public class FlinkReduceFunction<K, AccumT, OutputT, W extends BoundedWindow>
             sideInputs, out
         );
 
-    PerKeyCombineFnRunner<K, ?, AccumT, OutputT> combineFnRunner =
-        PerKeyCombineFnRunners.create(combineFn);
+    OldPerKeyCombineFnRunner<K, ?, AccumT, OutputT> combineFnRunner =
+        OldPerKeyCombineFnRunners.create(combineFn);
 
     @SuppressWarnings("unchecked")
     OutputTimeFn<? super BoundedWindow> outputTimeFn =


[2/2] beam git commit: This closes #1798: Moves PerKeyCombineFnRunners back to runners-core

Posted by ke...@apache.org.
This closes #1798: Moves PerKeyCombineFnRunners back to runners-core


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2f580caf
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2f580caf
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2f580caf

Branch: refs/heads/master
Commit: 2f580caff3cca9aa00a3ba36aaebd02feaa87f1c
Parents: 4203670 1a3292f
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Jan 19 14:26:56 2017 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Jan 19 14:26:56 2017 -0800

----------------------------------------------------------------------
 .../runners/core/PerKeyCombineFnRunner.java     |  45 ----
 .../runners/core/PerKeyCombineFnRunners.java    | 161 +++++++++++++
 .../runners/flink/OldPerKeyCombineFnRunner.java |  62 +++++
 .../flink/OldPerKeyCombineFnRunners.java        | 155 ++++++++++++
 .../runners/flink/PerKeyCombineFnRunners.java   | 239 -------------------
 .../FlinkMergingNonShuffleReduceFunction.java   |   8 +-
 .../FlinkMergingPartialReduceFunction.java      |   8 +-
 .../functions/FlinkMergingReduceFunction.java   |   8 +-
 .../functions/FlinkPartialReduceFunction.java   |   8 +-
 .../functions/FlinkReduceFunction.java          |   8 +-
 10 files changed, 398 insertions(+), 304 deletions(-)
----------------------------------------------------------------------