You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/08/07 23:54:05 UTC
[40/50] [abbrv] beam git commit: Fix
ParDoTest#testPipelineOptionsParameter
Fix ParDoTest#testPipelineOptionsParameter
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/725f547f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/725f547f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/725f547f
Branch: refs/heads/master
Commit: 725f547f5e487dd3e84d5d0f95c0fa3efa853279
Parents: 2206827
Author: manuzhang <ow...@gmail.com>
Authored: Sat Jul 8 00:13:19 2017 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Sat Jul 8 00:13:19 2017 +0800
----------------------------------------------------------------------
.../gearpump/translators/io/GearpumpSource.java | 12 ++----------
.../translators/utils/DoFnRunnerFactory.java | 5 +++--
.../translators/utils/TranslatorUtils.java | 19 +++++++++++++++++++
3 files changed, 24 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/725f547f/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java
index daa8c81..2f53139 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java
@@ -18,9 +18,6 @@
package org.apache.beam.runners.gearpump.translators.io;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-
import java.io.IOException;
import java.time.Instant;
@@ -48,11 +45,7 @@ public abstract class GearpumpSource<T> implements DataSource {
private boolean available = false;
GearpumpSource(PipelineOptions options) {
- try {
- this.serializedOptions = new ObjectMapper().writeValueAsBytes(options);
- } catch (JsonProcessingException e) {
- throw new RuntimeException(e);
- }
+ this.serializedOptions = TranslatorUtils.serializePipelineOptions(options);
}
protected abstract Source.Reader<T> createReader(PipelineOptions options) throws IOException;
@@ -60,8 +53,7 @@ public abstract class GearpumpSource<T> implements DataSource {
@Override
public void open(TaskContext context, Instant startTime) {
try {
- PipelineOptions options = new ObjectMapper()
- .readValue(serializedOptions, PipelineOptions.class);
+ PipelineOptions options = TranslatorUtils.deserializePipelineOptions(serializedOptions);
this.reader = createReader(options);
this.available = reader.start();
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/beam/blob/725f547f/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java
index 35cf2b5..375b696 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java
@@ -43,7 +43,7 @@ public class DoFnRunnerFactory<InputT, OutputT> implements Serializable {
private static final long serialVersionUID = -4109539010014189725L;
private final DoFn<InputT, OutputT> fn;
- private final transient PipelineOptions options;
+ private final byte[] serializedOptions;
private final Collection<PCollectionView<?>> sideInputs;
private final DoFnRunners.OutputManager outputManager;
private final TupleTag<OutputT> mainOutputTag;
@@ -61,7 +61,7 @@ public class DoFnRunnerFactory<InputT, OutputT> implements Serializable {
StepContext stepContext,
WindowingStrategy<?, ?> windowingStrategy) {
this.fn = doFn;
- this.options = pipelineOptions;
+ this.serializedOptions = TranslatorUtils.serializePipelineOptions(pipelineOptions);
this.sideInputs = sideInputs;
this.outputManager = outputManager;
this.mainOutputTag = mainOutputTag;
@@ -72,6 +72,7 @@ public class DoFnRunnerFactory<InputT, OutputT> implements Serializable {
public PushbackSideInputDoFnRunner<InputT, OutputT> createRunner(
ReadyCheckingSideInputReader sideInputReader) {
+ PipelineOptions options = TranslatorUtils.deserializePipelineOptions(serializedOptions);
DoFnRunner<InputT, OutputT> underlying = DoFnRunners.simpleRunner(
options, fn, sideInputReader, outputManager, mainOutputTag,
sideOutputTags, stepContext, windowingStrategy);
http://git-wip-us.apache.org/repos/asf/beam/blob/725f547f/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java
index b1cd61c..c14298f 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java
@@ -18,8 +18,11 @@
package org.apache.beam.runners.gearpump.translators.utils;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
+import java.io.IOException;
import java.time.Instant;
import java.util.Collection;
import java.util.HashMap;
@@ -27,6 +30,7 @@ import java.util.List;
import java.util.Map;
import org.apache.beam.runners.gearpump.translators.TranslationContext;
+import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
@@ -141,6 +145,21 @@ public class TranslatorUtils {
}
}
+ public static byte[] serializePipelineOptions(PipelineOptions options) {
+ try {
+ return new ObjectMapper().writeValueAsBytes(options);
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public static PipelineOptions deserializePipelineOptions(byte[] serializedOptions) {
+ try {
+ return new ObjectMapper().readValue(serializedOptions, PipelineOptions.class);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
/**
* This is copied from org.apache.beam.sdk.transforms.join.RawUnionValue.