You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2017/05/04 16:32:55 UTC

[5/9] beam git commit: Stop registering IOChannelFactories in SDK harness, runners, tests

Stop registering IOChannelFactories in SDK harness, runners, tests


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5519fe51
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5519fe51
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5519fe51

Branch: refs/heads/master
Commit: 5519fe51aea3e95fa0ea4a36c7b036917b7f94ec
Parents: dc9e004
Author: Dan Halperin <dh...@google.com>
Authored: Wed May 3 18:02:42 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Thu May 4 09:32:45 2017 -0700

----------------------------------------------------------------------
 .../apex/translation/utils/SerializablePipelineOptions.java      | 2 --
 .../flink/translation/utils/SerializedPipelineOptions.java       | 2 --
 .../beam/runners/spark/translation/SparkRuntimeContext.java      | 4 +---
 .../main/java/org/apache/beam/sdk/runners/PipelineRunner.java    | 4 +---
 .../src/main/java/org/apache/beam/sdk/testing/TestPipeline.java  | 2 --
 .../src/main/java/org/apache/beam/fn/harness/FnHarness.java      | 3 ---
 6 files changed, 2 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/5519fe51/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/SerializablePipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/SerializablePipelineOptions.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/SerializablePipelineOptions.java
index 14476b5..02afa7a 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/SerializablePipelineOptions.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/SerializablePipelineOptions.java
@@ -27,7 +27,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.beam.runners.apex.ApexPipelineOptions;
 import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.util.IOChannelUtils;
 import org.apache.beam.sdk.util.common.ReflectHelpers;
 
 /**
@@ -63,7 +62,6 @@ public class SerializablePipelineOptions implements Externalizable {
         .as(ApexPipelineOptions.class);
 
     if (FILE_SYSTEMS_INTIIALIZED.compareAndSet(false, true)) {
-      IOChannelUtils.registerIOFactoriesAllowOverride(pipelineOptions);
       FileSystems.setDefaultConfigInWorkers(pipelineOptions);
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/5519fe51/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java
index f717fd7..84f3bf4 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java
@@ -27,7 +27,6 @@ import java.io.IOException;
 import java.io.Serializable;
 import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.util.IOChannelUtils;
 import org.apache.beam.sdk.util.common.ReflectHelpers;
 
 /**
@@ -57,7 +56,6 @@ public class SerializedPipelineOptions implements Serializable {
       try {
         pipelineOptions = createMapper().readValue(serializedOptions, PipelineOptions.class);
 
-        IOChannelUtils.registerIOFactoriesAllowOverride(pipelineOptions);
         FileSystems.setDefaultConfigInWorkers(pipelineOptions);
       } catch (IOException e) {
         throw new RuntimeException("Couldn't deserialize the PipelineOptions.", e);

http://git-wip-us.apache.org/repos/asf/beam/blob/5519fe51/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
index 6bba863..e006143 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
@@ -26,7 +26,6 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.util.IOChannelUtils;
 
 /**
  * The SparkRuntimeContext allows us to define useful features on the client side before our
@@ -78,8 +77,7 @@ public class SparkRuntimeContext implements Serializable {
             pipelineOptions = deserializePipelineOptions(serializedPipelineOptions);
           }
         }
-        // register IO factories.
-        IOChannelUtils.registerIOFactoriesAllowOverride(pipelineOptions);
+        // Register standard FileSystems.
         FileSystems.setDefaultConfigInWorkers(pipelineOptions);
       }
       return pipelineOptions;

http://git-wip-us.apache.org/repos/asf/beam/blob/5519fe51/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunner.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunner.java
index a318dfc..229e04f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunner.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunner.java
@@ -24,7 +24,6 @@ import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsValidator;
-import org.apache.beam.sdk.util.IOChannelUtils;
 import org.apache.beam.sdk.util.InstanceBuilder;
 
 /**
@@ -44,8 +43,7 @@ public abstract class PipelineRunner<ResultT extends PipelineResult> {
     checkNotNull(options);
     PipelineOptionsValidator.validate(PipelineOptions.class, options);
 
-    // (Re-)register standard IO factories. Clobbers any prior credentials.
-    IOChannelUtils.registerIOFactoriesAllowOverride(options);
+    // (Re-)register standard FileSystems. Clobbers any prior credentials.
     FileSystems.setDefaultConfigInWorkers(options);
 
     @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/beam/blob/5519fe51/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
index 4d0cc2b..868dcbd 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
@@ -46,7 +46,6 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptions.CheckEnabled;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.runners.TransformHierarchy;
-import org.apache.beam.sdk.util.IOChannelUtils;
 import org.apache.beam.sdk.util.common.ReflectHelpers;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TestRule;
@@ -400,7 +399,6 @@ public class TestPipeline extends Pipeline implements TestRule {
       }
       options.setStableUniqueNames(CheckEnabled.ERROR);
 
-      IOChannelUtils.registerIOFactoriesAllowOverride(options);
       FileSystems.setDefaultConfigInWorkers(options);
       return options;
     } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/beam/blob/5519fe51/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
index 24f826c..05ab44f 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
@@ -33,7 +33,6 @@ import org.apache.beam.fn.harness.stream.StreamObserverFactory;
 import org.apache.beam.fn.v1.BeamFnApi;
 import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.util.IOChannelUtils;
 import org.apache.beam.sdk.util.common.ReflectHelpers;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -90,8 +89,6 @@ public class FnHarness {
   public static void main(PipelineOptions options,
       BeamFnApi.ApiServiceDescriptor loggingApiServiceDescriptor,
       BeamFnApi.ApiServiceDescriptor controlApiServiceDescriptor) throws Exception {
-    IOChannelUtils.registerIOFactories(options);
-
     ManagedChannelFactory channelFactory = ManagedChannelFactory.from(options);
     StreamObserverFactory streamObserverFactory = StreamObserverFactory.fromOptions(options);
     PrintStream originalErrStream = System.err;