You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/07/24 04:34:46 UTC

[03/50] [abbrv] beam git commit: Rename FileSystems.setDefaultConfigInWorkers

Rename FileSystems.setDefaultConfigInWorkers

And document that it's not for users.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f3540d47
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f3540d47
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f3540d47

Branch: refs/heads/jstorm-runner
Commit: f3540d47f10c18859340a738a7e93643ee57f604
Parents: 15df211
Author: Dan Halperin <dh...@google.com>
Authored: Fri May 12 11:46:16 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Fri May 12 14:59:10 2017 -0700

----------------------------------------------------------------------
 .../translation/utils/SerializablePipelineOptions.java  |  2 +-
 .../translation/utils/SerializedPipelineOptions.java    |  2 +-
 .../dataflow/DataflowPipelineTranslatorTest.java        |  2 +-
 .../beam/runners/dataflow/DataflowRunnerTest.java       |  6 +++---
 .../dataflow/options/DataflowPipelineOptionsTest.java   |  4 ++--
 .../beam/runners/dataflow/util/PackageUtilTest.java     |  2 +-
 .../runners/spark/translation/SparkRuntimeContext.java  |  2 +-
 .../main/java/org/apache/beam/sdk/PipelineRunner.java   |  2 +-
 .../main/java/org/apache/beam/sdk/io/FileSystems.java   | 12 +++++++++++-
 .../java/org/apache/beam/sdk/testing/TestPipeline.java  |  2 +-
 .../extensions/gcp/storage/GcsFileSystemRegistrar.java  |  2 +-
 .../sdk/extensions/gcp/storage/GcsResourceIdTest.java   |  2 +-
 .../apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java   |  2 +-
 .../apache/beam/sdk/io/hdfs/HadoopResourceIdTest.java   |  2 +-
 14 files changed, 27 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/f3540d47/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/SerializablePipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/SerializablePipelineOptions.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/SerializablePipelineOptions.java
index 02afa7a..46b04fc 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/SerializablePipelineOptions.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/SerializablePipelineOptions.java
@@ -62,7 +62,7 @@ public class SerializablePipelineOptions implements Externalizable {
         .as(ApexPipelineOptions.class);
 
     if (FILE_SYSTEMS_INTIIALIZED.compareAndSet(false, true)) {
-      FileSystems.setDefaultConfigInWorkers(pipelineOptions);
+      FileSystems.setDefaultPipelineOptions(pipelineOptions);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/f3540d47/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java
index 84f3bf4..40b6dd6 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java
@@ -56,7 +56,7 @@ public class SerializedPipelineOptions implements Serializable {
       try {
         pipelineOptions = createMapper().readValue(serializedOptions, PipelineOptions.class);
 
-        FileSystems.setDefaultConfigInWorkers(pipelineOptions);
+        FileSystems.setDefaultPipelineOptions(pipelineOptions);
       } catch (IOException e) {
         throw new RuntimeException("Couldn't deserialize the PipelineOptions.", e);
       }

http://git-wip-us.apache.org/repos/asf/beam/blob/f3540d47/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index 93c1e5b..87744f0 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -144,7 +144,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
     Pipeline p = Pipeline.create(options);
 
     // Enable the FileSystems API to know about gs:// URIs in this test.
-    FileSystems.setDefaultConfigInWorkers(options);
+    FileSystems.setDefaultPipelineOptions(options);
 
     p.apply("ReadMyFile", TextIO.read().from("gs://bucket/object"))
      .apply("WriteMyFile", TextIO.write().to("gs://bucket/object"));

http://git-wip-us.apache.org/repos/asf/beam/blob/f3540d47/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index ce01aa1..8f10b18 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -177,7 +177,7 @@ public class DataflowRunnerTest {
         .apply("WriteMyFile", TextIO.write().to("gs://bucket/object"));
 
     // Enable the FileSystems API to know about gs:// URIs in this test.
-    FileSystems.setDefaultConfigInWorkers(options);
+    FileSystems.setDefaultPipelineOptions(options);
 
     return p;
   }
@@ -246,7 +246,7 @@ public class DataflowRunnerTest {
     options.setGcpCredential(new TestCredential());
 
     // Configure the FileSystem registrar to use these options.
-    FileSystems.setDefaultConfigInWorkers(options);
+    FileSystems.setDefaultPipelineOptions(options);
 
     return options;
   }
@@ -771,7 +771,7 @@ public class DataflowRunnerTest {
   @Test
   public void testInvalidNumberOfWorkerHarnessThreads() throws IOException {
     DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
-    FileSystems.setDefaultConfigInWorkers(options);
+    FileSystems.setDefaultPipelineOptions(options);
     options.setRunner(DataflowRunner.class);
     options.setProject("foo-12345");
 

http://git-wip-us.apache.org/repos/asf/beam/blob/f3540d47/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java
index 613604a..cea44f0 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java
@@ -137,7 +137,7 @@ public class DataflowPipelineOptionsTest {
   @Test
   public void testDefaultToTempLocation() {
     DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
-    FileSystems.setDefaultConfigInWorkers(options);
+    FileSystems.setDefaultPipelineOptions(options);
     options.setPathValidatorClass(NoopPathValidator.class);
     options.setTempLocation("gs://temp_location/");
     assertEquals("gs://temp_location/", options.getGcpTempLocation());
@@ -147,7 +147,7 @@ public class DataflowPipelineOptionsTest {
   @Test
   public void testDefaultToGcpTempLocation() {
     DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
-    FileSystems.setDefaultConfigInWorkers(options);
+    FileSystems.setDefaultPipelineOptions(options);
     options.setPathValidatorClass(NoopPathValidator.class);
     options.setTempLocation("gs://temp_location/");
     options.setGcpTempLocation("gs://gcp_temp_location/");

http://git-wip-us.apache.org/repos/asf/beam/blob/f3540d47/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
index c7a660e..5d0c0f2 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
@@ -120,7 +120,7 @@ public class PackageUtilTest {
 
     GcsOptions pipelineOptions = PipelineOptionsFactory.as(GcsOptions.class);
     pipelineOptions.setGcsUtil(mockGcsUtil);
-    FileSystems.setDefaultConfigInWorkers(pipelineOptions);
+    FileSystems.setDefaultPipelineOptions(pipelineOptions);
     createOptions = StandardCreateOptions.builder().setMimeType(MimeTypes.BINARY).build();
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/f3540d47/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
index 82e8b44..f3fe99c 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
@@ -90,7 +90,7 @@ public class SparkRuntimeContext implements Serializable {
           }
         }
         // Register standard FileSystems.
-        FileSystems.setDefaultConfigInWorkers(pipelineOptions);
+        FileSystems.setDefaultPipelineOptions(pipelineOptions);
       }
       return pipelineOptions;
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/f3540d47/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineRunner.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineRunner.java
index 87705af..c114501 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineRunner.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineRunner.java
@@ -41,7 +41,7 @@ public abstract class PipelineRunner<ResultT extends PipelineResult> {
     PipelineOptionsValidator.validate(PipelineOptions.class, options);
 
     // (Re-)register standard FileSystems. Clobbers any prior credentials.
-    FileSystems.setDefaultConfigInWorkers(options);
+    FileSystems.setDefaultPipelineOptions(options);
 
     @SuppressWarnings("unchecked")
     PipelineRunner<? extends PipelineResult> result =

http://git-wip-us.apache.org/repos/asf/beam/blob/f3540d47/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java
index cfb63c0..1aacc90 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java
@@ -51,6 +51,7 @@ import java.util.regex.Pattern;
 import javax.annotation.Nonnull;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.io.fs.CreateOptions;
 import org.apache.beam.sdk.io.fs.CreateOptions.StandardCreateOptions;
 import org.apache.beam.sdk.io.fs.MatchResult;
@@ -448,12 +449,21 @@ public class FileSystems {
 
   /********************************** METHODS FOR REGISTRATION **********************************/
 
+  /** @deprecated to be removed. */
+  @Deprecated // for DataflowRunner backwards compatibility.
+  public static void setDefaultConfigInWorkers(PipelineOptions options) {
+    setDefaultPipelineOptions(options);
+  }
+
   /**
    * Sets the default configuration in workers.
    *
    * <p>It will be used in {@link FileSystemRegistrar FileSystemRegistrars} for all schemes.
+   *
+   * <p>This is expected only to be used by runners after {@code Pipeline.run}, or in tests.
    */
-  public static void setDefaultConfigInWorkers(PipelineOptions options) {
+  @Internal
+  public static void setDefaultPipelineOptions(PipelineOptions options) {
     checkNotNull(options, "options");
     Set<FileSystemRegistrar> registrars =
         Sets.newTreeSet(ReflectHelpers.ObjectsClassComparator.INSTANCE);

http://git-wip-us.apache.org/repos/asf/beam/blob/f3540d47/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
index e04c2f8..9206e04 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
@@ -413,7 +413,7 @@ public class TestPipeline extends Pipeline implements TestRule {
       }
       options.setStableUniqueNames(CheckEnabled.ERROR);
 
-      FileSystems.setDefaultConfigInWorkers(options);
+      FileSystems.setDefaultPipelineOptions(options);
       return options;
     } catch (IOException e) {
       throw new RuntimeException(

http://git-wip-us.apache.org/repos/asf/beam/blob/f3540d47/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystemRegistrar.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystemRegistrar.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystemRegistrar.java
index f954b33..f3a67b2 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystemRegistrar.java
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystemRegistrar.java
@@ -40,7 +40,7 @@ public class GcsFileSystemRegistrar implements FileSystemRegistrar {
   public Iterable<FileSystem> fromOptions(@Nonnull PipelineOptions options) {
     checkNotNull(
         options,
-        "Expect the runner have called FileSystems.setDefaultConfigInWorkers().");
+        "Expect the runner have called FileSystems.setDefaultPipelineOptions().");
     return ImmutableList.<FileSystem>of(new GcsFileSystem(options.as(GcsOptions.class)));
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/f3540d47/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/storage/GcsResourceIdTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/storage/GcsResourceIdTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/storage/GcsResourceIdTest.java
index 2a67501..c1e214e 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/storage/GcsResourceIdTest.java
+++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/storage/GcsResourceIdTest.java
@@ -168,7 +168,7 @@ public class GcsResourceIdTest {
 
   @Test
   public void testResourceIdTester() throws Exception {
-    FileSystems.setDefaultConfigInWorkers(TestPipeline.testingPipelineOptions());
+    FileSystems.setDefaultPipelineOptions(TestPipeline.testingPipelineOptions());
     ResourceIdTester.runResourceIdBattery(toResourceIdentifier("gs://bucket/foo/"));
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/f3540d47/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java
index 14591d8..88275f4 100644
--- a/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java
+++ b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java
@@ -219,7 +219,7 @@ public class HadoopFileSystemTest {
     HadoopFileSystemOptions options = TestPipeline.testingPipelineOptions()
         .as(HadoopFileSystemOptions.class);
     options.setHdfsConfiguration(ImmutableList.of(fileSystem.fileSystem.getConf()));
-    FileSystems.setDefaultConfigInWorkers(options);
+    FileSystems.setDefaultPipelineOptions(options);
     PCollection<String> pc = p.apply(
         TextIO.read().from(testPath("testFile*").toString()));
     PAssert.that(pc).containsInAnyOrder("testDataA", "testDataB", "testDataC");

http://git-wip-us.apache.org/repos/asf/beam/blob/f3540d47/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopResourceIdTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopResourceIdTest.java b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopResourceIdTest.java
index f179132..c4a8577 100644
--- a/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopResourceIdTest.java
+++ b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopResourceIdTest.java
@@ -53,7 +53,7 @@ public class HadoopResourceIdTest {
     // Register HadoopFileSystem for this test.
     HadoopFileSystemOptions options = PipelineOptionsFactory.as(HadoopFileSystemOptions.class);
     options.setHdfsConfiguration(Collections.singletonList(configuration));
-    FileSystems.setDefaultConfigInWorkers(options);
+    FileSystems.setDefaultPipelineOptions(options);
   }
 
   @After