You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ma...@apache.org on 2017/08/23 14:02:11 UTC

[1/2] beam git commit: [BEAM-2788] Use SerializablePipelineOptions to serde PipelineOptions in Gearpump runner

Repository: beam
Updated Branches:
  refs/heads/master 9b175ccc2 -> f0ce31b9d


[BEAM-2788] Use SerializablePipelineOptions to serde PipelineOptions in Gearpump runner


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/081df6d3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/081df6d3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/081df6d3

Branch: refs/heads/master
Commit: 081df6d331f60926c4bcf3d49a3bef8ab3ad4f2c
Parents: 9b175cc
Author: huafengw <fv...@gmail.com>
Authored: Tue Aug 22 15:15:29 2017 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Wed Aug 23 21:58:28 2017 +0800

----------------------------------------------------------------------
 runners/gearpump/pom.xml                        | 13 +++++--------
 .../translators/GroupByKeyTranslator.java       |  2 +-
 .../gearpump/translators/io/GearpumpSource.java |  7 ++++---
 .../translators/utils/DoFnRunnerFactory.java    |  7 ++++---
 .../translators/utils/TranslatorUtils.java      | 20 --------------------
 .../FlattenPCollectionsTranslatorTest.java      |  6 ++++++
 6 files changed, 20 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/081df6d3/runners/gearpump/pom.xml
----------------------------------------------------------------------
diff --git a/runners/gearpump/pom.xml b/runners/gearpump/pom.xml
index 30bc354..2b460e7 100644
--- a/runners/gearpump/pom.xml
+++ b/runners/gearpump/pom.xml
@@ -154,14 +154,6 @@
     </dependency>
     <dependency>
       <groupId>com.fasterxml.jackson.core</groupId>
-      <artifactId>jackson-core</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>com.fasterxml.jackson.core</groupId>
-      <artifactId>jackson-databind</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-annotations</artifactId>
     </dependency>
     <dependency>
@@ -183,6 +175,11 @@
       <scope>test</scope>
     </dependency>
     <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-databind</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>org.apache.beam</groupId>
       <artifactId>beam-sdks-java-core</artifactId>
       <classifier>tests</classifier>

http://git-wip-us.apache.org/repos/asf/beam/blob/081df6d3/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
index 8409beb..bea5a74 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
@@ -94,7 +94,7 @@ public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKe
     }
 
     @Override
-    public <T> Window[]  apply(Context<T> context) {
+    public <T> Window[] apply(Context<T> context) {
       try {
         Object element = context.element();
         if (element instanceof TranslatorUtils.RawUnionValue) {

http://git-wip-us.apache.org/repos/asf/beam/blob/081df6d3/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java
index 2f53139..3766195 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java
@@ -21,6 +21,7 @@ package org.apache.beam.runners.gearpump.translators.io;
 import java.io.IOException;
 import java.time.Instant;
 
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
 import org.apache.beam.runners.gearpump.translators.utils.TranslatorUtils;
 import org.apache.beam.sdk.io.Source;
 import org.apache.beam.sdk.io.UnboundedSource;
@@ -39,13 +40,13 @@ import org.apache.gearpump.streaming.task.TaskContext;
  */
 public abstract class GearpumpSource<T> implements DataSource {
 
-  private final byte[] serializedOptions;
+  private final SerializablePipelineOptions serializedOptions;
 
   private Source.Reader<T> reader;
   private boolean available = false;
 
   GearpumpSource(PipelineOptions options) {
-    this.serializedOptions = TranslatorUtils.serializePipelineOptions(options);
+    this.serializedOptions = new SerializablePipelineOptions(options);
   }
 
   protected abstract Source.Reader<T> createReader(PipelineOptions options) throws IOException;
@@ -53,7 +54,7 @@ public abstract class GearpumpSource<T> implements DataSource {
   @Override
   public void open(TaskContext context, Instant startTime) {
     try {
-      PipelineOptions options = TranslatorUtils.deserializePipelineOptions(serializedOptions);
+      PipelineOptions options = serializedOptions.get();
       this.reader = createReader(options);
       this.available = reader.start();
     } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/beam/blob/081df6d3/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java
index 375b696..6557c8b 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java
@@ -29,6 +29,7 @@ import org.apache.beam.runners.core.ReadyCheckingSideInputReader;
 import org.apache.beam.runners.core.SimpleDoFnRunner;
 import org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner;
 import org.apache.beam.runners.core.StepContext;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
 import org.apache.beam.runners.gearpump.GearpumpPipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -43,7 +44,7 @@ public class DoFnRunnerFactory<InputT, OutputT> implements Serializable {
 
   private static final long serialVersionUID = -4109539010014189725L;
   private final DoFn<InputT, OutputT> fn;
-  private final byte[] serializedOptions;
+  private final SerializablePipelineOptions serializedOptions;
   private final Collection<PCollectionView<?>> sideInputs;
   private final DoFnRunners.OutputManager outputManager;
   private final TupleTag<OutputT> mainOutputTag;
@@ -61,7 +62,7 @@ public class DoFnRunnerFactory<InputT, OutputT> implements Serializable {
       StepContext stepContext,
       WindowingStrategy<?, ?> windowingStrategy) {
     this.fn = doFn;
-    this.serializedOptions = TranslatorUtils.serializePipelineOptions(pipelineOptions);
+    this.serializedOptions = new SerializablePipelineOptions(pipelineOptions);
     this.sideInputs = sideInputs;
     this.outputManager = outputManager;
     this.mainOutputTag = mainOutputTag;
@@ -72,7 +73,7 @@ public class DoFnRunnerFactory<InputT, OutputT> implements Serializable {
 
   public PushbackSideInputDoFnRunner<InputT, OutputT> createRunner(
       ReadyCheckingSideInputReader sideInputReader) {
-    PipelineOptions options = TranslatorUtils.deserializePipelineOptions(serializedOptions);
+    PipelineOptions options = serializedOptions.get();
     DoFnRunner<InputT, OutputT> underlying = DoFnRunners.simpleRunner(
         options, fn, sideInputReader, outputManager, mainOutputTag,
         sideOutputTags, stepContext, windowingStrategy);

http://git-wip-us.apache.org/repos/asf/beam/blob/081df6d3/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java
index c14298f..2dae955 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java
@@ -18,11 +18,8 @@
 
 package org.apache.beam.runners.gearpump.translators.utils;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Lists;
 
-import java.io.IOException;
 import java.time.Instant;
 import java.util.Collection;
 import java.util.HashMap;
@@ -30,7 +27,6 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.beam.runners.gearpump.translators.TranslationContext;
-import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
@@ -145,22 +141,6 @@ public class TranslatorUtils {
     }
   }
 
-  public static byte[] serializePipelineOptions(PipelineOptions options) {
-    try {
-      return new ObjectMapper().writeValueAsBytes(options);
-    } catch (JsonProcessingException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  public static PipelineOptions deserializePipelineOptions(byte[] serializedOptions) {
-    try {
-      return new ObjectMapper().readValue(serializedOptions, PipelineOptions.class);
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
   /**
    * This is copied from org.apache.beam.sdk.transforms.join.RawUnionValue.
    */

http://git-wip-us.apache.org/repos/asf/beam/blob/081df6d3/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslatorTest.java b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslatorTest.java
index 1262177..1115fad 100644
--- a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslatorTest.java
+++ b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslatorTest.java
@@ -30,7 +30,9 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.beam.runners.gearpump.GearpumpPipelineOptions;
 import org.apache.beam.runners.gearpump.translators.io.UnboundedSourceWrapper;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PValue;
@@ -62,6 +64,8 @@ public class FlattenPCollectionsTranslatorTest {
 
     when(translationContext.getInputs()).thenReturn(Collections.EMPTY_MAP);
     when(translationContext.getOutput()).thenReturn(mockOutput);
+    when(translationContext.getPipelineOptions())
+        .thenReturn(PipelineOptionsFactory.as(GearpumpPipelineOptions.class));
 
     translator.translate(transform, translationContext);
     verify(translationContext).getSourceStream(argThat(new UnboundedSourceWrapperMatcher()));
@@ -141,6 +145,8 @@ public class FlattenPCollectionsTranslatorTest {
 
     when(translationContext.getInputs()).thenReturn(inputs);
     when(translationContext.getInputStream(mockCollection1)).thenReturn(javaStream1);
+    when(translationContext.getPipelineOptions())
+        .thenReturn(PipelineOptionsFactory.as(GearpumpPipelineOptions.class));
 
     translator.translate(transform, translationContext);
     verify(javaStream1).map(any(MapFunction.class), eq("dummy"));


[2/2] beam git commit: This closes #3743

Posted by ma...@apache.org.
This closes #3743


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f0ce31b9
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f0ce31b9
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f0ce31b9

Branch: refs/heads/master
Commit: f0ce31b9d3dc53fbacaa36a75f9c7f4ce93d5c0e
Parents: 9b175cc 081df6d
Author: manuzhang <ow...@gmail.com>
Authored: Wed Aug 23 22:01:49 2017 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Wed Aug 23 22:01:49 2017 +0800

----------------------------------------------------------------------
 runners/gearpump/pom.xml                        | 13 +++++--------
 .../translators/GroupByKeyTranslator.java       |  2 +-
 .../gearpump/translators/io/GearpumpSource.java |  7 ++++---
 .../translators/utils/DoFnRunnerFactory.java    |  7 ++++---
 .../translators/utils/TranslatorUtils.java      | 20 --------------------
 .../FlattenPCollectionsTranslatorTest.java      |  6 ++++++
 6 files changed, 20 insertions(+), 35 deletions(-)
----------------------------------------------------------------------