You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/07/24 04:34:46 UTC
[03/50] [abbrv] beam git commit: Rename
FileSystems.setDefaultConfigInWorkers
Rename FileSystems.setDefaultConfigInWorkers
And document that it's not for users.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f3540d47
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f3540d47
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f3540d47
Branch: refs/heads/jstorm-runner
Commit: f3540d47f10c18859340a738a7e93643ee57f604
Parents: 15df211
Author: Dan Halperin <dh...@google.com>
Authored: Fri May 12 11:46:16 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Fri May 12 14:59:10 2017 -0700
----------------------------------------------------------------------
.../translation/utils/SerializablePipelineOptions.java | 2 +-
.../translation/utils/SerializedPipelineOptions.java | 2 +-
.../dataflow/DataflowPipelineTranslatorTest.java | 2 +-
.../beam/runners/dataflow/DataflowRunnerTest.java | 6 +++---
.../dataflow/options/DataflowPipelineOptionsTest.java | 4 ++--
.../beam/runners/dataflow/util/PackageUtilTest.java | 2 +-
.../runners/spark/translation/SparkRuntimeContext.java | 2 +-
.../main/java/org/apache/beam/sdk/PipelineRunner.java | 2 +-
.../main/java/org/apache/beam/sdk/io/FileSystems.java | 12 +++++++++++-
.../java/org/apache/beam/sdk/testing/TestPipeline.java | 2 +-
.../extensions/gcp/storage/GcsFileSystemRegistrar.java | 2 +-
.../sdk/extensions/gcp/storage/GcsResourceIdTest.java | 2 +-
.../apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java | 2 +-
.../apache/beam/sdk/io/hdfs/HadoopResourceIdTest.java | 2 +-
14 files changed, 27 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/f3540d47/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/SerializablePipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/SerializablePipelineOptions.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/SerializablePipelineOptions.java
index 02afa7a..46b04fc 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/SerializablePipelineOptions.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/SerializablePipelineOptions.java
@@ -62,7 +62,7 @@ public class SerializablePipelineOptions implements Externalizable {
.as(ApexPipelineOptions.class);
if (FILE_SYSTEMS_INTIIALIZED.compareAndSet(false, true)) {
- FileSystems.setDefaultConfigInWorkers(pipelineOptions);
+ FileSystems.setDefaultPipelineOptions(pipelineOptions);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/f3540d47/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java
index 84f3bf4..40b6dd6 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java
@@ -56,7 +56,7 @@ public class SerializedPipelineOptions implements Serializable {
try {
pipelineOptions = createMapper().readValue(serializedOptions, PipelineOptions.class);
- FileSystems.setDefaultConfigInWorkers(pipelineOptions);
+ FileSystems.setDefaultPipelineOptions(pipelineOptions);
} catch (IOException e) {
throw new RuntimeException("Couldn't deserialize the PipelineOptions.", e);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/f3540d47/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index 93c1e5b..87744f0 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -144,7 +144,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
Pipeline p = Pipeline.create(options);
// Enable the FileSystems API to know about gs:// URIs in this test.
- FileSystems.setDefaultConfigInWorkers(options);
+ FileSystems.setDefaultPipelineOptions(options);
p.apply("ReadMyFile", TextIO.read().from("gs://bucket/object"))
.apply("WriteMyFile", TextIO.write().to("gs://bucket/object"));
http://git-wip-us.apache.org/repos/asf/beam/blob/f3540d47/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index ce01aa1..8f10b18 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -177,7 +177,7 @@ public class DataflowRunnerTest {
.apply("WriteMyFile", TextIO.write().to("gs://bucket/object"));
// Enable the FileSystems API to know about gs:// URIs in this test.
- FileSystems.setDefaultConfigInWorkers(options);
+ FileSystems.setDefaultPipelineOptions(options);
return p;
}
@@ -246,7 +246,7 @@ public class DataflowRunnerTest {
options.setGcpCredential(new TestCredential());
// Configure the FileSystem registrar to use these options.
- FileSystems.setDefaultConfigInWorkers(options);
+ FileSystems.setDefaultPipelineOptions(options);
return options;
}
@@ -771,7 +771,7 @@ public class DataflowRunnerTest {
@Test
public void testInvalidNumberOfWorkerHarnessThreads() throws IOException {
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
- FileSystems.setDefaultConfigInWorkers(options);
+ FileSystems.setDefaultPipelineOptions(options);
options.setRunner(DataflowRunner.class);
options.setProject("foo-12345");
http://git-wip-us.apache.org/repos/asf/beam/blob/f3540d47/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java
index 613604a..cea44f0 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java
@@ -137,7 +137,7 @@ public class DataflowPipelineOptionsTest {
@Test
public void testDefaultToTempLocation() {
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
- FileSystems.setDefaultConfigInWorkers(options);
+ FileSystems.setDefaultPipelineOptions(options);
options.setPathValidatorClass(NoopPathValidator.class);
options.setTempLocation("gs://temp_location/");
assertEquals("gs://temp_location/", options.getGcpTempLocation());
@@ -147,7 +147,7 @@ public class DataflowPipelineOptionsTest {
@Test
public void testDefaultToGcpTempLocation() {
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
- FileSystems.setDefaultConfigInWorkers(options);
+ FileSystems.setDefaultPipelineOptions(options);
options.setPathValidatorClass(NoopPathValidator.class);
options.setTempLocation("gs://temp_location/");
options.setGcpTempLocation("gs://gcp_temp_location/");
http://git-wip-us.apache.org/repos/asf/beam/blob/f3540d47/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
index c7a660e..5d0c0f2 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
@@ -120,7 +120,7 @@ public class PackageUtilTest {
GcsOptions pipelineOptions = PipelineOptionsFactory.as(GcsOptions.class);
pipelineOptions.setGcsUtil(mockGcsUtil);
- FileSystems.setDefaultConfigInWorkers(pipelineOptions);
+ FileSystems.setDefaultPipelineOptions(pipelineOptions);
createOptions = StandardCreateOptions.builder().setMimeType(MimeTypes.BINARY).build();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/f3540d47/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
index 82e8b44..f3fe99c 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
@@ -90,7 +90,7 @@ public class SparkRuntimeContext implements Serializable {
}
}
// Register standard FileSystems.
- FileSystems.setDefaultConfigInWorkers(pipelineOptions);
+ FileSystems.setDefaultPipelineOptions(pipelineOptions);
}
return pipelineOptions;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/f3540d47/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineRunner.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineRunner.java
index 87705af..c114501 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineRunner.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineRunner.java
@@ -41,7 +41,7 @@ public abstract class PipelineRunner<ResultT extends PipelineResult> {
PipelineOptionsValidator.validate(PipelineOptions.class, options);
// (Re-)register standard FileSystems. Clobbers any prior credentials.
- FileSystems.setDefaultConfigInWorkers(options);
+ FileSystems.setDefaultPipelineOptions(options);
@SuppressWarnings("unchecked")
PipelineRunner<? extends PipelineResult> result =
http://git-wip-us.apache.org/repos/asf/beam/blob/f3540d47/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java
index cfb63c0..1aacc90 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java
@@ -51,6 +51,7 @@ import java.util.regex.Pattern;
import javax.annotation.Nonnull;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.io.fs.CreateOptions;
import org.apache.beam.sdk.io.fs.CreateOptions.StandardCreateOptions;
import org.apache.beam.sdk.io.fs.MatchResult;
@@ -448,12 +449,21 @@ public class FileSystems {
/********************************** METHODS FOR REGISTRATION **********************************/
+ /** @deprecated to be removed. */
+ @Deprecated // for DataflowRunner backwards compatibility.
+ public static void setDefaultConfigInWorkers(PipelineOptions options) {
+ setDefaultPipelineOptions(options);
+ }
+
/**
* Sets the default configuration in workers.
*
* <p>It will be used in {@link FileSystemRegistrar FileSystemRegistrars} for all schemes.
+ *
+ * <p>This is expected only to be used by runners after {@code Pipeline.run}, or in tests.
*/
- public static void setDefaultConfigInWorkers(PipelineOptions options) {
+ @Internal
+ public static void setDefaultPipelineOptions(PipelineOptions options) {
checkNotNull(options, "options");
Set<FileSystemRegistrar> registrars =
Sets.newTreeSet(ReflectHelpers.ObjectsClassComparator.INSTANCE);
http://git-wip-us.apache.org/repos/asf/beam/blob/f3540d47/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
index e04c2f8..9206e04 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
@@ -413,7 +413,7 @@ public class TestPipeline extends Pipeline implements TestRule {
}
options.setStableUniqueNames(CheckEnabled.ERROR);
- FileSystems.setDefaultConfigInWorkers(options);
+ FileSystems.setDefaultPipelineOptions(options);
return options;
} catch (IOException e) {
throw new RuntimeException(
http://git-wip-us.apache.org/repos/asf/beam/blob/f3540d47/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystemRegistrar.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystemRegistrar.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystemRegistrar.java
index f954b33..f3a67b2 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystemRegistrar.java
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystemRegistrar.java
@@ -40,7 +40,7 @@ public class GcsFileSystemRegistrar implements FileSystemRegistrar {
public Iterable<FileSystem> fromOptions(@Nonnull PipelineOptions options) {
checkNotNull(
options,
- "Expect the runner have called FileSystems.setDefaultConfigInWorkers().");
+ "Expect the runner have called FileSystems.setDefaultPipelineOptions().");
return ImmutableList.<FileSystem>of(new GcsFileSystem(options.as(GcsOptions.class)));
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/f3540d47/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/storage/GcsResourceIdTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/storage/GcsResourceIdTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/storage/GcsResourceIdTest.java
index 2a67501..c1e214e 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/storage/GcsResourceIdTest.java
+++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/storage/GcsResourceIdTest.java
@@ -168,7 +168,7 @@ public class GcsResourceIdTest {
@Test
public void testResourceIdTester() throws Exception {
- FileSystems.setDefaultConfigInWorkers(TestPipeline.testingPipelineOptions());
+ FileSystems.setDefaultPipelineOptions(TestPipeline.testingPipelineOptions());
ResourceIdTester.runResourceIdBattery(toResourceIdentifier("gs://bucket/foo/"));
}
http://git-wip-us.apache.org/repos/asf/beam/blob/f3540d47/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java
index 14591d8..88275f4 100644
--- a/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java
+++ b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java
@@ -219,7 +219,7 @@ public class HadoopFileSystemTest {
HadoopFileSystemOptions options = TestPipeline.testingPipelineOptions()
.as(HadoopFileSystemOptions.class);
options.setHdfsConfiguration(ImmutableList.of(fileSystem.fileSystem.getConf()));
- FileSystems.setDefaultConfigInWorkers(options);
+ FileSystems.setDefaultPipelineOptions(options);
PCollection<String> pc = p.apply(
TextIO.read().from(testPath("testFile*").toString()));
PAssert.that(pc).containsInAnyOrder("testDataA", "testDataB", "testDataC");
http://git-wip-us.apache.org/repos/asf/beam/blob/f3540d47/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopResourceIdTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopResourceIdTest.java b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopResourceIdTest.java
index f179132..c4a8577 100644
--- a/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopResourceIdTest.java
+++ b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopResourceIdTest.java
@@ -53,7 +53,7 @@ public class HadoopResourceIdTest {
// Register HadoopFileSystem for this test.
HadoopFileSystemOptions options = PipelineOptionsFactory.as(HadoopFileSystemOptions.class);
options.setHdfsConfiguration(Collections.singletonList(configuration));
- FileSystems.setDefaultConfigInWorkers(options);
+ FileSystems.setDefaultPipelineOptions(options);
}
@After