You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@beam.apache.org by pe...@apache.org on 2017/11/08 06:30:31 UTC
[1/2] beam git commit: mr-runner: use SerializablePipelineOptions to serde PipelineOptions
Repository: beam
Updated Branches:
refs/heads/mr-runner b6f22aa76 -> 2cef54ea2
mr-runner: use SerializablePipelineOptions to serde PipelineOptions
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/71b5e7c4
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/71b5e7c4
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/71b5e7c4
Branch: refs/heads/mr-runner
Commit: 71b5e7c45d1501030717cbfd608bfae36641de79
Parents: b6f22aa
Author: huafengw <fv...@gmail.com>
Authored: Wed Sep 13 10:24:24 2017 +0800
Committer: Pei He <pe...@apache.org>
Committed: Wed Nov 8 14:28:41 2017 +0800
----------------------------------------------------------------------
runners/map-reduce/pom.xml | 2 +-
.../mapreduce/translation/BeamInputFormat.java | 22 +++---
.../mapreduce/translation/JobPrototype.java | 4 +-
.../mapreduce/translation/ParDoOperation.java | 7 +-
.../translation/SerializedPipelineOptions.java | 76 --------------------
5 files changed, 20 insertions(+), 91 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/71b5e7c4/runners/map-reduce/pom.xml
----------------------------------------------------------------------
diff --git a/runners/map-reduce/pom.xml b/runners/map-reduce/pom.xml
index 000f20c..7f2e851 100644
--- a/runners/map-reduce/pom.xml
+++ b/runners/map-reduce/pom.xml
@@ -93,7 +93,7 @@
</profiles>
<dependencies>
- <!-- MapRecue dependencies -->
+ <!-- MapReduce dependencies -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
http://git-wip-us.apache.org/repos/asf/beam/blob/71b5e7c4/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java
index 3d0b8ea..8a55c5e 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java
@@ -31,6 +31,8 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.List;
+
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.BoundedSource;
@@ -57,7 +59,7 @@ public class BeamInputFormat<T> extends InputFormat {
private static final long DEFAULT_DESIRED_BUNDLE_SIZE_SIZE_BYTES = 5 * 1000 * 1000;
private List<ReadOperation.TaggedSource> sources;
- private SerializedPipelineOptions options;
+ private SerializablePipelineOptions options;
public BeamInputFormat() {
}
@@ -73,8 +75,8 @@ public class BeamInputFormat<T> extends InputFormat {
}
sources = (List<ReadOperation.TaggedSource>) SerializableUtils.deserializeFromByteArray(
Base64.decodeBase64(serializedBoundedSource), "TaggedSources");
- options = ((SerializedPipelineOptions) SerializableUtils.deserializeFromByteArray(
- Base64.decodeBase64(serializedPipelineOptions), "SerializedPipelineOptions"));
+ options = ((SerializablePipelineOptions) SerializableUtils.deserializeFromByteArray(
+ Base64.decodeBase64(serializedPipelineOptions), "SerializablePipelineOptions"));
try {
@@ -86,7 +88,7 @@ public class BeamInputFormat<T> extends InputFormat {
final ReadOperation.TaggedSource taggedSource) {
try {
return FluentIterable.from(taggedSource.getSource().split(
- DEFAULT_DESIRED_BUNDLE_SIZE_SIZE_BYTES, options.getPipelineOptions()))
+ DEFAULT_DESIRED_BUNDLE_SIZE_SIZE_BYTES, options.get()))
.transform(new Function<BoundedSource<?>, ReadOperation.TaggedSource>() {
@Override
public ReadOperation.TaggedSource apply(BoundedSource<?> input) {
@@ -120,7 +122,7 @@ public class BeamInputFormat<T> extends InputFormat {
private static class BeamInputSplit<T> extends InputSplit implements Writable {
private String stepName;
private BoundedSource<T> boundedSource;
- private SerializedPipelineOptions options;
+ private SerializablePipelineOptions options;
private TupleTag<?> tupleTag;
public BeamInputSplit() {
@@ -129,7 +131,7 @@ public class BeamInputFormat<T> extends InputFormat {
public BeamInputSplit(
String stepName,
BoundedSource<T> boundedSource,
- SerializedPipelineOptions options,
+ SerializablePipelineOptions options,
TupleTag<?> tupleTag) {
this.stepName = checkNotNull(stepName, "stepName");
this.boundedSource = checkNotNull(boundedSource, "boundedSources");
@@ -139,13 +141,13 @@ public class BeamInputFormat<T> extends InputFormat {
public BeamRecordReader<T> createReader() throws IOException {
return new BeamRecordReader<>(
- stepName, boundedSource.createReader(options.getPipelineOptions()), tupleTag);
+ stepName, boundedSource.createReader(options.get()), tupleTag);
}
@Override
public long getLength() throws IOException, InterruptedException {
try {
- return boundedSource.getEstimatedSizeBytes(options.getPipelineOptions());
+ return boundedSource.getEstimatedSizeBytes(options.get());
} catch (Exception e) {
Throwables.throwIfUnchecked(e);
Throwables.throwIfInstanceOf(e, IOException.class);
@@ -164,7 +166,7 @@ public class BeamInputFormat<T> extends InputFormat {
ByteArrayOutputStream stream = new ByteArrayOutputStream();
StringUtf8Coder.of().encode(stepName, stream);
SerializableCoder.of(BoundedSource.class).encode(boundedSource, stream);
- SerializableCoder.of(SerializedPipelineOptions.class).encode(options, stream);
+ SerializableCoder.of(SerializablePipelineOptions.class).encode(options, stream);
SerializableCoder.of(TupleTag.class).encode(tupleTag, stream);
byte[] bytes = stream.toByteArray();
@@ -181,7 +183,7 @@ public class BeamInputFormat<T> extends InputFormat {
ByteArrayInputStream inStream = new ByteArrayInputStream(bytes);
stepName = StringUtf8Coder.of().decode(inStream);
boundedSource = SerializableCoder.of(BoundedSource.class).decode(inStream);
- options = SerializableCoder.of(SerializedPipelineOptions.class).decode(inStream);
+ options = SerializableCoder.of(SerializablePipelineOptions.class).decode(inStream);
tupleTag = SerializableCoder.of(TupleTag.class).decode(inStream);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/71b5e7c4/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java
index e8e6eab..3e0061a 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java
@@ -30,6 +30,8 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
+
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.mapreduce.MapReducePipelineOptions;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
@@ -106,7 +108,7 @@ public class JobPrototype {
conf.set(
BeamInputFormat.BEAM_SERIALIZED_PIPELINE_OPTIONS,
Base64.encodeBase64String(SerializableUtils.serializeToByteArray(
- new SerializedPipelineOptions(options))));
+ new SerializablePipelineOptions(options))));
job.setInputFormatClass(BeamInputFormat.class);
if (fusedStep.containsGroupByKey()) {
http://git-wip-us.apache.org/repos/asf/beam/blob/71b5e7c4/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoOperation.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoOperation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoOperation.java
index ef83e72..fd4daca 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoOperation.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoOperation.java
@@ -34,6 +34,7 @@ import org.apache.beam.runners.core.NullSideInputReader;
import org.apache.beam.runners.core.StateInternals;
import org.apache.beam.runners.core.StepContext;
import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.options.PipelineOptions;
@@ -50,7 +51,7 @@ import org.apache.hadoop.mapreduce.TaskInputOutputContext;
*/
public abstract class ParDoOperation<InputT, OutputT> extends Operation<InputT> {
private final String stepName;
- protected final SerializedPipelineOptions options;
+ protected final SerializablePipelineOptions options;
protected final TupleTag<OutputT> mainOutputTag;
private final List<TupleTag<?>> sideOutputTags;
protected final WindowingStrategy<?, ?> windowingStrategy;
@@ -70,7 +71,7 @@ public abstract class ParDoOperation<InputT, OutputT> extends Operation<InputT>
WindowingStrategy<?, ?> windowingStrategy) {
super(1 + sideOutputTags.size());
this.stepName = checkNotNull(stepName, "stepName");
- this.options = new SerializedPipelineOptions(checkNotNull(options, "options"));
+ this.options = new SerializablePipelineOptions(checkNotNull(options, "options"));
this.mainOutputTag = checkNotNull(mainOutputTag, "mainOutputTag");
this.sideOutputTags = checkNotNull(sideOutputTags, "sideOutputTags");
this.windowingStrategy = checkNotNull(windowingStrategy, "windowingStrategy");
@@ -109,7 +110,7 @@ public abstract class ParDoOperation<InputT, OutputT> extends Operation<InputT>
final TimerInternals timerInternals = new InMemoryTimerInternals();
fnRunner = DoFnRunners.simpleRunner(
- options.getPipelineOptions(),
+ options.get(),
getDoFn(),
sideInputTags.isEmpty()
? NullSideInputReader.empty() :
http://git-wip-us.apache.org/repos/asf/beam/blob/71b5e7c4/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/SerializedPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/SerializedPipelineOptions.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/SerializedPipelineOptions.java
deleted file mode 100644
index 5c37b7c..0000000
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/SerializedPipelineOptions.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.mapreduce.translation;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.fasterxml.jackson.databind.Module;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.Serializable;
-import org.apache.beam.sdk.io.FileSystems;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.util.common.ReflectHelpers;
-
-/**
- * Encapsulates the PipelineOptions in serialized form to ship them to the cluster.
- */
-public class SerializedPipelineOptions implements Serializable {
-
- private final byte[] serializedOptions;
-
- /** Lazily initialized copy of deserialized options. */
- private transient PipelineOptions pipelineOptions;
-
- public SerializedPipelineOptions(PipelineOptions options) {
- checkNotNull(options, "PipelineOptions must not be null.");
-
- try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
- createMapper().writeValue(baos, options);
- this.serializedOptions = baos.toByteArray();
- } catch (Exception e) {
- throw new RuntimeException("Couldn't serialize PipelineOptions.", e);
- }
-
- }
-
- public PipelineOptions getPipelineOptions() {
- if (pipelineOptions == null) {
- try {
- pipelineOptions = createMapper().readValue(serializedOptions, PipelineOptions.class);
-
- FileSystems.setDefaultPipelineOptions(pipelineOptions);
- } catch (IOException e) {
- throw new RuntimeException("Couldn't deserialize the PipelineOptions.", e);
- }
- }
-
- return pipelineOptions;
- }
-
- /**
- * Use an {@link ObjectMapper} configured with any {@link Module}s in the class path allowing
- * for user specified configuration injection into the ObjectMapper. This supports user custom
- * types on {@link PipelineOptions}.
- */
- private static ObjectMapper createMapper() {
- return new ObjectMapper().registerModules(
- ObjectMapper.findModules(ReflectHelpers.findClassLoader()));
- }
-}
[2/2] beam git commit: This closes #3845
Posted by pe...@apache.org.
This closes #3845
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2cef54ea
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2cef54ea
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2cef54ea
Branch: refs/heads/mr-runner
Commit: 2cef54ea2562c679d68ff6faed5598fc74b9811a
Parents: b6f22aa 71b5e7c
Author: Pei He <pe...@apache.org>
Authored: Wed Nov 8 14:29:21 2017 +0800
Committer: Pei He <pe...@apache.org>
Committed: Wed Nov 8 14:29:21 2017 +0800
----------------------------------------------------------------------
runners/map-reduce/pom.xml | 2 +-
.../mapreduce/translation/BeamInputFormat.java | 22 +++---
.../mapreduce/translation/JobPrototype.java | 4 +-
.../mapreduce/translation/ParDoOperation.java | 7 +-
.../translation/SerializedPipelineOptions.java | 76 --------------------
5 files changed, 20 insertions(+), 91 deletions(-)
----------------------------------------------------------------------