You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/12/21 20:23:57 UTC

[49/50] incubator-beam git commit: [BEAM-79] Upgrade to beam-0.5.0-incubating-SNAPSHOT

[BEAM-79] Upgrade to beam-0.5.0-incubating-SNAPSHOT


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/647034cf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/647034cf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/647034cf

Branch: refs/heads/gearpump-runner
Commit: 647034cfc6ee419548b6da222e6d134792366a26
Parents: c2fb7c0
Author: manuzhang <ow...@gmail.com>
Authored: Wed Dec 21 09:32:35 2016 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Wed Dec 21 09:32:35 2016 +0800

----------------------------------------------------------------------
 runners/gearpump/pom.xml                              |  2 +-
 .../runners/gearpump/examples/StreamingWordCount.java | 14 +++++++-------
 .../translators/ParDoBoundMultiTranslator.java        |  2 +-
 .../gearpump/translators/ParDoBoundTranslator.java    |  2 +-
 .../gearpump/translators/TranslationContext.java      |  3 +--
 .../gearpump/translators/utils/DoFnRunnerFactory.java |  2 +-
 .../translators/utils/NoOpAggregatorFactory.java      |  2 +-
 7 files changed, 13 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/647034cf/runners/gearpump/pom.xml
----------------------------------------------------------------------
diff --git a/runners/gearpump/pom.xml b/runners/gearpump/pom.xml
index 9320561..bb35ad7 100644
--- a/runners/gearpump/pom.xml
+++ b/runners/gearpump/pom.xml
@@ -23,7 +23,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-runners-parent</artifactId>
-    <version>0.4.0-incubating-SNAPSHOT</version>
+    <version>0.5.0-incubating-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/647034cf/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/StreamingWordCount.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/StreamingWordCount.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/StreamingWordCount.java
index 1d85c25..b2d762a 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/StreamingWordCount.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/StreamingWordCount.java
@@ -24,7 +24,7 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
@@ -42,10 +42,10 @@ import org.slf4j.LoggerFactory;
  */
 public class StreamingWordCount {
 
-  static class ExtractWordsFn extends OldDoFn<String, String> {
+  static class ExtractWordsFn extends DoFn<String, String> {
 
-    @Override
-    public void processElement(ProcessContext c) {
+    @ProcessElement
+    public void process(ProcessContext c) {
       // Split the line into words.
       String[] words = c.element().split("[^a-zA-Z']+");
 
@@ -58,11 +58,11 @@ public class StreamingWordCount {
     }
   }
 
-  static class FormatAsStringFn extends OldDoFn<KV<String, Long>, String> {
+  static class FormatAsStringFn extends DoFn<KV<String, Long>, String> {
     private static final Logger LOG = LoggerFactory.getLogger(FormatAsStringFn.class);
 
-    @Override
-    public void processElement(ProcessContext c) {
+    @ProcessElement
+    public void process(ProcessContext c) {
       String row = c.element().getKey()
           + " - " + c.element().getValue()
           + " @ " + c.timestamp().toString();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/647034cf/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java
index 54f1c3f..24f9734 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java
@@ -64,7 +64,7 @@ public class ParDoBoundMultiTranslator<InputT, OutputT> implements
     JavaStream<WindowedValue<KV<TupleTag<OutputT>, OutputT>>> outputStream = inputStream.flatMap(
         new DoFnMultiFunction<>(
             context.getPipelineOptions(),
-            transform.getNewFn(),
+            transform.getFn(),
             transform.getMainOutputTag(),
             transform.getSideOutputTags(),
             inputT.getWindowingStrategy(),

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/647034cf/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundTranslator.java
index a796c83..689bc08 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundTranslator.java
@@ -38,7 +38,7 @@ public class ParDoBoundTranslator<InputT, OutputT> implements
 
   @Override
   public void translate(ParDo.Bound<InputT, OutputT> transform, TranslationContext context) {
-    DoFn<InputT, OutputT> doFn = transform.getNewFn();
+    DoFn<InputT, OutputT> doFn = transform.getFn();
     PCollection<OutputT> output = context.getOutput(transform);
     WindowingStrategy<?, ?> windowingStrategy = output.getWindowingStrategy();
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/647034cf/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java
index d9d6a8e..63fb619 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java
@@ -54,8 +54,7 @@ public class TranslationContext {
   }
 
   public void setCurrentTransform(TransformHierarchy.Node treeNode) {
-    this.currentTransform = AppliedPTransform.of(treeNode.getFullName(),
-        treeNode.getInput(), treeNode.getOutput(), (PTransform) treeNode.getTransform());
+    this.currentTransform = treeNode.toAppliedPTransform();
   }
 
   public GearpumpPipelineOptions getPipelineOptions() {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/647034cf/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java
index 7119a87..7e1402f 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java
@@ -21,12 +21,12 @@ package org.apache.beam.runners.gearpump.translators.utils;
 import java.io.Serializable;
 import java.util.List;
 
+import org.apache.beam.runners.core.AggregatorFactory;
 import org.apache.beam.runners.core.DoFnRunner;
 import org.apache.beam.runners.core.DoFnRunners;
 import org.apache.beam.runners.core.SimpleDoFnRunner;
 import org.apache.beam.runners.gearpump.GearpumpPipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.Aggregator.AggregatorFactory;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.util.ExecutionContext;
 import org.apache.beam.sdk.util.SideInputReader;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/647034cf/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpAggregatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpAggregatorFactory.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpAggregatorFactory.java
index bfc73bf..22ffc4d 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpAggregatorFactory.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpAggregatorFactory.java
@@ -20,8 +20,8 @@ package org.apache.beam.runners.gearpump.translators.utils;
 
 import java.io.Serializable;
 
+import org.apache.beam.runners.core.AggregatorFactory;
 import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Aggregator.AggregatorFactory;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.util.ExecutionContext;