You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/01/24 03:23:17 UTC

[05/12] beam git commit: update to latest gearpump dsl function interface

Update to the latest Gearpump DSL function interface


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3bf82638
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3bf82638
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3bf82638

Branch: refs/heads/gearpump-runner
Commit: 3bf82638096ae7aa91c7d3c862c2994772bee51b
Parents: e63d42d
Author: manuzhang <ow...@gmail.com>
Authored: Sat Jan 14 13:36:07 2017 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Sat Jan 14 21:40:18 2017 +0800

----------------------------------------------------------------------
 .../translators/GroupByKeyTranslator.java       | 12 ++++----
 .../translators/ParDoBoundMultiTranslator.java  | 29 ++++++++++++++------
 .../translators/WindowBoundTranslator.java      |  4 +--
 .../translators/functions/DoFnFunction.java     | 21 +++++++++++---
 4 files changed, 46 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/3bf82638/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
index 8e3ffe3..4eaf755 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
@@ -36,15 +36,15 @@ import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
 
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.gearpump.streaming.dsl.api.functions.MapFunction;
+import org.apache.gearpump.streaming.dsl.api.functions.ReduceFunction;
 import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
+import org.apache.gearpump.streaming.dsl.javaapi.functions.GroupByFunction;
 import org.apache.gearpump.streaming.dsl.window.api.Discarding$;
 import org.apache.gearpump.streaming.dsl.window.api.EventTimeTrigger$;
 import org.apache.gearpump.streaming.dsl.window.api.Window;
 import org.apache.gearpump.streaming.dsl.window.api.WindowFn;
 import org.apache.gearpump.streaming.dsl.window.impl.Bucket;
-import org.apache.gearpump.streaming.javaapi.dsl.functions.GroupByFunction;
-import org.apache.gearpump.streaming.javaapi.dsl.functions.MapFunction;
-import org.apache.gearpump.streaming.javaapi.dsl.functions.ReduceFunction;
 import scala.collection.JavaConversions;
 
 
@@ -122,7 +122,7 @@ public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKe
     }
   }
 
-  private static class GroupByFn<K, V> implements
+  private static class GroupByFn<K, V> extends
       GroupByFunction<WindowedValue<KV<K, V>>, K> {
 
     @Override
@@ -132,7 +132,7 @@ public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKe
   }
 
   private static class ValueToIterable<K, V>
-      implements MapFunction<WindowedValue<KV<K, V>>, WindowedValue<KV<K, Iterable<V>>>> {
+      extends MapFunction<WindowedValue<KV<K, V>>, WindowedValue<KV<K, Iterable<V>>>> {
 
     @Override
     public WindowedValue<KV<K, Iterable<V>>> apply(WindowedValue<KV<K, V>> wv) {
@@ -141,7 +141,7 @@ public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKe
     }
   }
 
-  private static class MergeValue<K, V> implements
+  private static class MergeValue<K, V> extends
       ReduceFunction<WindowedValue<KV<K, Iterable<V>>>> {
 
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/3bf82638/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java
index 24f9734..0d5b8bc 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java
@@ -33,6 +33,7 @@ import org.apache.beam.runners.gearpump.translators.utils.NoOpSideInputReader;
 import org.apache.beam.runners.gearpump.translators.utils.NoOpStepContext;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
 import org.apache.beam.sdk.util.SideInputReader;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
@@ -41,10 +42,10 @@ import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
 
+import org.apache.gearpump.streaming.dsl.api.functions.FilterFunction;
+import org.apache.gearpump.streaming.dsl.api.functions.MapFunction;
 import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
-import org.apache.gearpump.streaming.javaapi.dsl.functions.FilterFunction;
-import org.apache.gearpump.streaming.javaapi.dsl.functions.FlatMapFunction;
-import org.apache.gearpump.streaming.javaapi.dsl.functions.MapFunction;
+import org.apache.gearpump.streaming.dsl.javaapi.functions.FlatMapFunction;
 
 /**
  * {@link ParDo.BoundMulti} is translated to Gearpump flatMap function
@@ -83,12 +84,13 @@ public class ParDoBoundMultiTranslator<InputT, OutputT> implements
   /**
    * Gearpump {@link FlatMapFunction} wrapper over Beam {@link DoFn}.
    */
-  private static class DoFnMultiFunction<InputT, OutputT> implements
-      FlatMapFunction<WindowedValue<InputT>, WindowedValue<KV<TupleTag<OutputT>, OutputT>>>,
-      DoFnRunners.OutputManager {
+  private static class DoFnMultiFunction<InputT, OutputT>
+    extends FlatMapFunction<WindowedValue<InputT>, WindowedValue<KV<TupleTag<OutputT>, OutputT>>>
+    implements DoFnRunners.OutputManager {
 
     private final DoFnRunnerFactory<InputT, OutputT> doFnRunnerFactory;
     private DoFnRunner<InputT, OutputT> doFnRunner;
+    private final DoFn<InputT, OutputT> doFn;
     private final List<WindowedValue<KV<TupleTag<OutputT>, OutputT>>> outputs = Lists
         .newArrayList();
 
@@ -99,6 +101,7 @@ public class ParDoBoundMultiTranslator<InputT, OutputT> implements
         TupleTagList sideOutputTags,
         WindowingStrategy<?, ?> windowingStrategy,
         SideInputReader sideInputReader) {
+      this.doFn = doFn;
       this.doFnRunnerFactory = new DoFnRunnerFactory<>(
           pipelineOptions,
           doFn,
@@ -113,6 +116,16 @@ public class ParDoBoundMultiTranslator<InputT, OutputT> implements
     }
 
     @Override
+    public void setup() {
+      DoFnInvokers.invokerFor(doFn).invokeSetup();
+    }
+
+    @Override
+    public void teardown() {
+      DoFnInvokers.invokerFor(doFn).invokeTeardown();
+    }
+
+    @Override
     public Iterator<WindowedValue<KV<TupleTag<OutputT>, OutputT>>> apply(WindowedValue<InputT> wv) {
       if (null == doFnRunner) {
         doFnRunner = doFnRunnerFactory.createRunner();
@@ -133,7 +146,7 @@ public class ParDoBoundMultiTranslator<InputT, OutputT> implements
     }
   }
 
-  private static class FilterByOutputTag<OutputT> implements
+  private static class FilterByOutputTag<OutputT> extends
       FilterFunction<WindowedValue<KV<TupleTag<OutputT>, OutputT>>> {
 
     private final TupleTag<OutputT> tupleTag;
@@ -148,7 +161,7 @@ public class ParDoBoundMultiTranslator<InputT, OutputT> implements
     }
   }
 
-  private static class ExtractOutput<OutputT> implements
+  private static class ExtractOutput<OutputT> extends
       MapFunction<WindowedValue<KV<TupleTag<OutputT>, OutputT>>, WindowedValue<OutputT>> {
 
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/3bf82638/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java
index 32dd5de..d3c50a5 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java
@@ -34,8 +34,8 @@ import org.apache.beam.sdk.values.PCollection;
 import org.apache.gearpump.Message;
 import org.apache.gearpump.cluster.UserConfig;
 import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
+import org.apache.gearpump.streaming.dsl.javaapi.functions.FlatMapFunction;
 import org.apache.gearpump.streaming.javaapi.Task;
-import org.apache.gearpump.streaming.javaapi.dsl.functions.FlatMapFunction;
 import org.apache.gearpump.streaming.task.TaskContext;
 import org.joda.time.Instant;
 
@@ -61,7 +61,7 @@ public class WindowBoundTranslator<T> implements  TransformTranslator<Window.Bou
     context.setOutputStream(context.getOutput(transform), outputStream);
   }
 
-  private static class AssignWindows<T> implements
+  private static class AssignWindows<T> extends
       FlatMapFunction<WindowedValue<T>, WindowedValue<T>> {
 
     private final WindowFn<T, BoundedWindow> fn;

http://git-wip-us.apache.org/repos/asf/beam/blob/3bf82638/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
index 42969fe..a66d3a4 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
@@ -30,30 +30,33 @@ import org.apache.beam.runners.gearpump.translators.utils.DoFnRunnerFactory;
 import org.apache.beam.runners.gearpump.translators.utils.NoOpAggregatorFactory;
 import org.apache.beam.runners.gearpump.translators.utils.NoOpStepContext;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
 import org.apache.beam.sdk.util.SideInputReader;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
-
-import org.apache.gearpump.streaming.javaapi.dsl.functions.FlatMapFunction;
+import org.apache.gearpump.streaming.dsl.javaapi.functions.FlatMapFunction;
 
 /**
  * Gearpump {@link FlatMapFunction} wrapper over Beam {@link DoFn}.
  */
-public class DoFnFunction<InputT, OutputT> implements
-    FlatMapFunction<WindowedValue<InputT>, WindowedValue<OutputT>>, DoFnRunners.OutputManager {
+public class DoFnFunction<InputT, OutputT> extends
+    FlatMapFunction<WindowedValue<InputT>, WindowedValue<OutputT>> implements
+    DoFnRunners.OutputManager {
 
   private final TupleTag<OutputT> mainTag = new TupleTag<OutputT>() {};
   private List<WindowedValue<OutputT>> outputs = Lists.newArrayList();
   private final DoFnRunnerFactory<InputT, OutputT> doFnRunnerFactory;
   private DoFnRunner<InputT, OutputT> doFnRunner;
+  private final DoFn<InputT, OutputT> doFn;
 
   public DoFnFunction(
       GearpumpPipelineOptions pipelineOptions,
       DoFn<InputT, OutputT> doFn,
       WindowingStrategy<?, ?> windowingStrategy,
       SideInputReader sideInputReader) {
+    this.doFn = doFn;
     this.doFnRunnerFactory = new DoFnRunnerFactory<>(
         pipelineOptions,
         doFn,
@@ -68,6 +71,16 @@ public class DoFnFunction<InputT, OutputT> implements
   }
 
   @Override
+  public void setup() {
+    DoFnInvokers.invokerFor(doFn).invokeSetup();
+  }
+
+  @Override
+  public void teardown() {
+    DoFnInvokers.invokerFor(doFn).invokeTeardown();
+  }
+
+  @Override
   public Iterator<WindowedValue<OutputT>> apply(WindowedValue<InputT> value) {
     outputs = Lists.newArrayList();