You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ma...@apache.org on 2017/08/23 14:02:11 UTC
[1/2] beam git commit: [BEAM-2788] Use SerializablePipelineOptions to serde PipelineOptions in Gearpump runner
Repository: beam
Updated Branches:
refs/heads/master 9b175ccc2 -> f0ce31b9d
[BEAM-2788] Use SerializablePipelineOptions to serde PipelineOptions in Gearpump runner
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/081df6d3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/081df6d3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/081df6d3
Branch: refs/heads/master
Commit: 081df6d331f60926c4bcf3d49a3bef8ab3ad4f2c
Parents: 9b175cc
Author: huafengw <fv...@gmail.com>
Authored: Tue Aug 22 15:15:29 2017 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Wed Aug 23 21:58:28 2017 +0800
----------------------------------------------------------------------
runners/gearpump/pom.xml | 13 +++++--------
.../translators/GroupByKeyTranslator.java | 2 +-
.../gearpump/translators/io/GearpumpSource.java | 7 ++++---
.../translators/utils/DoFnRunnerFactory.java | 7 ++++---
.../translators/utils/TranslatorUtils.java | 20 --------------------
.../FlattenPCollectionsTranslatorTest.java | 6 ++++++
6 files changed, 20 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/081df6d3/runners/gearpump/pom.xml
----------------------------------------------------------------------
diff --git a/runners/gearpump/pom.xml b/runners/gearpump/pom.xml
index 30bc354..2b460e7 100644
--- a/runners/gearpump/pom.xml
+++ b/runners/gearpump/pom.xml
@@ -154,14 +154,6 @@
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-core</artifactId>
- </dependency>
- <dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-databind</artifactId>
- </dependency>
- <dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
</dependency>
<dependency>
@@ -183,6 +175,11 @@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-core</artifactId>
<classifier>tests</classifier>
http://git-wip-us.apache.org/repos/asf/beam/blob/081df6d3/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
index 8409beb..bea5a74 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
@@ -94,7 +94,7 @@ public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKe
}
@Override
- public <T> Window[] apply(Context<T> context) {
+ public <T> Window[] apply(Context<T> context) {
try {
Object element = context.element();
if (element instanceof TranslatorUtils.RawUnionValue) {
http://git-wip-us.apache.org/repos/asf/beam/blob/081df6d3/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java
index 2f53139..3766195 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java
@@ -21,6 +21,7 @@ package org.apache.beam.runners.gearpump.translators.io;
import java.io.IOException;
import java.time.Instant;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.gearpump.translators.utils.TranslatorUtils;
import org.apache.beam.sdk.io.Source;
import org.apache.beam.sdk.io.UnboundedSource;
@@ -39,13 +40,13 @@ import org.apache.gearpump.streaming.task.TaskContext;
*/
public abstract class GearpumpSource<T> implements DataSource {
- private final byte[] serializedOptions;
+ private final SerializablePipelineOptions serializedOptions;
private Source.Reader<T> reader;
private boolean available = false;
GearpumpSource(PipelineOptions options) {
- this.serializedOptions = TranslatorUtils.serializePipelineOptions(options);
+ this.serializedOptions = new SerializablePipelineOptions(options);
}
protected abstract Source.Reader<T> createReader(PipelineOptions options) throws IOException;
@@ -53,7 +54,7 @@ public abstract class GearpumpSource<T> implements DataSource {
@Override
public void open(TaskContext context, Instant startTime) {
try {
- PipelineOptions options = TranslatorUtils.deserializePipelineOptions(serializedOptions);
+ PipelineOptions options = serializedOptions.get();
this.reader = createReader(options);
this.available = reader.start();
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/beam/blob/081df6d3/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java
index 375b696..6557c8b 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java
@@ -29,6 +29,7 @@ import org.apache.beam.runners.core.ReadyCheckingSideInputReader;
import org.apache.beam.runners.core.SimpleDoFnRunner;
import org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner;
import org.apache.beam.runners.core.StepContext;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.gearpump.GearpumpPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
@@ -43,7 +44,7 @@ public class DoFnRunnerFactory<InputT, OutputT> implements Serializable {
private static final long serialVersionUID = -4109539010014189725L;
private final DoFn<InputT, OutputT> fn;
- private final byte[] serializedOptions;
+ private final SerializablePipelineOptions serializedOptions;
private final Collection<PCollectionView<?>> sideInputs;
private final DoFnRunners.OutputManager outputManager;
private final TupleTag<OutputT> mainOutputTag;
@@ -61,7 +62,7 @@ public class DoFnRunnerFactory<InputT, OutputT> implements Serializable {
StepContext stepContext,
WindowingStrategy<?, ?> windowingStrategy) {
this.fn = doFn;
- this.serializedOptions = TranslatorUtils.serializePipelineOptions(pipelineOptions);
+ this.serializedOptions = new SerializablePipelineOptions(pipelineOptions);
this.sideInputs = sideInputs;
this.outputManager = outputManager;
this.mainOutputTag = mainOutputTag;
@@ -72,7 +73,7 @@ public class DoFnRunnerFactory<InputT, OutputT> implements Serializable {
public PushbackSideInputDoFnRunner<InputT, OutputT> createRunner(
ReadyCheckingSideInputReader sideInputReader) {
- PipelineOptions options = TranslatorUtils.deserializePipelineOptions(serializedOptions);
+ PipelineOptions options = serializedOptions.get();
DoFnRunner<InputT, OutputT> underlying = DoFnRunners.simpleRunner(
options, fn, sideInputReader, outputManager, mainOutputTag,
sideOutputTags, stepContext, windowingStrategy);
http://git-wip-us.apache.org/repos/asf/beam/blob/081df6d3/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java
index c14298f..2dae955 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java
@@ -18,11 +18,8 @@
package org.apache.beam.runners.gearpump.translators.utils;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
-import java.io.IOException;
import java.time.Instant;
import java.util.Collection;
import java.util.HashMap;
@@ -30,7 +27,6 @@ import java.util.List;
import java.util.Map;
import org.apache.beam.runners.gearpump.translators.TranslationContext;
-import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
@@ -145,22 +141,6 @@ public class TranslatorUtils {
}
}
- public static byte[] serializePipelineOptions(PipelineOptions options) {
- try {
- return new ObjectMapper().writeValueAsBytes(options);
- } catch (JsonProcessingException e) {
- throw new RuntimeException(e);
- }
- }
-
- public static PipelineOptions deserializePipelineOptions(byte[] serializedOptions) {
- try {
- return new ObjectMapper().readValue(serializedOptions, PipelineOptions.class);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
/**
* This is copied from org.apache.beam.sdk.transforms.join.RawUnionValue.
*/
http://git-wip-us.apache.org/repos/asf/beam/blob/081df6d3/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslatorTest.java b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslatorTest.java
index 1262177..1115fad 100644
--- a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslatorTest.java
+++ b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslatorTest.java
@@ -30,7 +30,9 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import org.apache.beam.runners.gearpump.GearpumpPipelineOptions;
import org.apache.beam.runners.gearpump.translators.io.UnboundedSourceWrapper;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PValue;
@@ -62,6 +64,8 @@ public class FlattenPCollectionsTranslatorTest {
when(translationContext.getInputs()).thenReturn(Collections.EMPTY_MAP);
when(translationContext.getOutput()).thenReturn(mockOutput);
+ when(translationContext.getPipelineOptions())
+ .thenReturn(PipelineOptionsFactory.as(GearpumpPipelineOptions.class));
translator.translate(transform, translationContext);
verify(translationContext).getSourceStream(argThat(new UnboundedSourceWrapperMatcher()));
@@ -141,6 +145,8 @@ public class FlattenPCollectionsTranslatorTest {
when(translationContext.getInputs()).thenReturn(inputs);
when(translationContext.getInputStream(mockCollection1)).thenReturn(javaStream1);
+ when(translationContext.getPipelineOptions())
+ .thenReturn(PipelineOptionsFactory.as(GearpumpPipelineOptions.class));
translator.translate(transform, translationContext);
verify(javaStream1).map(any(MapFunction.class), eq("dummy"));
[2/2] beam git commit: This closes #3743
Posted by ma...@apache.org.
This closes #3743
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f0ce31b9
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f0ce31b9
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f0ce31b9
Branch: refs/heads/master
Commit: f0ce31b9d3dc53fbacaa36a75f9c7f4ce93d5c0e
Parents: 9b175cc 081df6d
Author: manuzhang <ow...@gmail.com>
Authored: Wed Aug 23 22:01:49 2017 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Wed Aug 23 22:01:49 2017 +0800
----------------------------------------------------------------------
runners/gearpump/pom.xml | 13 +++++--------
.../translators/GroupByKeyTranslator.java | 2 +-
.../gearpump/translators/io/GearpumpSource.java | 7 ++++---
.../translators/utils/DoFnRunnerFactory.java | 7 ++++---
.../translators/utils/TranslatorUtils.java | 20 --------------------
.../FlattenPCollectionsTranslatorTest.java | 6 ++++++
6 files changed, 20 insertions(+), 35 deletions(-)
----------------------------------------------------------------------