You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2017/05/04 16:32:53 UTC
[3/9] beam git commit: DataflowPipelineOptions: use FileSystems, not IOChannelUtils, to resolve staging location
DataflowPipelineOptions: use FileSystems, not IOChannelUtils, to resolve staging location
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/dc9e0048
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/dc9e0048
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/dc9e0048
Branch: refs/heads/master
Commit: dc9e00485afaf737557bc6a82750e45ecba34926
Parents: e5a38ed
Author: Dan Halperin <dh...@google.com>
Authored: Wed May 3 18:01:29 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Thu May 4 09:32:45 2017 -0700
----------------------------------------------------------------------
.../options/DataflowPipelineOptions.java | 14 +++++---------
.../dataflow/DataflowPipelineTranslatorTest.java | 2 +-
.../runners/dataflow/DataflowRunnerTest.java | 4 +++-
.../options/DataflowPipelineOptionsTest.java | 19 +++++++++----------
4 files changed, 18 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/dc9e0048/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
index 0796b6d..11618af 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
@@ -17,11 +17,12 @@
*/
package org.apache.beam.runners.dataflow.options;
-import java.io.IOException;
import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubOptions;
import org.apache.beam.sdk.options.ApplicationNameOptions;
@@ -32,7 +33,6 @@ import org.apache.beam.sdk.options.Hidden;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation;
-import org.apache.beam.sdk.util.IOChannelUtils;
/**
* Options that can be used to configure the {@link DataflowRunner}.
@@ -137,13 +137,9 @@ public interface DataflowPipelineOptions
"Error constructing default value for stagingLocation: gcpTempLocation is not"
+ " a valid GCS path, %s. ", gcpTempLocation), e);
}
- try {
- return IOChannelUtils.resolve(gcpTempLocation, "staging");
- } catch (IOException e) {
- throw new IllegalArgumentException(String.format(
- "Unable to resolve stagingLocation from gcpTempLocation: %s."
- + " Please set the staging location explicitly.", gcpTempLocation), e);
- }
+ return FileSystems.matchNewResource(gcpTempLocation, true /* isDirectory */)
+ .resolve("staging", StandardResolveOptions.RESOLVE_DIRECTORY)
+ .toString();
}
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/dc9e0048/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index a6ad8c5..70f00fb 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -224,7 +224,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
assertThat(optionsMap, hasEntry("jobName", (Object) "some-job-name"));
assertThat(optionsMap, hasEntry("tempLocation", (Object) "gs://somebucket/some/path"));
assertThat(optionsMap,
- hasEntry("stagingLocation", (Object) "gs://somebucket/some/path/staging"));
+ hasEntry("stagingLocation", (Object) "gs://somebucket/some/path/staging/"));
assertThat(optionsMap, hasEntry("stableUniqueNames", (Object) "WARNING"));
assertThat(optionsMap, hasEntry("streaming", (Object) false));
assertThat(optionsMap, hasEntry("numberOfWorkerHarnessThreads", (Object) 0));
http://git-wip-us.apache.org/repos/asf/beam/blob/dc9e0048/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index c0dfbee..5aebf29 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -157,8 +157,9 @@ public class DataflowRunnerTest {
}
});
when(mockGcsUtil.bucketAccessible(GcsPath.fromUri(VALID_STAGING_BUCKET))).thenReturn(true);
+ when(mockGcsUtil.bucketAccessible(GcsPath.fromUri(VALID_STAGING_BUCKET))).thenReturn(true);
when(mockGcsUtil.bucketAccessible(GcsPath.fromUri(VALID_TEMP_BUCKET))).thenReturn(true);
- when(mockGcsUtil.bucketAccessible(GcsPath.fromUri(VALID_TEMP_BUCKET + "/staging"))).
+ when(mockGcsUtil.bucketAccessible(GcsPath.fromUri(VALID_TEMP_BUCKET + "/staging/"))).
thenReturn(true);
when(mockGcsUtil.bucketAccessible(GcsPath.fromUri(VALID_PROFILE_BUCKET))).thenReturn(true);
when(mockGcsUtil.bucketAccessible(GcsPath.fromUri(NON_EXISTENT_BUCKET))).thenReturn(false);
@@ -826,6 +827,7 @@ public class DataflowRunnerTest {
@Test
public void testInvalidNumberOfWorkerHarnessThreads() throws IOException {
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+ FileSystems.setDefaultConfigInWorkers(options);
options.setRunner(DataflowRunner.class);
options.setProject("foo-12345");
http://git-wip-us.apache.org/repos/asf/beam/blob/dc9e0048/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java
index 30eee0e..8b8fd6c 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java
@@ -22,10 +22,10 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.internal.matchers.ThrowableMessageMatcher.hasMessage;
+import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.ResetDateTimeProvider;
import org.apache.beam.sdk.testing.RestoreSystemProperties;
-import org.apache.beam.sdk.util.IOChannelUtils;
import org.apache.beam.sdk.util.NoopPathValidator;
import org.junit.Rule;
import org.junit.Test;
@@ -127,7 +127,6 @@ public class DataflowPipelineOptionsTest {
@Test
public void testStagingLocation() {
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
- IOChannelUtils.registerIOFactoriesAllowOverride(options);
options.setPathValidatorClass(NoopPathValidator.class);
options.setTempLocation("gs://temp_location");
options.setStagingLocation("gs://staging_location");
@@ -138,21 +137,21 @@ public class DataflowPipelineOptionsTest {
@Test
public void testDefaultToTempLocation() {
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
- IOChannelUtils.registerIOFactoriesAllowOverride(options);
+ FileSystems.setDefaultConfigInWorkers(options);
options.setPathValidatorClass(NoopPathValidator.class);
- options.setTempLocation("gs://temp_location");
- assertEquals("gs://temp_location", options.getGcpTempLocation());
- assertEquals("gs://temp_location/staging", options.getStagingLocation());
+ options.setTempLocation("gs://temp_location/");
+ assertEquals("gs://temp_location/", options.getGcpTempLocation());
+ assertEquals("gs://temp_location/staging/", options.getStagingLocation());
}
@Test
public void testDefaultToGcpTempLocation() {
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
- IOChannelUtils.registerIOFactoriesAllowOverride(options);
+ FileSystems.setDefaultConfigInWorkers(options);
options.setPathValidatorClass(NoopPathValidator.class);
- options.setTempLocation("gs://temp_location");
- options.setGcpTempLocation("gs://gcp_temp_location");
- assertEquals("gs://gcp_temp_location/staging", options.getStagingLocation());
+ options.setTempLocation("gs://temp_location/");
+ options.setGcpTempLocation("gs://gcp_temp_location/");
+ assertEquals("gs://gcp_temp_location/staging/", options.getStagingLocation());
}
@Test