You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pe...@apache.org on 2017/11/08 06:30:31 UTC

[1/2] beam git commit: mr-runner: use SerializablePipelineOptions to serde PipelineOptions

Repository: beam
Updated Branches:
  refs/heads/mr-runner b6f22aa76 -> 2cef54ea2


mr-runner: use SerializablePipelineOptions to serialize/deserialize (serde) PipelineOptions


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/71b5e7c4
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/71b5e7c4
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/71b5e7c4

Branch: refs/heads/mr-runner
Commit: 71b5e7c45d1501030717cbfd608bfae36641de79
Parents: b6f22aa
Author: huafengw <fv...@gmail.com>
Authored: Wed Sep 13 10:24:24 2017 +0800
Committer: Pei He <pe...@apache.org>
Committed: Wed Nov 8 14:28:41 2017 +0800

----------------------------------------------------------------------
 runners/map-reduce/pom.xml                      |  2 +-
 .../mapreduce/translation/BeamInputFormat.java  | 22 +++---
 .../mapreduce/translation/JobPrototype.java     |  4 +-
 .../mapreduce/translation/ParDoOperation.java   |  7 +-
 .../translation/SerializedPipelineOptions.java  | 76 --------------------
 5 files changed, 20 insertions(+), 91 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/71b5e7c4/runners/map-reduce/pom.xml
----------------------------------------------------------------------
diff --git a/runners/map-reduce/pom.xml b/runners/map-reduce/pom.xml
index 000f20c..7f2e851 100644
--- a/runners/map-reduce/pom.xml
+++ b/runners/map-reduce/pom.xml
@@ -93,7 +93,7 @@
   </profiles>
 
   <dependencies>
-    <!-- MapRecue dependencies -->
+    <!-- MapReduce dependencies -->
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-mapreduce-client-core</artifactId>

http://git-wip-us.apache.org/repos/asf/beam/blob/71b5e7c4/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java
index 3d0b8ea..8a55c5e 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java
@@ -31,6 +31,8 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.List;
+
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
 import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.BoundedSource;
@@ -57,7 +59,7 @@ public class BeamInputFormat<T> extends InputFormat {
   private static final long DEFAULT_DESIRED_BUNDLE_SIZE_SIZE_BYTES = 5 * 1000 * 1000;
 
   private List<ReadOperation.TaggedSource> sources;
-  private SerializedPipelineOptions options;
+  private SerializablePipelineOptions options;
 
   public BeamInputFormat() {
   }
@@ -73,8 +75,8 @@ public class BeamInputFormat<T> extends InputFormat {
     }
     sources = (List<ReadOperation.TaggedSource>) SerializableUtils.deserializeFromByteArray(
         Base64.decodeBase64(serializedBoundedSource), "TaggedSources");
-    options = ((SerializedPipelineOptions) SerializableUtils.deserializeFromByteArray(
-        Base64.decodeBase64(serializedPipelineOptions), "SerializedPipelineOptions"));
+    options = ((SerializablePipelineOptions) SerializableUtils.deserializeFromByteArray(
+        Base64.decodeBase64(serializedPipelineOptions), "SerializablePipelineOptions"));
 
     try {
 
@@ -86,7 +88,7 @@ public class BeamInputFormat<T> extends InputFormat {
                     final ReadOperation.TaggedSource taggedSource) {
                   try {
                     return FluentIterable.from(taggedSource.getSource().split(
-                        DEFAULT_DESIRED_BUNDLE_SIZE_SIZE_BYTES, options.getPipelineOptions()))
+                        DEFAULT_DESIRED_BUNDLE_SIZE_SIZE_BYTES, options.get()))
                         .transform(new Function<BoundedSource<?>, ReadOperation.TaggedSource>() {
                           @Override
                           public ReadOperation.TaggedSource apply(BoundedSource<?> input) {
@@ -120,7 +122,7 @@ public class BeamInputFormat<T> extends InputFormat {
   private static class BeamInputSplit<T> extends InputSplit implements Writable {
     private String stepName;
     private BoundedSource<T> boundedSource;
-    private SerializedPipelineOptions options;
+    private SerializablePipelineOptions options;
     private TupleTag<?> tupleTag;
 
     public BeamInputSplit() {
@@ -129,7 +131,7 @@ public class BeamInputFormat<T> extends InputFormat {
     public BeamInputSplit(
         String stepName,
         BoundedSource<T> boundedSource,
-        SerializedPipelineOptions options,
+        SerializablePipelineOptions options,
         TupleTag<?> tupleTag) {
       this.stepName = checkNotNull(stepName, "stepName");
       this.boundedSource = checkNotNull(boundedSource, "boundedSources");
@@ -139,13 +141,13 @@ public class BeamInputFormat<T> extends InputFormat {
 
     public BeamRecordReader<T> createReader() throws IOException {
       return new BeamRecordReader<>(
-          stepName, boundedSource.createReader(options.getPipelineOptions()), tupleTag);
+          stepName, boundedSource.createReader(options.get()), tupleTag);
     }
 
     @Override
     public long getLength() throws IOException, InterruptedException {
       try {
-        return boundedSource.getEstimatedSizeBytes(options.getPipelineOptions());
+        return boundedSource.getEstimatedSizeBytes(options.get());
       } catch (Exception e) {
         Throwables.throwIfUnchecked(e);
         Throwables.throwIfInstanceOf(e, IOException.class);
@@ -164,7 +166,7 @@ public class BeamInputFormat<T> extends InputFormat {
       ByteArrayOutputStream stream = new ByteArrayOutputStream();
       StringUtf8Coder.of().encode(stepName, stream);
       SerializableCoder.of(BoundedSource.class).encode(boundedSource, stream);
-      SerializableCoder.of(SerializedPipelineOptions.class).encode(options, stream);
+      SerializableCoder.of(SerializablePipelineOptions.class).encode(options, stream);
       SerializableCoder.of(TupleTag.class).encode(tupleTag, stream);
 
       byte[] bytes = stream.toByteArray();
@@ -181,7 +183,7 @@ public class BeamInputFormat<T> extends InputFormat {
       ByteArrayInputStream inStream = new ByteArrayInputStream(bytes);
       stepName = StringUtf8Coder.of().decode(inStream);
       boundedSource = SerializableCoder.of(BoundedSource.class).decode(inStream);
-      options = SerializableCoder.of(SerializedPipelineOptions.class).decode(inStream);
+      options = SerializableCoder.of(SerializablePipelineOptions.class).decode(inStream);
       tupleTag = SerializableCoder.of(TupleTag.class).decode(inStream);
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/71b5e7c4/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java
index e8e6eab..3e0061a 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java
@@ -30,6 +30,8 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Set;
+
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
 import org.apache.beam.runners.mapreduce.MapReducePipelineOptions;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
@@ -106,7 +108,7 @@ public class JobPrototype {
     conf.set(
         BeamInputFormat.BEAM_SERIALIZED_PIPELINE_OPTIONS,
         Base64.encodeBase64String(SerializableUtils.serializeToByteArray(
-            new SerializedPipelineOptions(options))));
+            new SerializablePipelineOptions(options))));
     job.setInputFormatClass(BeamInputFormat.class);
 
     if (fusedStep.containsGroupByKey()) {

http://git-wip-us.apache.org/repos/asf/beam/blob/71b5e7c4/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoOperation.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoOperation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoOperation.java
index ef83e72..fd4daca 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoOperation.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoOperation.java
@@ -34,6 +34,7 @@ import org.apache.beam.runners.core.NullSideInputReader;
 import org.apache.beam.runners.core.StateInternals;
 import org.apache.beam.runners.core.StepContext;
 import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.metrics.MetricsEnvironment;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -50,7 +51,7 @@ import org.apache.hadoop.mapreduce.TaskInputOutputContext;
  */
 public abstract class ParDoOperation<InputT, OutputT> extends Operation<InputT> {
   private final String stepName;
-  protected final SerializedPipelineOptions options;
+  protected final SerializablePipelineOptions options;
   protected final TupleTag<OutputT> mainOutputTag;
   private final List<TupleTag<?>> sideOutputTags;
   protected final WindowingStrategy<?, ?> windowingStrategy;
@@ -70,7 +71,7 @@ public abstract class ParDoOperation<InputT, OutputT> extends Operation<InputT>
       WindowingStrategy<?, ?> windowingStrategy) {
     super(1 + sideOutputTags.size());
     this.stepName = checkNotNull(stepName, "stepName");
-    this.options = new SerializedPipelineOptions(checkNotNull(options, "options"));
+    this.options = new SerializablePipelineOptions(checkNotNull(options, "options"));
     this.mainOutputTag = checkNotNull(mainOutputTag, "mainOutputTag");
     this.sideOutputTags = checkNotNull(sideOutputTags, "sideOutputTags");
     this.windowingStrategy = checkNotNull(windowingStrategy, "windowingStrategy");
@@ -109,7 +110,7 @@ public abstract class ParDoOperation<InputT, OutputT> extends Operation<InputT>
     final TimerInternals timerInternals = new InMemoryTimerInternals();
 
     fnRunner = DoFnRunners.simpleRunner(
-        options.getPipelineOptions(),
+        options.get(),
         getDoFn(),
         sideInputTags.isEmpty()
             ? NullSideInputReader.empty() :

http://git-wip-us.apache.org/repos/asf/beam/blob/71b5e7c4/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/SerializedPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/SerializedPipelineOptions.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/SerializedPipelineOptions.java
deleted file mode 100644
index 5c37b7c..0000000
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/SerializedPipelineOptions.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.mapreduce.translation;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.fasterxml.jackson.databind.Module;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.Serializable;
-import org.apache.beam.sdk.io.FileSystems;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.util.common.ReflectHelpers;
-
-/**
- * Encapsulates the PipelineOptions in serialized form to ship them to the cluster.
- */
-public class SerializedPipelineOptions implements Serializable {
-
-  private final byte[] serializedOptions;
-
-  /** Lazily initialized copy of deserialized options. */
-  private transient PipelineOptions pipelineOptions;
-
-  public SerializedPipelineOptions(PipelineOptions options) {
-    checkNotNull(options, "PipelineOptions must not be null.");
-
-    try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
-      createMapper().writeValue(baos, options);
-      this.serializedOptions = baos.toByteArray();
-    } catch (Exception e) {
-      throw new RuntimeException("Couldn't serialize PipelineOptions.", e);
-    }
-
-  }
-
-  public PipelineOptions getPipelineOptions() {
-    if (pipelineOptions == null) {
-      try {
-        pipelineOptions = createMapper().readValue(serializedOptions, PipelineOptions.class);
-
-        FileSystems.setDefaultPipelineOptions(pipelineOptions);
-      } catch (IOException e) {
-        throw new RuntimeException("Couldn't deserialize the PipelineOptions.", e);
-      }
-    }
-
-    return pipelineOptions;
-  }
-
-  /**
-   * Use an {@link ObjectMapper} configured with any {@link Module}s in the class path allowing
-   * for user specified configuration injection into the ObjectMapper. This supports user custom
-   * types on {@link PipelineOptions}.
-   */
-  private static ObjectMapper createMapper() {
-    return new ObjectMapper().registerModules(
-        ObjectMapper.findModules(ReflectHelpers.findClassLoader()));
-  }
-}


[2/2] beam git commit: This closes #3845

Posted by pe...@apache.org.
This closes #3845


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2cef54ea
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2cef54ea
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2cef54ea

Branch: refs/heads/mr-runner
Commit: 2cef54ea2562c679d68ff6faed5598fc74b9811a
Parents: b6f22aa 71b5e7c
Author: Pei He <pe...@apache.org>
Authored: Wed Nov 8 14:29:21 2017 +0800
Committer: Pei He <pe...@apache.org>
Committed: Wed Nov 8 14:29:21 2017 +0800

----------------------------------------------------------------------
 runners/map-reduce/pom.xml                      |  2 +-
 .../mapreduce/translation/BeamInputFormat.java  | 22 +++---
 .../mapreduce/translation/JobPrototype.java     |  4 +-
 .../mapreduce/translation/ParDoOperation.java   |  7 +-
 .../translation/SerializedPipelineOptions.java  | 76 --------------------
 5 files changed, 20 insertions(+), 91 deletions(-)
----------------------------------------------------------------------