You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/08/25 18:56:03 UTC

[49/50] incubator-beam git commit: Rename DoFn to OldDoFn in Gearpump runner

Rename DoFn to OldDoFn in Gearpump runner


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/bc1b3549
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/bc1b3549
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/bc1b3549

Branch: refs/heads/gearpump-runner
Commit: bc1b354949416db3b52c4f37c66968bdb86f0813
Parents: 40be715
Author: manuzhang <ow...@gmail.com>
Authored: Fri Aug 12 07:22:00 2016 +0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Aug 25 11:38:08 2016 -0700

----------------------------------------------------------------------
 .../gearpump/GearpumpPipelineResult.java        | 23 ++++++++++++++++++--
 .../gearpump/GearpumpPipelineRunner.java        |  6 ++---
 .../gearpump/examples/StreamingWordCount.java   |  6 ++---
 .../translators/ParDoBoundMultiTranslator.java  |  3 ++-
 .../translators/ParDoBoundTranslator.java       |  3 ++-
 .../translators/functions/DoFnFunction.java     |  3 ++-
 .../translators/utils/GearpumpDoFnRunner.java   | 23 ++++++++++----------
 7 files changed, 45 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc1b3549/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java
index bc27147..6184bc3 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java
@@ -17,11 +17,14 @@
  */
 package org.apache.beam.runners.gearpump;
 
+import org.apache.beam.sdk.AggregatorRetrievalException;
+import org.apache.beam.sdk.AggregatorValues;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
-import org.apache.beam.sdk.runners.AggregatorRetrievalException;
-import org.apache.beam.sdk.runners.AggregatorValues;
 import org.apache.beam.sdk.transforms.Aggregator;
+import org.joda.time.Duration;
+
+import java.io.IOException;
 
 /**
  * Result of executing a {@link Pipeline} with Gearpump.
@@ -33,10 +36,26 @@ public class GearpumpPipelineResult implements PipelineResult {
   }
 
   @Override
+  public State cancel() throws IOException {
+    return null;
+  }
+
+  @Override
+  public State waitUntilFinish(Duration duration) throws IOException, InterruptedException {
+    return null;
+  }
+
+  @Override
+  public State waitUntilFinish() throws IOException, InterruptedException {
+    return null;
+  }
+
+  @Override
   public <T> AggregatorValues<T> getAggregatorValues(Aggregator<?, T> aggregator)
       throws AggregatorRetrievalException {
     throw new AggregatorRetrievalException(
         "PipelineResult getAggregatorValues not supported in Gearpump pipeline",
         new UnsupportedOperationException());
   }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc1b3549/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineRunner.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineRunner.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineRunner.java
index 660d703..4182ee4 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineRunner.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineRunner.java
@@ -23,8 +23,8 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsValidator;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -151,7 +151,7 @@ public class GearpumpPipelineRunner extends PipelineRunner<GearpumpPipelineResul
 
     private final Window.Bound<T> wrapped;
 
-    public AssignWindowsAndSetStrategy(Window.Bound<T> wrapped) {
+    AssignWindowsAndSetStrategy(Window.Bound<T> wrapped) {
       this.wrapped = wrapped;
     }
 
@@ -184,7 +184,7 @@ public class GearpumpPipelineRunner extends PipelineRunner<GearpumpPipelineResul
     }
   }
 
-  private static class IdentityFn<T> extends DoFn<T, T> {
+  private static class IdentityFn<T> extends OldDoFn<T, T> {
     @Override
     public void processElement(ProcessContext c) {
       c.output(c.element());

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc1b3549/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/StreamingWordCount.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/StreamingWordCount.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/StreamingWordCount.java
index c51289d..5f35c6b 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/StreamingWordCount.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/StreamingWordCount.java
@@ -25,7 +25,7 @@ import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
@@ -44,7 +44,7 @@ import org.slf4j.LoggerFactory;
  */
 public class StreamingWordCount {
 
-  static class ExtractWordsFn extends DoFn<String, String> {
+  static class ExtractWordsFn extends OldDoFn<String, String> {
     private final Aggregator<Long, Long> emptyLines =
         createAggregator("emptyLines", new Sum.SumLongFn());
 
@@ -66,7 +66,7 @@ public class StreamingWordCount {
     }
   }
 
-  static class FormatAsStringFn extends DoFn<KV<String, Long>, String> {
+  static class FormatAsStringFn extends OldDoFn<KV<String, Long>, String> {
     private static final Logger LOG = LoggerFactory.getLogger(FormatAsStringFn.class);
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc1b3549/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java
index af5bcbc..d5ed0d2 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java
@@ -23,6 +23,7 @@ import org.apache.beam.runners.gearpump.translators.utils.GearpumpDoFnRunner;
 import org.apache.beam.runners.gearpump.translators.utils.NoOpSideInputReader;
 import org.apache.beam.runners.gearpump.translators.utils.NoOpStepContext;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.util.DoFnRunner;
 import org.apache.beam.sdk.util.DoFnRunners;
@@ -92,7 +93,7 @@ public class ParDoBoundMultiTranslator<InputT, OutputT> implements
 
     public DoFnMultiFunction(
         GearpumpPipelineOptions pipelineOptions,
-        DoFn<InputT, OutputT> doFn,
+        OldDoFn<InputT, OutputT> doFn,
         TupleTag<OutputT> mainOutputTag,
         TupleTagList sideOutputTags,
         WindowingStrategy<?, ?> windowingStrategy,

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc1b3549/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundTranslator.java
index 689bc08..b97cbb4 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundTranslator.java
@@ -21,6 +21,7 @@ package org.apache.beam.runners.gearpump.translators;
 import org.apache.beam.runners.gearpump.translators.functions.DoFnFunction;
 import org.apache.beam.runners.gearpump.translators.utils.NoOpSideInputReader;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
@@ -38,7 +39,7 @@ public class ParDoBoundTranslator<InputT, OutputT> implements
 
   @Override
   public void translate(ParDo.Bound<InputT, OutputT> transform, TranslationContext context) {
-    DoFn<InputT, OutputT> doFn = transform.getFn();
+    OldDoFn<InputT, OutputT> doFn = transform.getFn();
     PCollection<OutputT> output = context.getOutput(transform);
     WindowingStrategy<?, ?> windowingStrategy = output.getWindowingStrategy();
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc1b3549/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
index 088fc14..b1ebd2a 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
@@ -22,6 +22,7 @@ import org.apache.beam.runners.gearpump.GearpumpPipelineOptions;
 import org.apache.beam.runners.gearpump.translators.utils.GearpumpDoFnRunner;
 import org.apache.beam.runners.gearpump.translators.utils.NoOpStepContext;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.util.DoFnRunner;
 import org.apache.beam.sdk.util.DoFnRunners;
 import org.apache.beam.sdk.util.SideInputReader;
@@ -50,7 +51,7 @@ public class DoFnFunction<InputT, OutputT> implements
 
   public DoFnFunction(
       GearpumpPipelineOptions pipelineOptions,
-      DoFn<InputT, OutputT> doFn,
+      OldDoFn<InputT, OutputT> doFn,
       WindowingStrategy<?, ?> windowingStrategy,
       SideInputReader sideInputReader) {
     this.doFnRunner = new GearpumpDoFnRunner<>(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc1b3549/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/GearpumpDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/GearpumpDoFnRunner.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/GearpumpDoFnRunner.java
index 608ad7c..be0d025 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/GearpumpDoFnRunner.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/GearpumpDoFnRunner.java
@@ -25,6 +25,7 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
@@ -64,7 +65,7 @@ import java.util.Set;
 public class GearpumpDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT>,
     Serializable {
 
-  private final DoFn<InputT, OutputT> fn;
+  private final OldDoFn<InputT, OutputT> fn;
   private final transient PipelineOptions options;
   private final SideInputReader sideInputReader;
   private final DoFnRunners.OutputManager outputManager;
@@ -76,7 +77,7 @@ public class GearpumpDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, O
 
   public GearpumpDoFnRunner(
       GearpumpPipelineOptions pipelineOptions,
-      DoFn<InputT, OutputT> doFn,
+      OldDoFn<InputT, OutputT> doFn,
       SideInputReader sideInputReader,
       DoFnRunners.OutputManager outputManager,
       TupleTag<OutputT> mainOutputTag,
@@ -119,7 +120,7 @@ public class GearpumpDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, O
   @Override
   public void processElement(WindowedValue<InputT> elem) {
     if (elem.getWindows().size() <= 1
-        || (!DoFn.RequiresWindowAccess.class.isAssignableFrom(fn.getClass())
+        || (!OldDoFn.RequiresWindowAccess.class.isAssignableFrom(fn.getClass())
         && context.sideInputReader.isEmpty())) {
       invokeProcessElement(elem);
     } else {
@@ -144,7 +145,7 @@ public class GearpumpDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, O
   }
 
   private void invokeProcessElement(WindowedValue<InputT> elem) {
-    final DoFn<InputT, OutputT>.ProcessContext processContext =
+    final OldDoFn<InputT, OutputT>.ProcessContext processContext =
         new DoFnProcessContext<>(fn, context, elem);
     // This can contain user code. Wrap it in case it throws an exception.
     try {
@@ -169,11 +170,11 @@ public class GearpumpDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, O
    * @param <OutputT> the type of the DoFn's (main) output elements
    */
   private static class DoFnContext<InputT, OutputT>
-      extends DoFn<InputT, OutputT>.Context {
+      extends OldDoFn<InputT, OutputT>.Context {
     private static final int MAX_SIDE_OUTPUTS = 1000;
 
     final transient PipelineOptions options;
-    final DoFn<InputT, OutputT> fn;
+    final OldDoFn<InputT, OutputT> fn;
     final SideInputReader sideInputReader;
     final DoFnRunners.OutputManager outputManager;
     final TupleTag<OutputT> mainOutputTag;
@@ -187,7 +188,7 @@ public class GearpumpDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, O
     private final Set<TupleTag<?>> outputTags;
 
     public DoFnContext(PipelineOptions options,
-        DoFn<InputT, OutputT> fn,
+        OldDoFn<InputT, OutputT> fn,
         SideInputReader sideInputReader,
         DoFnRunners.OutputManager outputManager,
         TupleTag<OutputT> mainOutputTag,
@@ -357,14 +358,14 @@ public class GearpumpDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, O
    * @param <OutputT> the type of the DoFn's (main) output elements
    */
   private static class DoFnProcessContext<InputT, OutputT>
-      extends DoFn<InputT, OutputT>.ProcessContext {
+      extends OldDoFn<InputT, OutputT>.ProcessContext {
 
 
-    final DoFn<InputT, OutputT> fn;
+    final OldDoFn<InputT, OutputT> fn;
     final DoFnContext<InputT, OutputT> context;
     final WindowedValue<InputT> windowedValue;
 
-    public DoFnProcessContext(DoFn<InputT, OutputT> fn,
+    public DoFnProcessContext(OldDoFn<InputT, OutputT> fn,
         DoFnContext<InputT, OutputT> context,
         WindowedValue<InputT> windowedValue) {
       fn.super();
@@ -409,7 +410,7 @@ public class GearpumpDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, O
 
     @Override
     public BoundedWindow window() {
-      if (!(fn instanceof DoFn.RequiresWindowAccess)) {
+      if (!(fn instanceof OldDoFn.RequiresWindowAccess)) {
         throw new UnsupportedOperationException(
             "window() is only available in the context of a DoFn marked as RequiresWindow.");
       }