You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jb...@apache.org on 2017/04/18 09:12:44 UTC
[1/2] beam git commit: [BEAM-59] Register standard FileSystems
wherever we register IOChannelFactories
Repository: beam
Updated Branches:
refs/heads/master c52ce7c4b -> e5568589c
[BEAM-59] Register standard FileSystems wherever we register IOChannelFactories
Additionally, drop an unnecessary use of `GcsOptions` in
`PipelineRunner`.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b43c92f2
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b43c92f2
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b43c92f2
Branch: refs/heads/master
Commit: b43c92f208304cfc10d79b140682dfbe6580d7c4
Parents: c52ce7c
Author: Dan Halperin <dh...@google.com>
Authored: Mon Apr 17 20:39:48 2017 -0700
Committer: Jean-Baptiste Onofr� <jb...@apache.org>
Committed: Tue Apr 18 10:33:10 2017 +0200
----------------------------------------------------------------------
.../flink/translation/utils/SerializedPipelineOptions.java | 2 ++
.../beam/runners/spark/translation/SparkRuntimeContext.java | 2 ++
.../main/java/org/apache/beam/sdk/runners/PipelineRunner.java | 7 +++----
.../main/java/org/apache/beam/sdk/testing/TestPipeline.java | 2 ++
4 files changed, 9 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/b43c92f2/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java
index 390e6da..2256bb1 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.Serializable;
+import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.util.IOChannelUtils;
@@ -55,6 +56,7 @@ public class SerializedPipelineOptions implements Serializable {
pipelineOptions = new ObjectMapper().readValue(serializedOptions, PipelineOptions.class);
IOChannelUtils.registerIOFactoriesAllowOverride(pipelineOptions);
+ FileSystems.setDefaultConfigInWorkers(pipelineOptions);
} catch (IOException e) {
throw new RuntimeException("Couldn't deserialize the PipelineOptions.", e);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/b43c92f2/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
index 4ccfead..9d0f576 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
@@ -29,6 +29,7 @@ import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Combine;
@@ -130,6 +131,7 @@ public class SparkRuntimeContext implements Serializable {
}
// register IO factories.
IOChannelUtils.registerIOFactoriesAllowOverride(pipelineOptions);
+ FileSystems.setDefaultConfigInWorkers(pipelineOptions);
}
return pipelineOptions;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/b43c92f2/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunner.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunner.java
index 80bb90f..7b2fba3 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunner.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunner.java
@@ -21,9 +21,8 @@ import static com.google.common.base.Preconditions.checkNotNull;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
-import org.apache.beam.sdk.options.GcsOptions;
+import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsValidator;
import org.apache.beam.sdk.util.IOChannelUtils;
import org.apache.beam.sdk.util.InstanceBuilder;
@@ -41,11 +40,11 @@ public abstract class PipelineRunner<ResultT extends PipelineResult> {
* @return The newly created runner.
*/
public static PipelineRunner<? extends PipelineResult> fromOptions(PipelineOptions options) {
- GcsOptions gcsOptions = PipelineOptionsValidator.validate(GcsOptions.class, options);
checkNotNull(options);
// (Re-)register standard IO factories. Clobbers any prior credentials.
- IOChannelUtils.registerIOFactoriesAllowOverride(gcsOptions);
+ IOChannelUtils.registerIOFactoriesAllowOverride(options);
+ FileSystems.setDefaultConfigInWorkers(options);
@SuppressWarnings("unchecked")
PipelineRunner<? extends PipelineResult> result =
http://git-wip-us.apache.org/repos/asf/beam/blob/b43c92f2/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
index a4ab196..3d3de51 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
@@ -40,6 +40,7 @@ import java.util.Map.Entry;
import javax.annotation.Nullable;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.options.ApplicationNameOptions;
import org.apache.beam.sdk.options.GcpOptions;
import org.apache.beam.sdk.options.PipelineOptions;
@@ -404,6 +405,7 @@ public class TestPipeline extends Pipeline implements TestRule {
options.setStableUniqueNames(CheckEnabled.ERROR);
IOChannelUtils.registerIOFactoriesAllowOverride(options);
+ FileSystems.setDefaultConfigInWorkers(options);
return options;
} catch (IOException e) {
throw new RuntimeException(
[2/2] beam git commit: [BEAM-59] This closes #2569
Posted by jb...@apache.org.
[BEAM-59] This closes #2569
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e5568589
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e5568589
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e5568589
Branch: refs/heads/master
Commit: e5568589cab11399126f678ad3fbca4b1fb715e4
Parents: c52ce7c b43c92f
Author: Jean-Baptiste Onofr� <jb...@apache.org>
Authored: Tue Apr 18 11:12:39 2017 +0200
Committer: Jean-Baptiste Onofr� <jb...@apache.org>
Committed: Tue Apr 18 11:12:39 2017 +0200
----------------------------------------------------------------------
.../flink/translation/utils/SerializedPipelineOptions.java | 2 ++
.../beam/runners/spark/translation/SparkRuntimeContext.java | 2 ++
.../main/java/org/apache/beam/sdk/runners/PipelineRunner.java | 7 +++----
.../main/java/org/apache/beam/sdk/testing/TestPipeline.java | 2 ++
4 files changed, 9 insertions(+), 4 deletions(-)
----------------------------------------------------------------------