You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/11/03 16:39:32 UTC

[1/2] incubator-beam git commit: [BEAM-79] Port Gearpump runner from OldDoFn to new DoFn

Repository: incubator-beam
Updated Branches:
  refs/heads/gearpump-runner 3933b5577 -> 323ec1188


[BEAM-79] Port Gearpump runner from OldDoFn to new DoFn


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/45570b9c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/45570b9c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/45570b9c

Branch: refs/heads/gearpump-runner
Commit: 45570b9c7ebb11080deca3346fc601c69796612a
Parents: 3933b55
Author: manuzhang <ow...@gmail.com>
Authored: Mon Oct 31 11:52:22 2016 +0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Nov 3 09:38:41 2016 -0700

----------------------------------------------------------------------
 .../gearpump/GearpumpPipelineTranslator.java    |   2 +-
 .../translators/ParDoBoundMultiTranslator.java  |  17 +-
 .../translators/ParDoBoundTranslator.java       |   3 +-
 .../translators/functions/DoFnFunction.java     |  19 +-
 .../translators/utils/DoFnRunnerFactory.java    |  77 +++
 .../translators/utils/GearpumpDoFnRunner.java   | 516 -------------------
 .../utils/NoOpAggregatorFactory.java            |  41 ++
 7 files changed, 143 insertions(+), 532 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/45570b9c/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java
index 5045ae4..8588fff 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java
@@ -108,7 +108,7 @@ public class GearpumpPipelineTranslator implements Pipeline.PipelineVisitor {
 
   @Override
   public void visitValue(PValue value, TransformTreeNode producer) {
-    LOG.info("visiting value {}", value);
+    LOG.debug("visiting value {}", value);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/45570b9c/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java
index 2b49684..54f1c3f 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java
@@ -27,11 +27,11 @@ import java.util.Map;
 import org.apache.beam.runners.core.DoFnRunner;
 import org.apache.beam.runners.core.DoFnRunners;
 import org.apache.beam.runners.gearpump.GearpumpPipelineOptions;
-import org.apache.beam.runners.gearpump.translators.utils.GearpumpDoFnRunner;
+import org.apache.beam.runners.gearpump.translators.utils.DoFnRunnerFactory;
+import org.apache.beam.runners.gearpump.translators.utils.NoOpAggregatorFactory;
 import org.apache.beam.runners.gearpump.translators.utils.NoOpSideInputReader;
 import org.apache.beam.runners.gearpump.translators.utils.NoOpStepContext;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.util.SideInputReader;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -64,7 +64,7 @@ public class ParDoBoundMultiTranslator<InputT, OutputT> implements
     JavaStream<WindowedValue<KV<TupleTag<OutputT>, OutputT>>> outputStream = inputStream.flatMap(
         new DoFnMultiFunction<>(
             context.getPipelineOptions(),
-            transform.getFn(),
+            transform.getNewFn(),
             transform.getMainOutputTag(),
             transform.getSideOutputTags(),
             inputT.getWindowingStrategy(),
@@ -87,18 +87,19 @@ public class ParDoBoundMultiTranslator<InputT, OutputT> implements
       FlatMapFunction<WindowedValue<InputT>, WindowedValue<KV<TupleTag<OutputT>, OutputT>>>,
       DoFnRunners.OutputManager {
 
-    private final DoFnRunner<InputT, OutputT> doFnRunner;
+    private final DoFnRunnerFactory<InputT, OutputT> doFnRunnerFactory;
+    private DoFnRunner<InputT, OutputT> doFnRunner;
     private final List<WindowedValue<KV<TupleTag<OutputT>, OutputT>>> outputs = Lists
         .newArrayList();
 
     public DoFnMultiFunction(
         GearpumpPipelineOptions pipelineOptions,
-        OldDoFn<InputT, OutputT> doFn,
+        DoFn<InputT, OutputT> doFn,
         TupleTag<OutputT> mainOutputTag,
         TupleTagList sideOutputTags,
         WindowingStrategy<?, ?> windowingStrategy,
         SideInputReader sideInputReader) {
-      this.doFnRunner = new GearpumpDoFnRunner<>(
+      this.doFnRunnerFactory = new DoFnRunnerFactory<>(
           pipelineOptions,
           doFn,
           sideInputReader,
@@ -106,12 +107,16 @@ public class ParDoBoundMultiTranslator<InputT, OutputT> implements
           mainOutputTag,
           sideOutputTags.getAll(),
           new NoOpStepContext(),
+          new NoOpAggregatorFactory(),
           windowingStrategy
       );
     }
 
     @Override
     public Iterator<WindowedValue<KV<TupleTag<OutputT>, OutputT>>> apply(WindowedValue<InputT> wv) {
+      if (null == doFnRunner) {
+        doFnRunner = doFnRunnerFactory.createRunner();
+      }
       doFnRunner.startBundle();
       doFnRunner.processElement(wv);
       doFnRunner.finishBundle();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/45570b9c/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundTranslator.java
index b97cbb4..a796c83 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundTranslator.java
@@ -21,7 +21,6 @@ package org.apache.beam.runners.gearpump.translators;
 import org.apache.beam.runners.gearpump.translators.functions.DoFnFunction;
 import org.apache.beam.runners.gearpump.translators.utils.NoOpSideInputReader;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
@@ -39,7 +38,7 @@ public class ParDoBoundTranslator<InputT, OutputT> implements
 
   @Override
   public void translate(ParDo.Bound<InputT, OutputT> transform, TranslationContext context) {
-    OldDoFn<InputT, OutputT> doFn = transform.getFn();
+    DoFn<InputT, OutputT> doFn = transform.getNewFn();
     PCollection<OutputT> output = context.getOutput(transform);
     WindowingStrategy<?, ?> windowingStrategy = output.getWindowingStrategy();
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/45570b9c/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
index 8d16356..42969fe 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
@@ -26,10 +26,10 @@ import java.util.List;
 import org.apache.beam.runners.core.DoFnRunner;
 import org.apache.beam.runners.core.DoFnRunners;
 import org.apache.beam.runners.gearpump.GearpumpPipelineOptions;
-import org.apache.beam.runners.gearpump.translators.utils.GearpumpDoFnRunner;
+import org.apache.beam.runners.gearpump.translators.utils.DoFnRunnerFactory;
+import org.apache.beam.runners.gearpump.translators.utils.NoOpAggregatorFactory;
 import org.apache.beam.runners.gearpump.translators.utils.NoOpStepContext;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.util.SideInputReader;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
@@ -44,17 +44,17 @@ import org.apache.gearpump.streaming.javaapi.dsl.functions.FlatMapFunction;
 public class DoFnFunction<InputT, OutputT> implements
     FlatMapFunction<WindowedValue<InputT>, WindowedValue<OutputT>>, DoFnRunners.OutputManager {
 
-  private final TupleTag<OutputT> mainTag = new TupleTag<OutputT>() {
-  };
-  private final DoFnRunner<InputT, OutputT> doFnRunner;
+  private final TupleTag<OutputT> mainTag = new TupleTag<OutputT>() {};
   private List<WindowedValue<OutputT>> outputs = Lists.newArrayList();
+  private final DoFnRunnerFactory<InputT, OutputT> doFnRunnerFactory;
+  private DoFnRunner<InputT, OutputT> doFnRunner;
 
   public DoFnFunction(
       GearpumpPipelineOptions pipelineOptions,
-      OldDoFn<InputT, OutputT> doFn,
+      DoFn<InputT, OutputT> doFn,
       WindowingStrategy<?, ?> windowingStrategy,
       SideInputReader sideInputReader) {
-    this.doFnRunner = new GearpumpDoFnRunner<>(
+    this.doFnRunnerFactory = new DoFnRunnerFactory<>(
         pipelineOptions,
         doFn,
         sideInputReader,
@@ -62,6 +62,7 @@ public class DoFnFunction<InputT, OutputT> implements
         mainTag,
         TupleTagList.empty().getAll(),
         new NoOpStepContext(),
+        new NoOpAggregatorFactory(),
         windowingStrategy
     );
   }
@@ -70,6 +71,10 @@ public class DoFnFunction<InputT, OutputT> implements
   public Iterator<WindowedValue<OutputT>> apply(WindowedValue<InputT> value) {
     outputs = Lists.newArrayList();
 
+    if (null == doFnRunner) {
+      doFnRunner = doFnRunnerFactory.createRunner();
+    }
+
     doFnRunner.startBundle();
     doFnRunner.processElement(value);
     doFnRunner.finishBundle();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/45570b9c/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java
new file mode 100644
index 0000000..7119a87
--- /dev/null
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.gearpump.translators.utils;
+
+import java.io.Serializable;
+import java.util.List;
+
+import org.apache.beam.runners.core.DoFnRunner;
+import org.apache.beam.runners.core.DoFnRunners;
+import org.apache.beam.runners.core.SimpleDoFnRunner;
+import org.apache.beam.runners.gearpump.GearpumpPipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.Aggregator.AggregatorFactory;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.util.ExecutionContext;
+import org.apache.beam.sdk.util.SideInputReader;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.TupleTag;
+
+/**
+ * Serializable factory that builds a {@link DoFnRunner} (see {@link SimpleDoFnRunner}).
+ */
+public class DoFnRunnerFactory<InputT, OutputT> implements Serializable {
+
+  private final DoFn<InputT, OutputT> fn;
+  private final transient PipelineOptions options; // NOTE(review): null after deserialization
+  private final SideInputReader sideInputReader;
+  private final DoFnRunners.OutputManager outputManager;
+  private final TupleTag<OutputT> mainOutputTag;
+  private final List<TupleTag<?>> sideOutputTags;
+  private final ExecutionContext.StepContext stepContext;
+  private final AggregatorFactory aggregatorFactory;
+  private final WindowingStrategy<?, ?> windowingStrategy;
+
+  public DoFnRunnerFactory(
+      GearpumpPipelineOptions pipelineOptions,
+      DoFn<InputT, OutputT> doFn,
+      SideInputReader sideInputReader,
+      DoFnRunners.OutputManager outputManager,
+      TupleTag<OutputT> mainOutputTag,
+      List<TupleTag<?>> sideOutputTags,
+      ExecutionContext.StepContext stepContext,
+      AggregatorFactory aggregatorFactory,
+      WindowingStrategy<?, ?> windowingStrategy) {
+    this.fn = doFn;
+    this.options = pipelineOptions;
+    this.sideInputReader = sideInputReader;
+    this.outputManager = outputManager;
+    this.mainOutputTag = mainOutputTag;
+    this.sideOutputTags = sideOutputTags;
+    this.stepContext = stepContext;
+    this.aggregatorFactory = aggregatorFactory;
+    this.windowingStrategy = windowingStrategy;
+  }
+
+  public DoFnRunner<InputT, OutputT> createRunner() { // no caching: delegates on each call
+    return DoFnRunners.createDefault(options, fn, sideInputReader, outputManager, mainOutputTag,
+        sideOutputTags, stepContext, aggregatorFactory, windowingStrategy);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/45570b9c/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/GearpumpDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/GearpumpDoFnRunner.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/GearpumpDoFnRunner.java
deleted file mode 100644
index ec86a8d..0000000
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/GearpumpDoFnRunner.java
+++ /dev/null
@@ -1,516 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.gearpump.translators.utils;
-
-import static org.apache.beam.sdk.repackaged.com.google.common.base.Preconditions.checkNotNull;
-
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Sets;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.beam.runners.core.DoFnRunner;
-import org.apache.beam.runners.core.DoFnRunners;
-import org.apache.beam.runners.core.SimpleDoFnRunner;
-import org.apache.beam.runners.gearpump.GearpumpPipelineOptions;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.IterableCoder;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.OldDoFn;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.util.ExecutionContext;
-import org.apache.beam.sdk.util.SideInputReader;
-import org.apache.beam.sdk.util.SystemDoFnInternal;
-import org.apache.beam.sdk.util.TimerInternals;
-import org.apache.beam.sdk.util.UserCodeException;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowingInternals;
-import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.util.state.StateInternals;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TupleTag;
-
-import org.joda.time.Instant;
-
-
-/**
- * a serializable {@link SimpleDoFnRunner}.
- */
-public class GearpumpDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT>,
-    Serializable {
-
-  private final OldDoFn<InputT, OutputT> fn;
-  private final transient PipelineOptions options;
-  private final SideInputReader sideInputReader;
-  private final DoFnRunners.OutputManager outputManager;
-  private final TupleTag<OutputT> mainOutputTag;
-  private final List<TupleTag<?>> sideOutputTags;
-  private final ExecutionContext.StepContext stepContext;
-  private final WindowFn<?, ?> windowFn;
-  private DoFnContext<InputT, OutputT> context;
-
-  public GearpumpDoFnRunner(
-      GearpumpPipelineOptions pipelineOptions,
-      OldDoFn<InputT, OutputT> doFn,
-      SideInputReader sideInputReader,
-      DoFnRunners.OutputManager outputManager,
-      TupleTag<OutputT> mainOutputTag,
-      List<TupleTag<?>> sideOutputTags,
-      ExecutionContext.StepContext stepContext,
-      WindowingStrategy<?, ?> windowingStrategy) {
-    this.fn = doFn;
-    this.options = pipelineOptions;
-    this.sideInputReader = sideInputReader;
-    this.outputManager = outputManager;
-    this.mainOutputTag = mainOutputTag;
-    this.sideOutputTags = sideOutputTags;
-    this.stepContext = stepContext;
-    this.windowFn = windowingStrategy == null ? null : windowingStrategy.getWindowFn();
-  }
-
-  @Override
-  public void startBundle() {
-    // This can contain user code. Wrap it in case it throws an exception.
-    try {
-      if (null == context) {
-        this.context = new DoFnContext<>(
-            options,
-            fn,
-            sideInputReader,
-            outputManager,
-            mainOutputTag,
-            sideOutputTags,
-            stepContext,
-            windowFn
-        );
-      }
-      fn.startBundle(context);
-    } catch (Throwable t) {
-      // Exception in user code.
-      throw wrapUserCodeException(t);
-    }
-  }
-
-  @Override
-  public void processElement(WindowedValue<InputT> elem) {
-    if (elem.getWindows().size() <= 1
-        || (!OldDoFn.RequiresWindowAccess.class.isAssignableFrom(fn.getClass())
-        && context.sideInputReader.isEmpty())) {
-      invokeProcessElement(elem);
-    } else {
-      // We could modify the windowed value (and the processContext) to
-      // avoid repeated allocations, but this is more straightforward.
-      for (BoundedWindow window : elem.getWindows()) {
-        invokeProcessElement(WindowedValue.of(
-            elem.getValue(), elem.getTimestamp(), window, elem.getPane()));
-      }
-    }
-  }
-
-  @Override
-  public void finishBundle() {
-    // This can contain user code. Wrap it in case it throws an exception.
-    try {
-      fn.finishBundle(context);
-    } catch (Throwable t) {
-      // Exception in user code.
-      throw wrapUserCodeException(t);
-    }
-  }
-
-  private void invokeProcessElement(WindowedValue<InputT> elem) {
-    final OldDoFn<InputT, OutputT>.ProcessContext processContext =
-        new DoFnProcessContext<>(fn, context, elem);
-    // This can contain user code. Wrap it in case it throws an exception.
-    try {
-      fn.processElement(processContext);
-    } catch (Exception ex) {
-      throw wrapUserCodeException(ex);
-    }
-  }
-
-  private RuntimeException wrapUserCodeException(Throwable t) {
-    throw UserCodeException.wrapIf(!isSystemDoFn(), t);
-  }
-
-  private boolean isSystemDoFn() {
-    return fn.getClass().isAnnotationPresent(SystemDoFnInternal.class);
-  }
-
-  /**
-   * A concrete implementation of {@code DoFn.Context} used for running a {@link DoFn}.
-   *
-   * @param <InputT>  the type of the DoFn's (main) input elements
-   * @param <OutputT> the type of the DoFn's (main) output elements
-   */
-  private static class DoFnContext<InputT, OutputT>
-      extends OldDoFn<InputT, OutputT>.Context {
-    private static final int MAX_SIDE_OUTPUTS = 1000;
-
-    final transient PipelineOptions options;
-    final OldDoFn<InputT, OutputT> fn;
-    final SideInputReader sideInputReader;
-    final DoFnRunners.OutputManager outputManager;
-    final TupleTag<OutputT> mainOutputTag;
-    final ExecutionContext.StepContext stepContext;
-    final WindowFn<?, ?> windowFn;
-
-    /**
-     * The set of known output tags, some of which may be undeclared, so we can throw an
-     * exception when it exceeds {@link #MAX_SIDE_OUTPUTS}.
-     */
-    private final Set<TupleTag<?>> outputTags;
-
-    public DoFnContext(PipelineOptions options,
-        OldDoFn<InputT, OutputT> fn,
-        SideInputReader sideInputReader,
-        DoFnRunners.OutputManager outputManager,
-        TupleTag<OutputT> mainOutputTag,
-        List<TupleTag<?>> sideOutputTags,
-        ExecutionContext.StepContext stepContext,
-        WindowFn<?, ?> windowFn) {
-      fn.super();
-      this.options = options;
-      this.fn = fn;
-      this.sideInputReader = sideInputReader;
-      this.outputManager = outputManager;
-      this.mainOutputTag = mainOutputTag;
-      this.outputTags = Sets.newHashSet();
-
-      outputTags.add(mainOutputTag);
-      for (TupleTag<?> sideOutputTag : sideOutputTags) {
-        outputTags.add(sideOutputTag);
-      }
-
-      this.stepContext = stepContext;
-      this.windowFn = windowFn;
-      super.setupDelegateAggregators();
-    }
-
-    //////////////////////////////////////////////////////////////////////////////
-
-    @Override
-    public PipelineOptions getPipelineOptions() {
-      return options;
-    }
-
-    <T, W extends BoundedWindow> WindowedValue<T> makeWindowedValue(
-        T output, Instant timestamp, Collection<W> windows, PaneInfo pane) {
-      final Instant inputTimestamp = timestamp;
-
-      if (timestamp == null) {
-        timestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;
-      }
-
-      if (windows == null) {
-        try {
-          // The windowFn can never succeed at accessing the element, so its type does not
-          // matter here
-          @SuppressWarnings("unchecked")
-          WindowFn<Object, W> objectWindowFn = (WindowFn<Object, W>) windowFn;
-          windows = objectWindowFn.assignWindows(objectWindowFn.new AssignContext() {
-            @Override
-            public Object element() {
-              throw new UnsupportedOperationException(
-                  "WindowFn attempted to access input element when none was available");
-            }
-
-            @Override
-            public Instant timestamp() {
-              if (inputTimestamp == null) {
-                throw new UnsupportedOperationException(
-                    "WindowFn attempted to access input timestamp when none was available");
-              }
-              return inputTimestamp;
-            }
-
-            @Override
-            public BoundedWindow window() {
-              throw new UnsupportedOperationException(
-                  "WindowFn attempted to access input windows when none were available");
-            }
-          });
-        } catch (Exception e) {
-          throw UserCodeException.wrap(e);
-        }
-      }
-
-      return WindowedValue.of(output, timestamp, windows, pane);
-    }
-
-    public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) {
-      if (!sideInputReader.contains(view)) {
-        throw new IllegalArgumentException("calling sideInput() with unknown view");
-      }
-      BoundedWindow sideInputWindow =
-          view.getWindowingStrategyInternal().getWindowFn().getSideInputWindow(mainInputWindow);
-      return sideInputReader.get(view, sideInputWindow);
-    }
-
-    void outputWindowedValue(
-        OutputT output,
-        Instant timestamp,
-        Collection<? extends BoundedWindow> windows,
-        PaneInfo pane) {
-      outputWindowedValue(makeWindowedValue(output, timestamp, windows, pane));
-    }
-
-    void outputWindowedValue(WindowedValue<OutputT> windowedElem) {
-      outputManager.output(mainOutputTag, windowedElem);
-      if (stepContext != null) {
-        stepContext.noteOutput(windowedElem);
-      }
-    }
-
-    protected <T> void sideOutputWindowedValue(TupleTag<T> tag,
-        T output,
-        Instant timestamp,
-        Collection<? extends BoundedWindow> windows,
-        PaneInfo pane) {
-      sideOutputWindowedValue(tag, makeWindowedValue(output, timestamp, windows, pane));
-    }
-
-    protected <T> void sideOutputWindowedValue(TupleTag<T> tag, WindowedValue<T> windowedElem) {
-      if (!outputTags.contains(tag)) {
-        // This tag wasn't declared nor was it seen before during this execution.
-        // Thus, this must be a new, undeclared and unconsumed output.
-        // To prevent likely user errors, enforce the limit on the number of side
-        // outputs.
-        if (outputTags.size() >= MAX_SIDE_OUTPUTS) {
-          throw new IllegalArgumentException(
-              "the number of side outputs has exceeded a limit of " + MAX_SIDE_OUTPUTS);
-        }
-        outputTags.add(tag);
-      }
-
-      outputManager.output(tag, windowedElem);
-      if (stepContext != null) {
-        stepContext.noteSideOutput(tag, windowedElem);
-      }
-    }
-
-    // Following implementations of output, outputWithTimestamp, and sideOutput
-    // are only accessible in DoFn.startBundle and DoFn.finishBundle, and will be shadowed by
-    // ProcessContext's versions in DoFn.processElement.
-    @Override
-    public void output(OutputT output) {
-      outputWindowedValue(output, null, null, PaneInfo.NO_FIRING);
-    }
-
-    @Override
-    public void outputWithTimestamp(OutputT output, Instant timestamp) {
-      outputWindowedValue(output, timestamp, null, PaneInfo.NO_FIRING);
-    }
-
-    @Override
-    public <T> void sideOutput(TupleTag<T> tag, T output) {
-      checkNotNull(tag, "TupleTag passed to sideOutput cannot be null");
-      sideOutputWindowedValue(tag, output, null, null, PaneInfo.NO_FIRING);
-    }
-
-    @Override
-    public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
-      checkNotNull(tag, "TupleTag passed to sideOutputWithTimestamp cannot be null");
-      sideOutputWindowedValue(tag, output, timestamp, null, PaneInfo.NO_FIRING);
-    }
-
-    @Override
-    protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(
-        String name, Combine.CombineFn<AggInputT, ?, AggOutputT> combiner) {
-      checkNotNull(combiner,
-          "Combiner passed to createAggregator cannot be null");
-      throw new UnsupportedOperationException("aggregator not supported in Gearpump runner");
-    }
-  }
-
-
-  /**
-   * A concrete implementation of {@code DoFn.ProcessContext} used for
-   * running a {@link DoFn} over a single element.
-   *
-   * @param <InputT>  the type of the DoFn's (main) input elements
-   * @param <OutputT> the type of the DoFn's (main) output elements
-   */
-  private static class DoFnProcessContext<InputT, OutputT>
-      extends OldDoFn<InputT, OutputT>.ProcessContext {
-
-
-    final OldDoFn<InputT, OutputT> fn;
-    final DoFnContext<InputT, OutputT> context;
-    final WindowedValue<InputT> windowedValue;
-
-    public DoFnProcessContext(OldDoFn<InputT, OutputT> fn,
-        DoFnContext<InputT, OutputT> context,
-        WindowedValue<InputT> windowedValue) {
-      fn.super();
-      this.fn = fn;
-      this.context = context;
-      this.windowedValue = windowedValue;
-    }
-
-    @Override
-    public PipelineOptions getPipelineOptions() {
-      return context.getPipelineOptions();
-    }
-
-    @Override
-    public InputT element() {
-      return windowedValue.getValue();
-    }
-
-    @Override
-    public <T> T sideInput(PCollectionView<T> view) {
-      checkNotNull(view, "View passed to sideInput cannot be null");
-      Iterator<? extends BoundedWindow> windowIter = windows().iterator();
-      BoundedWindow window;
-      if (!windowIter.hasNext()) {
-        if (context.windowFn instanceof GlobalWindows) {
-          // TODO: Remove this once GroupByKeyOnly no longer outputs elements
-          // without windows
-          window = GlobalWindow.INSTANCE;
-        } else {
-          throw new IllegalStateException(
-              "sideInput called when main input element is not in any windows");
-        }
-      } else {
-        window = windowIter.next();
-        if (windowIter.hasNext()) {
-          throw new IllegalStateException(
-              "sideInput called when main input element is in multiple windows");
-        }
-      }
-      return context.sideInput(view, window);
-    }
-
-    @Override
-    public BoundedWindow window() {
-      if (!(fn instanceof OldDoFn.RequiresWindowAccess)) {
-        throw new UnsupportedOperationException(
-            "window() is only available in the context of a DoFn marked as RequiresWindow.");
-      }
-      return Iterables.getOnlyElement(windows());
-    }
-
-    @Override
-    public PaneInfo pane() {
-      return windowedValue.getPane();
-    }
-
-    @Override
-    public void output(OutputT output) {
-      context.outputWindowedValue(windowedValue.withValue(output));
-    }
-
-    @Override
-    public void outputWithTimestamp(OutputT output, Instant timestamp) {
-      context.outputWindowedValue(output, timestamp,
-          windowedValue.getWindows(), windowedValue.getPane());
-    }
-
-    @Override
-    public <T> void sideOutput(TupleTag<T> tag, T output) {
-      checkNotNull(tag, "Tag passed to sideOutput cannot be null");
-      context.sideOutputWindowedValue(tag, windowedValue.withValue(output));
-    }
-
-    @Override
-    public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
-      checkNotNull(tag, "Tag passed to sideOutputWithTimestamp cannot be null");
-      context.sideOutputWindowedValue(
-          tag, output, timestamp, windowedValue.getWindows(), windowedValue.getPane());
-    }
-
-    @Override
-    public Instant timestamp() {
-      return windowedValue.getTimestamp();
-    }
-
-    public Collection<? extends BoundedWindow> windows() {
-      return windowedValue.getWindows();
-    }
-
-    @Override
-    public WindowingInternals<InputT, OutputT> windowingInternals() {
-      return new WindowingInternals<InputT, OutputT>() {
-        @Override
-        public void outputWindowedValue(OutputT output, Instant timestamp,
-            Collection<? extends BoundedWindow> windows, PaneInfo pane) {
-          context.outputWindowedValue(output, timestamp, windows, pane);
-        }
-
-        @Override
-        public Collection<? extends BoundedWindow> windows() {
-          return windowedValue.getWindows();
-        }
-
-        @Override
-        public PaneInfo pane() {
-          return windowedValue.getPane();
-        }
-
-        @Override
-        public TimerInternals timerInternals() {
-          return context.stepContext.timerInternals();
-        }
-
-        @Override
-        public <T> void writePCollectionViewData(
-            TupleTag<?> tag,
-            Iterable<WindowedValue<T>> data,
-            Coder<T> elemCoder) throws IOException {
-          @SuppressWarnings("unchecked")
-          Coder<BoundedWindow> windowCoder = (Coder<BoundedWindow>) context.windowFn.windowCoder();
-
-          context.stepContext.writePCollectionViewData(
-              tag, data, IterableCoder.of(WindowedValue.getFullCoder(elemCoder, windowCoder)),
-              window(), windowCoder);
-        }
-
-        @Override
-        public StateInternals<?> stateInternals() {
-          return context.stepContext.stateInternals();
-        }
-
-        @Override
-        public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) {
-          return context.sideInput(view, mainInputWindow);
-        }
-      };
-    }
-
-    @Override
-    protected <AggregatorInputT, AggregatorOutputT> Aggregator<AggregatorInputT, AggregatorOutputT>
-    createAggregatorInternal(
-        String name, Combine.CombineFn<AggregatorInputT, ?, AggregatorOutputT> combiner) {
-      return context.createAggregatorInternal(name, combiner);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/45570b9c/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpAggregatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpAggregatorFactory.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpAggregatorFactory.java
new file mode 100644
index 0000000..cd404a5
--- /dev/null
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpAggregatorFactory.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.gearpump.translators.utils;
+
+import java.io.Serializable;
+
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.Aggregator.AggregatorFactory;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.util.ExecutionContext;
+
+/**
+ * no-op aggregator factory.
+ */
+public class NoOpAggregatorFactory implements AggregatorFactory, Serializable {
+
+  @Override
+  public <InputT, AccumT, OutputT> Aggregator<InputT, OutputT> createAggregatorForDoFn(
+      Class<?> fnClass,
+      ExecutionContext.StepContext stepContext,
+      String aggregatorName,
+      Combine.CombineFn<InputT, AccumT, OutputT> combine) {
+    return null;
+  }
+}


[2/2] incubator-beam git commit: This closes #1234

Posted by ke...@apache.org.
This closes #1234


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/323ec118
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/323ec118
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/323ec118

Branch: refs/heads/gearpump-runner
Commit: 323ec1188d2dffcdad640701e1827f90965994a8
Parents: 3933b55 45570b9
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Nov 3 09:39:17 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Nov 3 09:39:17 2016 -0700

----------------------------------------------------------------------
 .../gearpump/GearpumpPipelineTranslator.java    |   2 +-
 .../translators/ParDoBoundMultiTranslator.java  |  17 +-
 .../translators/ParDoBoundTranslator.java       |   3 +-
 .../translators/functions/DoFnFunction.java     |  19 +-
 .../translators/utils/DoFnRunnerFactory.java    |  77 +++
 .../translators/utils/GearpumpDoFnRunner.java   | 516 -------------------
 .../utils/NoOpAggregatorFactory.java            |  41 ++
 7 files changed, 143 insertions(+), 532 deletions(-)
----------------------------------------------------------------------