You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jb...@apache.org on 2017/04/18 09:12:44 UTC

[1/2] beam git commit: [BEAM-59] Register standard FileSystems wherever we register IOChannelFactories

Repository: beam
Updated Branches:
  refs/heads/master c52ce7c4b -> e5568589c


[BEAM-59] Register standard FileSystems wherever we register IOChannelFactories

Additionally, drop an unnecessary use of `GcsOptions` in
`PipelineRunner`.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b43c92f2
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b43c92f2
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b43c92f2

Branch: refs/heads/master
Commit: b43c92f208304cfc10d79b140682dfbe6580d7c4
Parents: c52ce7c
Author: Dan Halperin <dh...@google.com>
Authored: Mon Apr 17 20:39:48 2017 -0700
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Tue Apr 18 10:33:10 2017 +0200

----------------------------------------------------------------------
 .../flink/translation/utils/SerializedPipelineOptions.java    | 2 ++
 .../beam/runners/spark/translation/SparkRuntimeContext.java   | 2 ++
 .../main/java/org/apache/beam/sdk/runners/PipelineRunner.java | 7 +++----
 .../main/java/org/apache/beam/sdk/testing/TestPipeline.java   | 2 ++
 4 files changed, 9 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/b43c92f2/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java
index 390e6da..2256bb1 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.Serializable;
+import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.util.IOChannelUtils;
 
@@ -55,6 +56,7 @@ public class SerializedPipelineOptions implements Serializable {
         pipelineOptions = new ObjectMapper().readValue(serializedOptions, PipelineOptions.class);
 
         IOChannelUtils.registerIOFactoriesAllowOverride(pipelineOptions);
+        FileSystems.setDefaultConfigInWorkers(pipelineOptions);
       } catch (IOException e) {
         throw new RuntimeException("Couldn't deserialize the PipelineOptions.", e);
       }

http://git-wip-us.apache.org/repos/asf/beam/blob/b43c92f2/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
index 4ccfead..9d0f576 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
@@ -29,6 +29,7 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Combine;
@@ -130,6 +131,7 @@ public class SparkRuntimeContext implements Serializable {
         }
         // register IO factories.
         IOChannelUtils.registerIOFactoriesAllowOverride(pipelineOptions);
+        FileSystems.setDefaultConfigInWorkers(pipelineOptions);
       }
       return pipelineOptions;
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/b43c92f2/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunner.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunner.java
index 80bb90f..7b2fba3 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunner.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunner.java
@@ -21,9 +21,8 @@ import static com.google.common.base.Preconditions.checkNotNull;
 
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
-import org.apache.beam.sdk.options.GcsOptions;
+import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsValidator;
 import org.apache.beam.sdk.util.IOChannelUtils;
 import org.apache.beam.sdk.util.InstanceBuilder;
 
@@ -41,11 +40,11 @@ public abstract class PipelineRunner<ResultT extends PipelineResult> {
    * @return The newly created runner.
    */
   public static PipelineRunner<? extends PipelineResult> fromOptions(PipelineOptions options) {
-    GcsOptions gcsOptions = PipelineOptionsValidator.validate(GcsOptions.class, options);
     checkNotNull(options);
 
     // (Re-)register standard IO factories. Clobbers any prior credentials.
-    IOChannelUtils.registerIOFactoriesAllowOverride(gcsOptions);
+    IOChannelUtils.registerIOFactoriesAllowOverride(options);
+    FileSystems.setDefaultConfigInWorkers(options);
 
     @SuppressWarnings("unchecked")
     PipelineRunner<? extends PipelineResult> result =

http://git-wip-us.apache.org/repos/asf/beam/blob/b43c92f2/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
index a4ab196..3d3de51 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
@@ -40,6 +40,7 @@ import java.util.Map.Entry;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.options.ApplicationNameOptions;
 import org.apache.beam.sdk.options.GcpOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -404,6 +405,7 @@ public class TestPipeline extends Pipeline implements TestRule {
       options.setStableUniqueNames(CheckEnabled.ERROR);
 
       IOChannelUtils.registerIOFactoriesAllowOverride(options);
+      FileSystems.setDefaultConfigInWorkers(options);
       return options;
     } catch (IOException e) {
       throw new RuntimeException(


[2/2] beam git commit: [BEAM-59] This closes #2569

Posted by jb...@apache.org.
[BEAM-59] This closes #2569


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e5568589
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e5568589
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e5568589

Branch: refs/heads/master
Commit: e5568589cab11399126f678ad3fbca4b1fb715e4
Parents: c52ce7c b43c92f
Author: Jean-Baptiste Onofré <jb...@apache.org>
Authored: Tue Apr 18 11:12:39 2017 +0200
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Tue Apr 18 11:12:39 2017 +0200

----------------------------------------------------------------------
 .../flink/translation/utils/SerializedPipelineOptions.java    | 2 ++
 .../beam/runners/spark/translation/SparkRuntimeContext.java   | 2 ++
 .../main/java/org/apache/beam/sdk/runners/PipelineRunner.java | 7 +++----
 .../main/java/org/apache/beam/sdk/testing/TestPipeline.java   | 2 ++
 4 files changed, 9 insertions(+), 4 deletions(-)
----------------------------------------------------------------------