You are viewing a plain-text version of this content. The hyperlink to the canonical version was not preserved in this copy.
Posted to commits@beam.apache.org by dh...@apache.org on 2017/05/04 16:32:53 UTC

[3/9] beam git commit: DataflowPipelineOptions: use FileSystems, not IOChannelUtils, to resolve staging location

DataflowPipelineOptions: use FileSystems, not IOChannelUtils, to resolve staging location


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/dc9e0048
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/dc9e0048
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/dc9e0048

Branch: refs/heads/master
Commit: dc9e00485afaf737557bc6a82750e45ecba34926
Parents: e5a38ed
Author: Dan Halperin <dh...@google.com>
Authored: Wed May 3 18:01:29 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Thu May 4 09:32:45 2017 -0700

----------------------------------------------------------------------
 .../options/DataflowPipelineOptions.java         | 14 +++++---------
 .../dataflow/DataflowPipelineTranslatorTest.java |  2 +-
 .../runners/dataflow/DataflowRunnerTest.java     |  4 +++-
 .../options/DataflowPipelineOptionsTest.java     | 19 +++++++++----------
 4 files changed, 18 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/dc9e0048/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
index 0796b6d..11618af 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
@@ -17,11 +17,12 @@
  */
 package org.apache.beam.runners.dataflow.options;
 
-import java.io.IOException;
 import org.apache.beam.runners.dataflow.DataflowRunner;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
 import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubOptions;
 import org.apache.beam.sdk.options.ApplicationNameOptions;
@@ -32,7 +33,6 @@ import org.apache.beam.sdk.options.Hidden;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.StreamingOptions;
 import org.apache.beam.sdk.options.Validation;
-import org.apache.beam.sdk.util.IOChannelUtils;
 
 /**
  * Options that can be used to configure the {@link DataflowRunner}.
@@ -137,13 +137,9 @@ public interface DataflowPipelineOptions
             "Error constructing default value for stagingLocation: gcpTempLocation is not"
             + " a valid GCS path, %s. ", gcpTempLocation), e);
       }
-      try {
-        return IOChannelUtils.resolve(gcpTempLocation, "staging");
-      } catch (IOException e) {
-        throw new IllegalArgumentException(String.format(
-            "Unable to resolve stagingLocation from gcpTempLocation: %s."
-            + " Please set the staging location explicitly.", gcpTempLocation), e);
-      }
+      return FileSystems.matchNewResource(gcpTempLocation, true /* isDirectory */)
+          .resolve("staging", StandardResolveOptions.RESOLVE_DIRECTORY)
+          .toString();
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/dc9e0048/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index a6ad8c5..70f00fb 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -224,7 +224,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
     assertThat(optionsMap, hasEntry("jobName", (Object) "some-job-name"));
     assertThat(optionsMap, hasEntry("tempLocation", (Object) "gs://somebucket/some/path"));
     assertThat(optionsMap,
-        hasEntry("stagingLocation", (Object) "gs://somebucket/some/path/staging"));
+        hasEntry("stagingLocation", (Object) "gs://somebucket/some/path/staging/"));
     assertThat(optionsMap, hasEntry("stableUniqueNames", (Object) "WARNING"));
     assertThat(optionsMap, hasEntry("streaming", (Object) false));
     assertThat(optionsMap, hasEntry("numberOfWorkerHarnessThreads", (Object) 0));

http://git-wip-us.apache.org/repos/asf/beam/blob/dc9e0048/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index c0dfbee..5aebf29 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -157,8 +157,9 @@ public class DataflowRunnerTest {
       }
     });
     when(mockGcsUtil.bucketAccessible(GcsPath.fromUri(VALID_STAGING_BUCKET))).thenReturn(true);
+    when(mockGcsUtil.bucketAccessible(GcsPath.fromUri(VALID_STAGING_BUCKET))).thenReturn(true);
     when(mockGcsUtil.bucketAccessible(GcsPath.fromUri(VALID_TEMP_BUCKET))).thenReturn(true);
-    when(mockGcsUtil.bucketAccessible(GcsPath.fromUri(VALID_TEMP_BUCKET + "/staging"))).
+    when(mockGcsUtil.bucketAccessible(GcsPath.fromUri(VALID_TEMP_BUCKET + "/staging/"))).
         thenReturn(true);
     when(mockGcsUtil.bucketAccessible(GcsPath.fromUri(VALID_PROFILE_BUCKET))).thenReturn(true);
     when(mockGcsUtil.bucketAccessible(GcsPath.fromUri(NON_EXISTENT_BUCKET))).thenReturn(false);
@@ -826,6 +827,7 @@ public class DataflowRunnerTest {
   @Test
   public void testInvalidNumberOfWorkerHarnessThreads() throws IOException {
     DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+    FileSystems.setDefaultConfigInWorkers(options);
     options.setRunner(DataflowRunner.class);
     options.setProject("foo-12345");
 

http://git-wip-us.apache.org/repos/asf/beam/blob/dc9e0048/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java
index 30eee0e..8b8fd6c 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java
@@ -22,10 +22,10 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.internal.matchers.ThrowableMessageMatcher.hasMessage;
 
+import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.ResetDateTimeProvider;
 import org.apache.beam.sdk.testing.RestoreSystemProperties;
-import org.apache.beam.sdk.util.IOChannelUtils;
 import org.apache.beam.sdk.util.NoopPathValidator;
 import org.junit.Rule;
 import org.junit.Test;
@@ -127,7 +127,6 @@ public class DataflowPipelineOptionsTest {
   @Test
   public void testStagingLocation() {
     DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
-    IOChannelUtils.registerIOFactoriesAllowOverride(options);
     options.setPathValidatorClass(NoopPathValidator.class);
     options.setTempLocation("gs://temp_location");
     options.setStagingLocation("gs://staging_location");
@@ -138,21 +137,21 @@ public class DataflowPipelineOptionsTest {
   @Test
   public void testDefaultToTempLocation() {
     DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
-    IOChannelUtils.registerIOFactoriesAllowOverride(options);
+    FileSystems.setDefaultConfigInWorkers(options);
     options.setPathValidatorClass(NoopPathValidator.class);
-    options.setTempLocation("gs://temp_location");
-    assertEquals("gs://temp_location", options.getGcpTempLocation());
-    assertEquals("gs://temp_location/staging", options.getStagingLocation());
+    options.setTempLocation("gs://temp_location/");
+    assertEquals("gs://temp_location/", options.getGcpTempLocation());
+    assertEquals("gs://temp_location/staging/", options.getStagingLocation());
   }
 
   @Test
   public void testDefaultToGcpTempLocation() {
     DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
-    IOChannelUtils.registerIOFactoriesAllowOverride(options);
+    FileSystems.setDefaultConfigInWorkers(options);
     options.setPathValidatorClass(NoopPathValidator.class);
-    options.setTempLocation("gs://temp_location");
-    options.setGcpTempLocation("gs://gcp_temp_location");
-    assertEquals("gs://gcp_temp_location/staging", options.getStagingLocation());
+    options.setTempLocation("gs://temp_location/");
+    options.setGcpTempLocation("gs://gcp_temp_location/");
+    assertEquals("gs://gcp_temp_location/staging/", options.getStagingLocation());
   }
 
   @Test