You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2017/05/04 16:32:55 UTC
[5/9] beam git commit: Stop registering IOChannelFactories in SDK harness, runners, tests
Stop registering IOChannelFactories in SDK harness, runners, tests
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5519fe51
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5519fe51
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5519fe51
Branch: refs/heads/master
Commit: 5519fe51aea3e95fa0ea4a36c7b036917b7f94ec
Parents: dc9e004
Author: Dan Halperin <dh...@google.com>
Authored: Wed May 3 18:02:42 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Thu May 4 09:32:45 2017 -0700
----------------------------------------------------------------------
.../apex/translation/utils/SerializablePipelineOptions.java | 2 --
.../flink/translation/utils/SerializedPipelineOptions.java | 2 --
.../beam/runners/spark/translation/SparkRuntimeContext.java | 4 +---
.../main/java/org/apache/beam/sdk/runners/PipelineRunner.java | 4 +---
.../src/main/java/org/apache/beam/sdk/testing/TestPipeline.java | 2 --
.../src/main/java/org/apache/beam/fn/harness/FnHarness.java | 3 ---
6 files changed, 2 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/5519fe51/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/SerializablePipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/SerializablePipelineOptions.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/SerializablePipelineOptions.java
index 14476b5..02afa7a 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/SerializablePipelineOptions.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/SerializablePipelineOptions.java
@@ -27,7 +27,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.beam.runners.apex.ApexPipelineOptions;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.util.IOChannelUtils;
import org.apache.beam.sdk.util.common.ReflectHelpers;
/**
@@ -63,7 +62,6 @@ public class SerializablePipelineOptions implements Externalizable {
.as(ApexPipelineOptions.class);
if (FILE_SYSTEMS_INTIIALIZED.compareAndSet(false, true)) {
- IOChannelUtils.registerIOFactoriesAllowOverride(pipelineOptions);
FileSystems.setDefaultConfigInWorkers(pipelineOptions);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/5519fe51/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java
index f717fd7..84f3bf4 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java
@@ -27,7 +27,6 @@ import java.io.IOException;
import java.io.Serializable;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.util.IOChannelUtils;
import org.apache.beam.sdk.util.common.ReflectHelpers;
/**
@@ -57,7 +56,6 @@ public class SerializedPipelineOptions implements Serializable {
try {
pipelineOptions = createMapper().readValue(serializedOptions, PipelineOptions.class);
- IOChannelUtils.registerIOFactoriesAllowOverride(pipelineOptions);
FileSystems.setDefaultConfigInWorkers(pipelineOptions);
} catch (IOException e) {
throw new RuntimeException("Couldn't deserialize the PipelineOptions.", e);
http://git-wip-us.apache.org/repos/asf/beam/blob/5519fe51/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
index 6bba863..e006143 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
@@ -26,7 +26,6 @@ import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.util.IOChannelUtils;
/**
* The SparkRuntimeContext allows us to define useful features on the client side before our
@@ -78,8 +77,7 @@ public class SparkRuntimeContext implements Serializable {
pipelineOptions = deserializePipelineOptions(serializedPipelineOptions);
}
}
- // register IO factories.
- IOChannelUtils.registerIOFactoriesAllowOverride(pipelineOptions);
+ // Register standard FileSystems.
FileSystems.setDefaultConfigInWorkers(pipelineOptions);
}
return pipelineOptions;
http://git-wip-us.apache.org/repos/asf/beam/blob/5519fe51/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunner.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunner.java
index a318dfc..229e04f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunner.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunner.java
@@ -24,7 +24,6 @@ import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsValidator;
-import org.apache.beam.sdk.util.IOChannelUtils;
import org.apache.beam.sdk.util.InstanceBuilder;
/**
@@ -44,8 +43,7 @@ public abstract class PipelineRunner<ResultT extends PipelineResult> {
checkNotNull(options);
PipelineOptionsValidator.validate(PipelineOptions.class, options);
- // (Re-)register standard IO factories. Clobbers any prior credentials.
- IOChannelUtils.registerIOFactoriesAllowOverride(options);
+ // (Re-)register standard FileSystems. Clobbers any prior credentials.
FileSystems.setDefaultConfigInWorkers(options);
@SuppressWarnings("unchecked")
http://git-wip-us.apache.org/repos/asf/beam/blob/5519fe51/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
index 4d0cc2b..868dcbd 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
@@ -46,7 +46,6 @@ import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptions.CheckEnabled;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.runners.TransformHierarchy;
-import org.apache.beam.sdk.util.IOChannelUtils;
import org.apache.beam.sdk.util.common.ReflectHelpers;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestRule;
@@ -400,7 +399,6 @@ public class TestPipeline extends Pipeline implements TestRule {
}
options.setStableUniqueNames(CheckEnabled.ERROR);
- IOChannelUtils.registerIOFactoriesAllowOverride(options);
FileSystems.setDefaultConfigInWorkers(options);
return options;
} catch (IOException e) {
http://git-wip-us.apache.org/repos/asf/beam/blob/5519fe51/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
index 24f826c..05ab44f 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
@@ -33,7 +33,6 @@ import org.apache.beam.fn.harness.stream.StreamObserverFactory;
import org.apache.beam.fn.v1.BeamFnApi;
import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.util.IOChannelUtils;
import org.apache.beam.sdk.util.common.ReflectHelpers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -90,8 +89,6 @@ public class FnHarness {
public static void main(PipelineOptions options,
BeamFnApi.ApiServiceDescriptor loggingApiServiceDescriptor,
BeamFnApi.ApiServiceDescriptor controlApiServiceDescriptor) throws Exception {
- IOChannelUtils.registerIOFactories(options);
-
ManagedChannelFactory channelFactory = ManagedChannelFactory.from(options);
StreamObserverFactory streamObserverFactory = StreamObserverFactory.fromOptions(options);
PrintStream originalErrStream = System.err;